JavaScript APIs
Cloudflare QueuesはCloudflare Workersと統合されています。メッセージを送受信するには、Workerを使用する必要があります。
メッセージをQueueに送信できるWorkerはプロデューサーWorkerと呼ばれ、Queueからメッセージを受信できるWorkerはコンシューマーWorkerと呼ばれます。同じWorkerがプロデューサーとコンシューマーの両方になることも可能です。
将来的には、メッセージを送受信するためのHTTPエンドポイントなど、他のAPIのサポートを期待しています。バグを報告したり機能をリクエストするには、Cloudflare Community Forums ↗にアクセスしてください。フィードバックを提供するには、#queues-beta ↗ Discordチャンネルに行ってください。
これらのAPIは、プロデューサーWorkerがQueueにメッセージを送信することを可能にします。
Queueに単一のメッセージを書き込む例:
type Environment = { readonly MY_QUEUE: Queue;};
export default { async fetch(req: Request, env: Environment): Promise<Response> { await env.MY_QUEUE.send({ url: req.url, method: req.method, headers: Object.fromEntries(req.headers), }); return new Response('Sent!'); },};Queues APIは、複数のメッセージを一度に書き込むこともサポートしています:
const sendResultsToQueue = async (results: Array<any>, env: Environment) => { const batch: MessageSendRequest[] = results.map((value) => ({ body: JSON.stringify(value), })); await env.queue.sendBatch(batch);};プロデューサーがQueueにメッセージを送信するためのバインディング。
interface Queue<Body = unknown> { send(body: Body, options?: QueueSendOptions): Promise<void>; sendBatch(messages: Iterable<MessageSendRequest<Body>>, options?: QueueSendBatchOptions): Promise<void>;}-
send(bodyunknown, options?{ contentType?: QueuesContentType }):Promise<void>- メッセージをQueueに送信します。ボディは、構造的クローンアルゴリズム ↗でサポートされている任意のタイプで、サイズが128 KB未満である限り使用できます。
- プロミスが解決されると、メッセージがディスクに書き込まれたことが確認されます。
-
sendBatch(bodyIterable<MessageSendRequest<unknown>>):Promise<void>- メッセージのバッチをQueueに送信します。提供されたIterable ↗の各アイテムは、構造的クローンアルゴリズム ↗でサポートされている必要があります。バッチは最大100メッセージを含むことができますが、アイテムはそれぞれ128 KBに制限され、配列の合計サイズは256 KBを超えてはいけません。
- プロミスが解決されると、メッセージがディスクに書き込まれたことが確認されます。
メッセージバッチを送信するために使用されるラッパータイプ。
type MessageSendRequest<Body = unknown> = { body: Body; options?: QueueSendOptions;};-
bodyunknown- メッセージのボディ。
- ボディは、構造的クローンアルゴリズム ↗でサポートされている任意のタイプで、サイズが128 KB未満である限り使用できます。
-
optionsQueueSendOptions- 現在のメッセージに適用するオプションで、コンテンツタイプやメッセージの遅延設定を含みます。
メッセージをQueueに送信する際に適用されるオプションの設定。
-
contentTypeQueuesContentType- メッセージの明示的なコンテンツタイプで、ダッシュボードからメッセージをリストする機能で正しくプレビューできるようにします。オプションの引数です。
- 現在、このオプションは内部使用のためのものです。将来的には、
contentTypeが代替のコンシューマータイプによって使用され、メッセージをシリアライズされたものとして明示的にマークできるようになります。 - 可能な値についてはQueuesContentTypeを参照してください。
-
delaySecondsnumber- コンシューマーに配信される前に、Queue内でメッセージを遅延させるための秒数。
- 0から43200(12時間)の間の整数でなければなりません。この値をゼロに設定すると、Queueレベルでのグローバル(デフォルト)遅延があっても、メッセージの遅延が明示的に防止されます。
バッチのメッセージをQueueに送信する際に適用されるオプションの設定。
-
delaySecondsnumber- コンシューマーに配信される前に、Queue内でメッセージを遅延させるための秒数。
- 正の整数でなければなりません。
有効なメッセージコンテンツタイプを含むユニオンタイプ。
// デフォルト: jsontype QueuesContentType = "text" | "bytes" | "json" | "v8";"json"を使用して、JSONシリアライズ可能なJavaScriptオブジェクトを送信します。このコンテンツタイプはCloudflareダッシュボード ↗からプレビューできます。jsonコンテンツタイプはデフォルトです。"text"を使用して、Stringを送信します。このコンテンツタイプはダッシュボードからメッセージをリストする機能でプレビューできます。"bytes"を使用して、ArrayBufferを送信します。このコンテンツタイプはCloudflareダッシュボード ↗からプレビューできず、Base64エンコードされた形式で表示されます。"v8"を使用して、JSONシリアライズできないが構造的クローン ↗でサポートされているJavaScriptオブジェクト(例えばDateやMap)を送信します。このコンテンツタイプはCloudflareダッシュボード ↗からプレビューできず、Base64エンコードされた形式で表示されます。
無効なコンテンツタイプを指定するか、指定したコンテンツタイプがメッセージコンテンツのタイプと一致しない場合、送信操作はエラーで失敗します。
これらのAPIは、コンシューマーWorkerがQueueからメッセージを消費することを可能にします。
コンシューマーWorkerを定義するには、Workerのデフォルトエクスポートにqueue()関数を追加します。これにより、Queueからメッセージを受信できるようになります。
デフォルトでは、バッチ内のすべてのメッセージは、次のすべての条件が満たされるとすぐに確認されます:
queue()関数が返された。queue()関数がプロミスを返した場合、そのプロミスが解決された。waitUntil()に渡されたプロミスが解決された。
queue()関数が例外をスローするか、返されたプロミスまたはwaitUntil()に渡されたプロミスが拒否された場合、バッチ全体は失敗と見なされ、コンシューマーのリトライ設定に従って再試行されます。
export default { async queue( batch: MessageBatch, env: Environment, ctx: ExecutionContext ): Promise<void> { for (const message of batch.messages) { console.log('Received', message); } },};envおよびctxフィールドは、Workersドキュメントに記載されている通りです。
または、(非推奨の)サービスワーカー構文を使用して、キューコンシューマーを記述することもできます:
addEventListener('queue', (event) => { event.waitUntil(handleMessages(event));});サービスワーカー構文では、eventは、以下に定義されているMessageBatchと同じフィールドとメソッドを提供し、さらにwaitUntil() ↗を提供します。
コンシューマーWorkerに送信されるメッセージのバッチ。
interface MessageBatch<Body = unknown> { readonly queue: string; readonly messages: Message<Body>[]; ackAll(): void; retryAll(options?: QueueRetryOptions): void;}-
queuestring- このバッチに属するQueueの名前。
-
messagesMessage[]- バッチ内のメッセージの配列。メッセージの順序は最善の努力であり、公開された順序と正確に同じであることは保証されません。FIFO順序が保証されることに興味がある場合は、Queuesチームにメールを送信してください。
-
ackAll()voidqueue()コンシューマーハンドラーが成功裏に返されたかどうかに関係なく、すべてのメッセージを正常に配信されたとマークします。
-
retryAll(options?: QueueRetryOptions)void- すべてのメッセージを次のバッチで再試行するようにマークします。
- オプションの
optionsオブジェクトをサポートします。
コンシューマーWorkerに送信されるメッセージ。
interface Message<Body = unknown> { readonly id: string; readonly timestamp: Date; readonly body: Body; readonly attempts: number; ack(): void; retry(options?: QueueRetryOptions): void;}-
idstring- メッセージの一意のシステム生成ID。
-
timestampDate- メッセージが送信された時刻。
-
bodyunknown- メッセージのボディ。
- ボディは、構造的クローンアルゴリズム ↗でサポートされている任意のタイプで、サイズが128 KB未満である限り使用できます。
-
attemptsnumber- コンシューマーがこのメッセージを処理しようとした回数。1から始まります。
-
ack()void- メッセージを正常に配信されたとマークします。
queue()コンシューマーハンドラーが成功裏に返されたかどうかに関係なく。
- メッセージを正常に配信されたとマークします。
-
retry(options?: QueueRetryOptions)void- メッセージを次のバッチで再試行するようにマークします。
- オプションの
optionsオブジェクトをサポートします。
メッセージまたはメッセージのバッチを再試行のためにマークする際のオプション設定。
declare interface QueueRetryOptions { delaySeconds?: number;}-
delaySecondsnumber- コンシューマーに配信される前に、Queue内でメッセージを遅延させるための秒数。
- 正の整数でなければなりません。