コンテンツにスキップ

キューの仕組み

Cloudflare Queuesは、非同期処理のためにメッセージをキューに追加できる柔軟なメッセージキューです。メッセージキューは、eコマースサイトのチェックアウトや注文処理サービスのように、アプリケーションのコンポーネントを分離するのに優れています。分離されたサービスは、理解しやすく、デプロイしやすく、実装しやすいため、複雑なデプロイメントの同期を心配することなく、顧客を喜ばせる機能を提供できます。キューは、下流のサービスやAPIへの呼び出しをバッチ処理およびバッファリングすることも可能にします。

キューを理解するための4つの主要な概念があります:

  1. キュー
  2. プロデューサー
  3. コンシューマー
  4. メッセージ

キューとは

キューは、メッセージが書き込まれると自動的にスケールするバッファまたはリストであり、コンシューマーワーカーがその同じキューからメッセージを取得できるようにします。

キューは信頼性を持つように設計されており、キューに書き込まれたメッセージは、書き込みが成功した後は決して失われることはありません。同様に、メッセージはコンシューマーがメッセージを正常に消費するまでキューから削除されることはありません。

キューは、メッセージが公開された順序でコンシューマーに配信されることを保証しません。

開発者は複数のキューを作成できます。複数のキューを作成することは、以下のように役立ちます:

  • 異なるユースケースや処理要件を分離する:たとえば、ログキューとパスワードリセットキュー。
  • 複数のキューを使用して全体のスループット(秒あたりのメッセージ)を水平スケールする。
  • キューに接続された各コンシューマーのために異なるバッチ処理戦略を構成する。

ほとんどのアプリケーションでは、各キューに対して1つのプロデューサーワーカーと、そのキューからメッセージを消費する1つのコンシューマーワーカーを持つことで、各キューの処理を論理的に分離できます。

プロデューサー

プロデューサーは、キューにメッセージを公開または生成するクライアントを指す用語です。プロデューサーは、バインディングを使用してキューをワーカーにバインドし、そのバインディングを呼び出すことでキューにメッセージを書き込むことによって構成されます。

たとえば、my-first-queueという名前のキューをMY_FIRST_QUEUEのバインディングにバインドした場合、バインディングのsend()を呼び出すことでメッセージをキューに書き込むことができます:

type Environment = {
readonly MY_FIRST_QUEUE: Queue;
};
export default {
async fetch(req, env, context): Promise<Response> {
let message = {
url: req.url,
method: req.method,
headers: Object.fromEntries(req.headers),
};
await env.MY_FIRST_QUEUE.send(message); // 送信が失敗した場合は例外がスローされます
},
} satisfies ExportedHandler<Environment>;

キューには複数のプロデューサーワーカーを持つことができます。たとえば、ユーザーからのHTTPリクエストに基づいて、共有キューにイベントやログを書き込む複数のプロデューサーワーカーがあるかもしれません。単一のキューに書き込むことができるプロデューサーワーカーの総数に制限はありません。

さらに、複数のキューを単一のワーカーにバインドすることもできます。その単一のワーカーは、コード内で定義した任意のロジックに基づいて、どのキューに書き込むか(または複数のキューに書き込むか)を決定できます。

コンテンツタイプ

キューに公開されるメッセージは、コンシューマーとの相互運用性に応じて異なる形式で公開できます。デフォルトのコンテンツタイプはjsonであり、これはJSON.stringify()に渡すことができる任意のオブジェクトが受け入れられることを意味します。

コンテンツタイプを明示的に設定するか、代替のコンテンツタイプを指定するには、キューのsend()メソッドにcontentTypeオプションを渡します:

type Environment = {
readonly MY_FIRST_QUEUE: Queue;
};
export default {
async fetch(req, env): Promise<Response> {
let message = {
url: req.url,
method: req.method,
headers: Object.fromEntries(req.headers),
};
try {
await env.MY_FIRST_QUEUE.send(message, { contentType: "json" }); // "json"がデフォルト
} catch (e) {
// 送信が失敗した場合のケースをキャッチします。コンテンツタイプが一致しない場合も含まれます
console.log(e)
return Response.json({"msg": e}, { status: 500 })
}
},
} satisfies ExportedHandler<Environment>;

キューに書き込む際に単純な文字列のみを受け入れるには、代わりに{ contentType: "text" }を設定します:

try {
// キューに非文字列(ネイティブJavaScriptオブジェクトやArrayBufferなど)を渡すと例外(エラー)がスローされます。
await env.MY_FIRST_QUEUE.send("hello there", { contentType: "text" }); // 明示的に'text'を設定
} catch (e) {
console.log(e)
return Response.json({"msg": e}, { status: 500 })

QueuesContentType APIドキュメントでは、各形式がキューにどのようにシリアライズされるかについて説明しています。

コンシューマー

キューは2種類のコンシューマーをサポートしています:

  1. コンシューマーワーカー、プッシュベース:キューにメッセージが配信されるとワーカーが呼び出されます。
  2. HTTPプルコンシューマー、プルベース:コンシューマーがHTTP経由でキューエンドポイントを呼び出してメッセージを受信し、確認します。

キューには1種類のコンシューマーのみを構成できます。

コンシューマーワーカーの作成

コンシューマーは、キューからメッセージを購読または消費するクライアントを指す用語です。最も基本的な形では、コンシューマーはワーカー内にqueueハンドラーを作成することで定義されます:

export default {
async queue(batch: MessageBatch<Error>, env: Environment): Promise<void> {
// バッチ内のメッセージで何かを行う
// つまり、R2ストレージ、D1データベースに書き込む、または外部APIにPOSTする
// バッチ.messagesをループして各メッセージを反復処理することもできます
},
};

次に、そのコンシューマーをwrangler queues consumer <queue-name> <worker-script-name>を使用してキューに接続するか、wrangler.toml内に[[queues.consumers]]構成を手動で定義します:

[[queues.consumers]]
queue = "<your-queue-name>"
max_batch_size = 100 # オプション
max_batch_timeout = 30 # オプション

重要なことに、各キューには1つのアクティブなコンシューマーのみが存在できます。これにより、Cloudflare Queuesは少なくとも1回の配信を実現し、重複メッセージのリスクを最小限に抑えることができます。

特に、同じコンシューマーを複数のキューで使用することができます。コンシューマーワーカーを定義するキューハンドラーは、接続されているキューによって呼び出されます。

  • queueハンドラーに渡されるMessageBatchには、バッチが読み取られたキューの名前を持つqueueプロパティが含まれています。
  • これにより、書く必要のあるコードの量を減らし、キューの名前に基づいてメッセージを処理できます。

たとえば、複数のキューからメッセージを消費するように構成されたコンシューマーは、次のようになります:

export default {
async queue(batch: MessageBatch<Error>, env: Environment): Promise<void> {
// MessageBatchには、スイッチするための`queue`プロパティがあります
switch (batch.queue) {
case 'log-queue':
// バッチをR2に書き込む
break;
case 'debug-queue':
// メッセージをコンソールまたは別のキューに書き込む
break;
case 'email-reset':
// 外部APIを介してパスワードリセットメールをトリガーする
break;
default:
// 明示的に言及していないメッセージを処理する(ログを書き込む、DLQにプッシュする)
}
},
};

コンシューマーの削除

プロジェクトからキューを削除するには、wrangler queues consumer remove <queue-name> <script-name>を実行し、次にwrangler.tomlファイルの[[queues.consumers]]の下にある希望のキューを削除します。

プルコンシューマー

キューには、メッセージがワーカーにプッシュされるのではなく、キューからプルするHTTPベースのコンシューマーを持つことができます。

このコンシューマーは、インターネットを介して通信できる任意のHTTPサービスである可能性があります。キューのプルベースのコンシューマーを構成する方法については、プルコンシューマーガイドを確認してください。

メッセージ

メッセージは、キューに生成し、消費するオブジェクトです。

任意のJSONシリアライズ可能なオブジェクトをキューに公開できます。ほとんどの開発者にとって、これは単純な文字列またはJSONオブジェクトを意味します。メッセージを送信する際にコンテンツタイプを明示的に設定することができます。

メッセージ自体は、コンシューマーに配信される際にバッチ処理されることがあります。デフォルトでは、バッチ内のメッセージは、リトライを決定する際にすべてまたは何もないものとして扱われます。バッチ内の最後のメッセージの処理が失敗した場合、全体のバッチがリトライされます。また、メッセージが正常に処理されたときに明示的に確認することや、個々のメッセージをリトライするようにマークすることも選択できます。