コンシューマの同時実行
コンシューマの同時実行により、キューからメッセージを処理するコンシューマワーカーが、自動的に水平スケールアウトして、メッセージがキューに書き込まれる速度に追いつくことができます。
多くのシステムでは、キューにメッセージを書き込む速度が、単一のコンシューマが同じメッセージを読み取り処理する速度を容易に超えることがあります。これは、コンシューマがメッセージの内容を解析したり、ストレージやデータベースに書き込んだり、サードパーティ(上流)APIを呼び出したりするためです。
キューのプロデューサーは常にスケーラブルであり、最大サポートメッセージ毎秒(キューごと)の制限まで対応可能です。
デフォルトでは、すべてのキューで同時実行が有効になっています。キューのコンシューマは、キューのバックログやエラーレートを管理するために必要に応じて最大同時呼び出し数まで自動的にスケールアップします。
メッセージのバッチを処理した後、キューは同時コンシューマの数を調整する必要があるかどうかを確認します。キューに対して呼び出される同時コンシューマの数は、以下のいくつかの要因に基づいて自動的にスケールします。
- キュー内のメッセージ数(バックログ)とその成長率。
- 失敗した呼び出し(成功した呼び出しに対して)の比率。失敗した呼び出しとは、
queue()ハンドラーがvoid(何も返さない)ではなく、未捕捉の例外を返すことです。 - そのコンシューマに設定された
max_concurrencyの値。
可能な限り、キューはバックログが指数関数的に成長するのを防ぐよう最適化し、キュー内のメッセージのバックログが処理される前にメッセージ保持制限に達するシナリオを最小限に抑えます。
もし、1秒あたり100メッセージを処理するのに5秒かかる単一の同時コンシューマを持つキューに書き込んでいる場合、フライト中のメッセージ数は、コンシューマが追いつくよりも速い速度で増え続けます。
このシナリオでは、キューは増加するバックログに気付き、同時コンシューマワーカーの呼び出し数を(おおよそ)5にまでスケールアップします。これは、受信メッセージの速度が減少するか、コンシューマがメッセージをより早く処理するか、コンシューマがエラーを生成し始めるまで続きます。
コンシューマが自動スケーリングしない場合、いくつかの考えられる原因があります。
max_concurrencyが1に設定されています。- コンシューマワーカーがメッセージを処理するのではなく、エラーを返しています。コンシューマが正常であることを確認してください。
- メッセージのバッチが処理されています。キューは、メッセージのバッチ全体を処理した後にのみコンシューマを自動スケーリングするかどうかを確認するため、バッチが処理されている間は自動スケーリングしません。バッチサイズを減らすか、コンシューマをリファクタリングしてメッセージをより早く処理することを検討してください。
上流APIやシステムによって制限されるワークフローがある場合、バックログが増加することを好むかもしれません。これは、上流システムを圧倒しないように全体のレイテンシを増加させることと引き換えです。
コンシューマワーカーの同時実行を設定する方法は2つあります。
- Cloudflareダッシュボードで同時実行設定を行う
wrangler.tomlを介して同時実行設定を行う
ダッシュボードからコンシューマワーカーの同時実行設定を構成するには:
- Cloudflareダッシュボード ↗にログインし、アカウントを選択します。
- Workers & Pages > Queuesを選択します。
- キューを選択 > Settingsを選択します。
- コンシューマの詳細の下でEdit Consumerを選択します。
- Maximum consumer invocationsを
1から10の間の値に設定します。この値は、キューに利用可能な最大同時コンシューマ呼び出し数を表します。
固定の最大値を削除するには、**auto (recommended)**を選択します。
メッセージをキューに処理する速度よりも速く書き込んでいる場合、メッセージは最終的にそのキューに設定された最大保持期間に達する可能性があることに注意してください。その制限に達した個々のメッセージは、キューから期限切れとなり削除されます。
特定のキューに対して固定の最大同時コンシューマ呼び出し数を設定するには、wrangler.tomlファイルにmax_concurrencyを設定します:
[[queues.consumers]] queue = "my-queue" max_concurrency = 1制限を削除するには、特定のキューの[[queues.consumers]]設定からmax_concurrency設定を削除し、npx wrangler deployを呼び出して構成の更新をプッシュします。
複数のコンシューマワーカーが呼び出されると、各ワーカーの呼び出しにはCPU時間コストが発生します。
- キューに書き込まれたすべてのメッセージを処理するつもりであれば、_実際の全体コストは同じ_であり、同時実行が有効になっていても変わりません。
- 同時実行を有効にすることは、単にそのコストを前倒しにし、メッセージがメッセージ保持制限に達するのを防ぐのに役立ちます。
コンシューマの請求は、Workers標準使用モデルに従い、開発者はリクエストとリクエストで使用されたCPU時間に対して請求されます。
メッセージのバッチを処理するのに2秒かかるコンシューマワーカーは、50百万(50,000,000)メッセージを処理するための全体コストは、同時に(速く)処理するか、個別に(遅く)処理するかにかかわらず、同じになります。