コンテンツにスキップ

JavaScript APIs

Cloudflare QueuesはCloudflare Workersと統合されています。メッセージを送受信するには、Workerを使用する必要があります。

メッセージをQueueに送信できるWorkerはプロデューサーWorkerと呼ばれ、Queueからメッセージを受信できるWorkerはコンシューマーWorkerと呼ばれます。同じWorkerがプロデューサーとコンシューマーの両方になることも可能です。

将来的には、メッセージを送受信するためのHTTPエンドポイントなど、他のAPIのサポートを期待しています。バグを報告したり機能をリクエストするには、Cloudflare Community Forumsにアクセスしてください。フィードバックを提供するには、#queues-beta Discordチャンネルに行ってください。

プロデューサー

これらのAPIは、プロデューサーWorkerがQueueにメッセージを送信することを可能にします。

Queueに単一のメッセージを書き込む例:

type Environment = {
readonly MY_QUEUE: Queue;
};
export default {
async fetch(req: Request, env: Environment): Promise<Response> {
await env.MY_QUEUE.send({
url: req.url,
method: req.method,
headers: Object.fromEntries(req.headers),
});
return new Response('Sent!');
},
};

Queues APIは、複数のメッセージを一度に書き込むこともサポートしています:

const sendResultsToQueue = async (results: Array<any>, env: Environment) => {
const batch: MessageSendRequest[] = results.map((value) => ({
body: JSON.stringify(value),
}));
await env.queue.sendBatch(batch);
};

Queue

プロデューサーがQueueにメッセージを送信するためのバインディング。

interface Queue<Body = unknown> {
send(body: Body, options?: QueueSendOptions): Promise<void>;
sendBatch(messages: Iterable<MessageSendRequest<Body>>, options?: QueueSendBatchOptions): Promise<void>;
}
  • send(bodyunknown, options?{ contentType?: QueuesContentType }): Promise<void>

    • メッセージをQueueに送信します。ボディは、構造的クローンアルゴリズムでサポートされている任意のタイプで、サイズが128 KB未満である限り使用できます。
    • プロミスが解決されると、メッセージがディスクに書き込まれたことが確認されます。
  • sendBatch(bodyIterable<MessageSendRequest<unknown>>): Promise<void>

    • メッセージのバッチをQueueに送信します。提供されたIterableの各アイテムは、構造的クローンアルゴリズムでサポートされている必要があります。バッチは最大100メッセージを含むことができますが、アイテムはそれぞれ128 KBに制限され、配列の合計サイズは256 KBを超えてはいけません。
    • プロミスが解決されると、メッセージがディスクに書き込まれたことが確認されます。

MessageSendRequest

メッセージバッチを送信するために使用されるラッパータイプ。

type MessageSendRequest<Body = unknown> = {
body: Body;
options?: QueueSendOptions;
};
  • body unknown

  • optionsQueueSendOptions
    • 現在のメッセージに適用するオプションで、コンテンツタイプやメッセージの遅延設定を含みます。

QueueSendOptions

メッセージをQueueに送信する際に適用されるオプションの設定。

  • contentTypeQueuesContentType
    • メッセージの明示的なコンテンツタイプで、ダッシュボードからメッセージをリストする機能で正しくプレビューできるようにします。オプションの引数です。
    • 現在、このオプションは内部使用のためのものです。将来的には、contentTypeが代替のコンシューマータイプによって使用され、メッセージをシリアライズされたものとして明示的にマークできるようになります。
    • 可能な値についてはQueuesContentTypeを参照してください。
  • delaySecondsnumber
    • コンシューマーに配信される前に、Queue内でメッセージを遅延させるための秒数。
    • 0から43200(12時間)の間の整数でなければなりません。この値をゼロに設定すると、Queueレベルでのグローバル(デフォルト)遅延があっても、メッセージの遅延が明示的に防止されます。

QueueSendBatchOptions

バッチのメッセージをQueueに送信する際に適用されるオプションの設定。

  • delaySecondsnumber

QueuesContentType

有効なメッセージコンテンツタイプを含むユニオンタイプ。

// デフォルト: json
type QueuesContentType = "text" | "bytes" | "json" | "v8";
  • "json"を使用して、JSONシリアライズ可能なJavaScriptオブジェクトを送信します。このコンテンツタイプはCloudflareダッシュボードからプレビューできます。jsonコンテンツタイプはデフォルトです。
  • "text"を使用して、Stringを送信します。このコンテンツタイプはダッシュボードからメッセージをリストする機能でプレビューできます。
  • "bytes"を使用して、ArrayBufferを送信します。このコンテンツタイプはCloudflareダッシュボードからプレビューできず、Base64エンコードされた形式で表示されます。
  • "v8"を使用して、JSONシリアライズできないが構造的クローンでサポートされているJavaScriptオブジェクト(例えばDateMap)を送信します。このコンテンツタイプはCloudflareダッシュボードからプレビューできず、Base64エンコードされた形式で表示されます。

無効なコンテンツタイプを指定するか、指定したコンテンツタイプがメッセージコンテンツのタイプと一致しない場合、送信操作はエラーで失敗します。

コンシューマー

これらのAPIは、コンシューマーWorkerがQueueからメッセージを消費することを可能にします。

コンシューマーWorkerを定義するには、Workerのデフォルトエクスポートにqueue()関数を追加します。これにより、Queueからメッセージを受信できるようになります。

デフォルトでは、バッチ内のすべてのメッセージは、次のすべての条件が満たされるとすぐに確認されます:

  1. queue()関数が返された。
  2. queue()関数がプロミスを返した場合、そのプロミスが解決された。
  3. waitUntil()に渡されたプロミスが解決された。

queue()関数が例外をスローするか、返されたプロミスまたはwaitUntil()に渡されたプロミスが拒否された場合、バッチ全体は失敗と見なされ、コンシューマーのリトライ設定に従って再試行されます。

export default {
async queue(
batch: MessageBatch,
env: Environment,
ctx: ExecutionContext
): Promise<void> {
for (const message of batch.messages) {
console.log('Received', message);
}
},
};

envおよびctxフィールドは、Workersドキュメントに記載されている通りです。

または、(非推奨の)サービスワーカー構文を使用して、キューコンシューマーを記述することもできます:

addEventListener('queue', (event) => {
event.waitUntil(handleMessages(event));
});

サービスワーカー構文では、eventは、以下に定義されているMessageBatchと同じフィールドとメソッドを提供し、さらにwaitUntil()を提供します。

MessageBatch

コンシューマーWorkerに送信されるメッセージのバッチ。

interface MessageBatch<Body = unknown> {
readonly queue: string;
readonly messages: Message<Body>[];
ackAll(): void;
retryAll(options?: QueueRetryOptions): void;
}
  • queue string

    • このバッチに属するQueueの名前。
  • messages Message[]

    • バッチ内のメッセージの配列。メッセージの順序は最善の努力であり、公開された順序と正確に同じであることは保証されません。FIFO順序が保証されることに興味がある場合は、Queuesチームにメールを送信してください
  • ackAll() void

    • queue()コンシューマーハンドラーが成功裏に返されたかどうかに関係なく、すべてのメッセージを正常に配信されたとマークします。
  • retryAll(options?: QueueRetryOptions) void

    • すべてのメッセージを次のバッチで再試行するようにマークします。
    • オプションのoptionsオブジェクトをサポートします。

Message

コンシューマーWorkerに送信されるメッセージ。

interface Message<Body = unknown> {
readonly id: string;
readonly timestamp: Date;
readonly body: Body;
readonly attempts: number;
ack(): void;
retry(options?: QueueRetryOptions): void;
}
  • id string

    • メッセージの一意のシステム生成ID。
  • timestamp Date

    • メッセージが送信された時刻。
  • body unknown

  • attempts number

    • コンシューマーがこのメッセージを処理しようとした回数。1から始まります。
  • ack() void

    • メッセージを正常に配信されたとマークします。queue()コンシューマーハンドラーが成功裏に返されたかどうかに関係なく。
  • retry(options?: QueueRetryOptions) void

    • メッセージを次のバッチで再試行するようにマークします。
    • オプションのoptionsオブジェクトをサポートします。

QueueRetryOptions

メッセージまたはメッセージのバッチを再試行のためにマークする際のオプション設定。

declare interface QueueRetryOptions {
delaySeconds?: number;
}
  • delaySecondsnumber