コンテンツにスキップ

Workersとの統合

Pub/Subの最も強力な機能の1つは、Cloudflare Workers — エッジで実行される強力なサーバーレス関数 — に接続し、ブローカーに公開されたすべてのメッセージをフィルタリング、集約、変更できる能力です。Workersは、これらのメッセージを他のソースにミラーリングすることもでき、Cloudflare R2ストレージ、外部データベース、またはCloudflareを超えた他のクラウドサービスに書き込むことができるため、受信メッセージのペイロードやデータをスケールで永続化または分析することが容易になります。

WorkerをPub/Subと統合する方法は3つあります:

  1. ブローカーに公開されたすべてのメッセージを受信する「On Publish」フックとして。これにより、Workerはメッセージを変更したり、他の宛先(R2KVなど)にコピーしたり、フィルタリングしたり、配信前にメッセージを削除したりできます。
  2. (ベータ版ではまだ利用できません) WorkerからPub/Subトピックに直接公開すること。WorkerコードからPub/Subトピックにテレメトリやイベントを公開できます。
  3. (ベータ版ではまだ利用できません) Worker内からPub/Subトピック(またはトピック)にサブスクライブすること。これにより、Workerは他のサブスクライバーと同様に機能し、外部クライアント(MQTT経由)または他のWorkersから公開されたメッセージを消費できます。

必要に応じて、これらの統合の1つ、複数、またはすべてを使用できます。

On-Publishフック

「On-Publish」フックは、メッセージがPub/Subブローカーに公開される際にフィルタリングおよび変更するための強力な方法です。

  • Workerは「ポストパブリッシュ」フックとして実行され、メッセージはブローカーによって受け入れられ、Workerに渡され、Workerが有効なHTTPレスポンスを返した後にのみ、トピックにサブスクライブしたクライアントにメッセージが送信されます。
  • Workerがレスポンスを返さない場合(意図的であれそうでなかれ)、またはHTTP 200以外のHTTPステータスコードを返す場合、メッセージは削除されます。
  • あなたのブローカーに公開されたすべてのPUBLISHメッセージ(パケット)はWorkerに送信されます。他のMQTTパケット(CONNECTやAUTHパケットなど)は、Pub/Subによって自動的に処理されます。

Workerをブローカーに接続する

WorkerをPub/SubブローカーにOn-Publishフックとして接続するには、次の手順が必要です:

  1. ブローカーからの受信POSTリクエストを処理するCloudflare Workerを作成する(または既存のWorkerを拡張する)。Workerの公開URLは、ブローカーがメッセージを送信するように設定するURLになります。
  2. on_publish.urlフィールドを設定して、ブローカーがWorkerにメッセージを送信するように構成します。
  3. 重要: ペイロードの署名を確認して、リクエストがあなたのPub/Subブローカーからのものであり、信頼できない第三者や他のブローカーからのものでないことを確認します
  4. メッセージ(HTTPリクエストペイロード)を必要に応じて検査または変更します!
  5. 正しく形成されたレスポンスでHTTP 200 OKを返し、これによりブローカーはメッセージをサブスクライバーに送信できます。

以下は、次のことを示すエンドツーエンドの例です:

  • Pub/Subからの受信リクエストを認証する(およびPub/Subからでないものを拒否する)
  • 特定のトピックのメッセージのペイロードを置き換える
  • メッセージをブローカーに返して、サブスクライバーに転送できるようにする

Workerが受信リクエストを検証できるようにするために、公開鍵を環境変数を介してWorkerに利用可能にする必要があります。そのために、ブローカーから公開鍵を取得できます:

Terminal window
wrangler pubsub broker public-keys YOUR_BROKER --namespace=NAMESPACE_NAME

以下の例に似た成功レスポンスを受け取るはずで、Workerから設定された公開鍵が含まれます:

"keys": [
{
"use": "sig",
"kty": "OKP",
"kid": "JDPuYJqHOvqzlakkNFQ9kfN7WsYs5uHndp_ziRdmOCU",
"crv": "Ed25519",
"alg": "EdDSA",
"x": "Phf82R8tG1FdY475-AgtlaWIwH1lLFlfWu5LrsKhyjw"
},
{
"use": "sig",
"kty": "OKP",
"kid": "qk7Z4hbN738v-m2CKdVaKTav9pU32MAaQXB2tDaQ-_o",
"crv": "Ed25519",
"alg": "EdDSA",
"x": "Bt4kQWcK_XhZP1ZxEflsoYbqaBm9rEDk_jNWPdhxwTI"
}
]

公開鍵の配列をwrangler.tomlに環境変数としてコピーします:

name = "my-pubsub-worker"
type = "javascript"
account_id = "<YOUR ACCOUNT_ID>"
workers_dev = true
# トップレベルの環境変数を定義します
# `[vars]`ブロックの下で
# `key = "value"`形式を使用します
[vars]
# これは私たちのWorker内でenv.BROKER_PUBLIC_KEYSを介してアクセス可能になります
# 生のJSONの周りに三重のシングルクォート(')を使用することに注意してください
BROKER_PUBLIC_KEYS = '''{
"keys": [
{
"use": "sig",
"kty": "OKP",
"kid": "JDPuYJqHOvqzlakkNFQ9kfN7WsYs5uHndp_ziRdmOCU",
"crv": "Ed25519",
"alg": "EdDSA",
"x": "Phf82R8tG1FdY475-AgtlaWIwH1lLFlfWu5LrsKhyjw"
},
{
"use": "sig",
"kty": "OKP",
"kid": "qk7Z4hbN738v-m2CKdVaKTav9pU32MAaQXB2tDaQ-_o",
"crv": "Ed25519",
"alg": "EdDSA",
"x": "Bt4kQWcK_XhZP1ZxEflsoYbqaBm9rEDk_jNWPdhxwTI"
}
]
}'''

BROKER_PUBLIC_KEYS環境変数が設定されると、Workerコード内でこれらにアクセスできるようになります。@cloudflare/pubsubパッケージを使用すると、受信リクエストをブローカーの公開鍵に対して認証できます。

@cloudflare/pubsubをインストールするには、npmまたはyarnを使用できます:

Terminal window
npm i @cloudflare/pubsub

@cloudflare/pubsubがインストールされると、isValidBrokerRequest関数とPubSubMessage型をWorkerコードに直接インポートできます:

// Cloudflare WorkerからPub/Subメッセージを消費し変換する方法を示す例。
/// <reference types="@cloudflare/workers-types" />
import { isValidBrokerRequest, PubSubMessage } from "@cloudflare/pubsub";
async function pubsub(
messages: Array<PubSubMessage>,
env: any,
ctx: ExecutionContext,
): Promise<Array<PubSubMessage>> {
// メッセージは高いスループットでバッチ処理される可能性があるため、
// 受信メッセージをループして必要に応じて処理します。
for (let msg of messages) {
console.log(msg);
// トピック「test/topic」のメッセージ内容を置き換えます - 簡単な例として
if (msg.topic.startsWith("test/topic")) {
msg.payload = `replaced text payload at ${Date.now()}`;
}
}
return messages;
}
const worker = {
async fetch(req, env, ctx): Promise<Response> {
// これはあなたのブローカーの「publicKey」フィールドから取得します。
//
// 各ブローカーには、あなたのブローカーと他のブローカーを区別するためのユニークな鍵があります。
// これらの鍵は環境変数に保存されます(/workers/configuration/environment-variables/)
// すべてのリクエストで取得する必要がないようにします。
let publicKeys = env.BROKER_PUBLIC_KEYS;
// 重要: 受信リクエストがあなたのブローカーからのものであることを検証する必要があります。
//
// 将来的には、WorkersはあなたのPub/Subブローカーと同じアカウントのWorkersのために
// これを代わりに行うことができるようになります。
if (await isValidBrokerRequest(req, publicKeys)) {
// PubSubメッセージを解析します
let incomingMessages: Array<PubSubMessage> = await req.json();
// メッセージをpubsubハンドラーに渡し、返された
// メッセージをキャプチャします。
let outgoingMessages = await pubsub(incomingMessages, env, ctx);
// メッセージを再シリアライズし、HTTP 200を返します。
// Content-Typeはオプションですが、必ず
// "application/octet-stream"または空にする必要があります。
return new Response(JSON.stringify(outgoingMessages), { status: 200 });
}
return new Response("not a valid Broker request", { status: 403 });
},
} satisfies ExportedHandler;
export default worker;

npx wrangler deployを使用してWorkerをデプロイしたら、ブローカーがWorkerを呼び出すように構成する必要があります。これは、ブローカーの--on-publish-url値をWorkerの_公開アクセス可能な_ URLに設定することで行います:

Terminal window
wrangler pubsub broker update YOUR_BROKER --namespace=NAMESPACE_NAME --on-publish-url="https://your.worker.workers.dev"
{
"id": "4c63fa30ee13414ba95be5b56d896fea",
"name": "example-broker",
"authType": "TOKEN",
"created_on": "2022-05-11T23:19:24.356324Z",
"modified_on": "2022-05-11T23:19:24.356324Z",
"expiration": null,
"endpoint": "mqtts://example-broker.namespace.cloudflarepubsub.com:8883",
"on_publish": {
"url": "https://your-worker.your-account.workers.dev"
}
}

これを設定すると、クライアントからブローカーに送信されたすべてのMQTT PUBLISHメッセージがWorkerに配信され、さらなる処理が行われます。私たちのウェブベースのライブデモを使用して、Workerがリクエストを正しく検証し、メッセージをインターセプトしているかどうかをテストできます。

他のHTTPS対応のエンドポイントもメッセージを転送する有効な宛先ですが、メッセージが必然的に公共のインターネットを通過する必要があるため、レイテンシが発生したり、メッセージ配信成功率が低下する可能性があります。

メッセージペイロード

以下は、WorkerにHTTP経由で送信されたPubSubメッセージの例です:

[
{
"mid": 0,
"broker": "my-broker.my-namespace.cloudflarepubsub.com",
"topic": "us/external/metrics/abc-456-def-123/request_count",
"clientId": "broker01G24VP1T3B51JJ0WJQJWCSY61",
"receivedAt": 1651578191,
"contentType": null,
"payloadFormatIndicator": 1,
"payload": "<payload>"
},
{
"mid": 1,
"broker": "my-broker.my-namespace.cloudflarepubsub.com",
"topic": "ap/external/metrics/abc-456-def-123/transactions_processed",
"clientId": "broker01G24VS053KYGNBBX8RH3T7CY5",
"receivedAt": 1651578193,
"contentType": null,
"payloadFormatIndicator": 1,
"payload": "<payload>"
}
]

メッセージごとのメタデータとTypeScriptサポート

Workerに配信されるメッセージ、またはWorkerから送信されるメッセージは、メッセージに関する追加のメタデータでラップされており、トピック、メッセージ形式、その他のプロパティをより簡単に検査できるようにしています。

このメタデータには以下が含まれます:

  • メッセージが関連付けられているbroker、これによりコードが複数のブローカーからのメッセージを区別できます
  • クライアントによって公開されたメッセージのtopicこれは読み取り専用です:Worker内でトピックを変更しようとすると無効となり、そのメッセージは削除されます
  • Pub/Subが最初にメッセージを解析しデシリアライズしたときに設定されるreceivedTimestamp
  • メッセージのmid(“message id”)。これは、Pub/SubがWorkerに送信されたメッセージを追跡するためのユニークなIDであり、削除されたメッセージを含みます。midフィールドは不変であり、変更されたり欠落したmidを返すと、メッセージが削除される可能性があります。

このメタデータは、@cloudflare/pubsubライブラリのPubSubMessageインターフェースとして表現されます。

PubSubMessage型は、将来的に追加のフィールドを含むように成長する可能性があり、コードが将来の変更から利益を得られるように、@cloudflare/pubsubをインポートすることをお勧めします(コピー&ペーストするのではなく)。

バッチ処理

On-Publish Workerに送信されるメッセージはバッチ処理される可能性があります:各バッチは1つ以上のPubSubMessageの配列です。

  • バッチ処理は、Workerに対する呼び出しの数を減らし、上流サービスに書き込む際にメッセージをより良く集約できるようにします。
  • Pub/Subのバッチ処理メカニズムは、パブリッシャーから同時に到着するメッセージをバッチ処理するように設計されており、数秒待つことはありません。
  • メッセージ配信のレイテンシを測定可能に増加させることはありません。

On-Publishのベストプラクティス

  • 必要なトピックのみを検査して、Workerが行う必要のある計算を減らします。
  • ストレージに書き込む必要がある場合やリモートサービスと通信する必要がある場合は、ctx.waitUntilを使用して、これらの操作が完了するのを待っている間にメッセージ配信のレイテンシを増加させないようにします。
  • try-catchを使用して例外をキャッチします - あなたのOn-Publishフックが「オープンに失敗」できる場合、例外が発生した場合にメッセージをブローカーに返すためにcatchブロックを使用して、メッセージが削除されないようにします。

Worker統合のトラブルシューティング

Workerがメッセージを処理しているときに、サブスクライブされたクライアントにメッセージが送信されない一般的な失敗モードには以下が含まれます:

  • 受信リクエストを正しく検証できない。これは、正しい公開鍵を使用していない場合(鍵は各ブローカーに固有)、鍵が不正な形式である場合、またはWorkerに環境変数を介して鍵を設定していない場合に発生する可能性があります。
  • HTTP 200レスポンスを返さない。その他のHTTPステータスコードはエラーとして解釈され、メッセージは削除されます。
  • 有効なContent-Typeを返さない。HTTPレスポンスヘッダーのContent-Typeはapplication/octet-streamである必要があります。
  • レスポンスを返すのに時間がかかりすぎる(10秒以上)。メッセージをブローカーに返した後に他の宛先に書き込む必要がある場合は、ctx.waitUntilを使用できます。
  • 無効または構造化されていないボディ、サイズ制限を超えるボディまたはペイロード、またはボディが全く返されない。

WorkerがHTTPリクエスト-レスポンスライフサイクルの「サーバー」として機能しているため、Workerからの無効なレスポンスは静かに失敗する可能性があります。ブローカーはもはやエラーレスポンスを返すことができません。