バッチ処理、再試行、および遅延
キューのコンシューマーワーカーを設定する際に、メッセージが配信される際のバッチ処理の方法も定義できます。
バッチ処理は以下のことができます:
- コンシューマーワーカーを呼び出す回数を減らすことができ(コスト削減につながる可能性があります)。
- 外部APIやサービスに書き込む際にメッセージをバッチ処理できるようにします(書き込みを減少させる)。
- 特にプロデューサーワーカーがユーザー向けのアクティビティに関連している場合、時間をかけて負荷を分散させることができます。
メッセージのバッチ処理を設定する方法は2つあります。コンシューマーワーカーをキューに接続する際にバッチ処理を設定します。
max_batch_size- コンシューマーに配信されるバッチの最大サイズ(デフォルトは10メッセージ)。max_batch_timeout- コンシューマーにバッチを配信する前にキューが待機する_最大_時間(デフォルトは5秒)。
max_batch_sizeとmax_batch_timeoutは連携して機能します。どちらかの制限に最初に達した場合、バッチの配信がトリガーされます。
例えば、max_batch_size = 30およびmax_batch_timeout = 10の場合、30メッセージがキューに書き込まれると、コンシューマーは30メッセージのバッチを配信します。しかし、30メッセージがキューに書き込まれるのに10秒以上かかる場合、コンシューマーはその時点でキューにあったメッセージの数(この場合は1から29の間)を含むバッチを受け取ります。
サイズとタイムアウト設定を決定する際には、レイテンシ(メッセージを受信するまでどれくらい待てるか?)、全体のバッチサイズ(外部システムに書き込む際)、コスト(少ないが大きなバッチ)を考慮する必要があります。
以下のバッチレベルの設定を構成して、キューが設定されたコンシューマーにバッチを配信する方法を調整できます。
| 設定 | デフォルト | 最小 | 最大 |
|---|---|---|---|
最大バッチサイズ max_batch_size | 10メッセージ | 1メッセージ | 100メッセージ |
最大バッチタイムアウト max_batch_timeout | 5秒 | 0秒 | 30秒 |
バッチ内の個々のメッセージを処理する際に、各メッセージを明示的に確認することで、個別に確認できます。明示的に確認されたメッセージは、たとえキューのコンシューマーが後続のメッセージで失敗したり、バッチ処理時に正常に戻らなかった場合でも、再配信されることはありません。
- 各メッセージは、バッチ内で処理する際に確認でき、コンシューマーがバッチ処理中にエラーをスローした場合でも、バッチ全体が再配信されるのを回避します。
- 個々のメッセージを確認することは、外部APIを呼び出したり、データベースにメッセージを書き込んだり、個々のメッセージに対して非冪等(状態変更)アクションを実行する際に便利です。
メッセージを配信済みとして明示的に確認するには、メッセージのack()メソッドを呼び出します。
export default { async queue(batch: MessageBatch, env: Env, ctx: ExecutionContext) { for (const msg of batch.messages) { // TODO: メッセージに対して何かを行う // メッセージを配信済みとして明示的に確認 msg.ack(); } },};また、retry()を呼び出して、メッセージを次のバッチで再配信するように明示的に強制することもできます。これは「ネガティブ確認」と呼ばれます。これは、エラーをスローしてバッチ全体を再配信することなく、そのバッチ内の残りのメッセージを処理したい場合に特に便利です。
export default { async queue(batch: MessageBatch, env: Env, ctx: ExecutionContext) { for (const msg of batch.messages) { // TODO: 失敗するメッセージに対して何かを行う msg.retry(); } },};バッチレベルでメッセージを確認またはネガティブ確認することもできます。ackAll()およびretryAll()を使用します。コンシューマーワーカーに配信されたメッセージのバッチ(MessageBatch)に対してackAll()を呼び出すことは、正常に戻る(エラーをスローしない)コンシューマーワーカーと同じ動作をします。
ack()、retry()、およびそれらのackAll() / retryAllの呼び出しは、以下の優先順位ルールに従います:
- メッセージに対して
ack()を呼び出すと、その後のack()またはretry()の呼び出しは静かに無視されます。 - メッセージに対して
retry()を呼び出し、その後にack()を呼び出すと、ack()は無視されます。最初のメソッド呼び出しがすべてのケースで勝ちます。 - 単一のメッセージに対して
ack()またはretry()のいずれかを呼び出し、その後にバッチに対してackAll()またはretryAll()のいずれかを呼び出すと、単一メッセージに対する呼び出しが優先されます。つまり、バッチレベルの呼び出しはそのメッセージ(または複数の呼び出しが行われた場合はメッセージ)には適用されません。
メッセージの配信に失敗した場合、デフォルトの動作は、配信を失敗としてマークする前に3回再試行することです。コンシューマーを設定する際にmax_retries(デフォルトは3)を設定できますが、ほとんどの場合、デフォルトのままにすることをお勧めします。
設定された最大再試行回数に達したメッセージはキューから削除されるか、デッドレターキュー(DLQ)が設定されている場合は、代わりにDLQに書き込まれます。
バッチ内の単一のメッセージが配信に失敗した場合、そのバッチ全体が再試行されます。ただし、そのバッチ内のメッセージを明示的に確認している場合を除きます。たとえば、10メッセージのバッチが配信され、8番目のメッセージが配信に失敗した場合、すべての10メッセージが再試行され、したがって完全にコンシューマーに再配信されます。
キューにメッセージを公開する際や、メッセージまたはバッチを再試行のためにマークする際に、メッセージの処理を一定期間遅延させることを選択できます。
メッセージを遅延させることで、タスクを後回しにしたり、キューから消費する際のバックプレッシャーに対応したりできます。たとえば、呼び出している上流APIがHTTP 429: Too Many Requestsを返す場合、メッセージを遅延させて、再処理される前に消費する速度を遅くすることができます。
メッセージは最大12時間遅延させることができます。
キューにメッセージまたはメッセージのバッチを送信する際に、delaySecondsパラメータを提供することで遅延させることができます。
// 単一のメッセージを600秒(10分)遅延させるawait env.YOUR_QUEUE.send(message, { delaySeconds: 600 });
// メッセージのバッチを300秒(5分)遅延させるawait env.YOUR_QUEUE.sendBatch(messages, { delaySeconds: 300 });
// このメッセージは遅延させない。// キューにグローバルな遅延が設定されている場合は無視されます。await env.YOUR_QUEUE.sendBatch(messages, { delaySeconds: 0 });キューを作成する際に--delivery-delay-secsを渡すことで、キューごとにデフォルトのグローバル遅延を設定することもできます。
# デフォルトで全メッセージを5分遅延させるnpx wrangler queues create $QUEUE-NAME --delivery-delay-secs=300キューからメッセージを消費する際に、明示的にメッセージを再試行するようにマークすることを選択できます。メッセージは個別に、またはバッチ全体として再試行および遅延させることができます。
バッチ内の個々のメッセージを遅延させるには:
export default { async queue(batch: MessageBatch, env: Env, ctx: ExecutionContext) { for (const msg of batch.messages) { // 単一のメッセージを再試行としてマークし、3600秒(1時間)遅延させる msg.retry({ delaySeconds: 3600 }); } },};メッセージのバッチを遅延させるには:
export default { async queue(batch: MessageBatch, env: Env, ctx: ExecutionContext) { // メッセージのバッチを再試行としてマークし、600秒(10分)遅延させる batch.retryAll({ delaySeconds: 600 }); },};再試行されるメッセージに対して、暗黙の失敗またはretry()を明示的に呼び出すことによって、デフォルトの再試行遅延を設定することもできます。これはコンシューマーレベルで設定され、プッシュベース(ワーカー)およびプルベース(HTTP)コンシューマーの両方でサポートされています。
遅延はwrangler CLIを介して設定できます:
# プッシュベースのコンシューマー# 再試行されるメッセージをデフォルトで60秒(1分)遅延させる。npx wrangler@latest queues consumer worker add $QUEUE-NAME $WORKER_SCRIPT_NAME --retry-delay-secs=60
# プルベースのコンシューマー# 再試行されるメッセージをデフォルトで60秒(1分)遅延させる。npx wrangler@latest queues consumer http add $QUEUE-NAME --retry-delay-secs=60遅延は、プロデューサー(送信時)および/またはコンシューマーごとにretry_delay設定を使用してwrangler.tomlで設定することもできます。
[[queues.producers]] binding = "<BINDING_NAME>" queue = "<QUEUE-NAME>" delivery_delay = 60 # すべてのメッセージ配信を1分遅延させる
[[queues.consumers]] queue = "my-queue" retry_delay = 300 # 再試行されるメッセージを5分遅延させてから再配信を試みるキューまたはキューコンシューマーに関連する設定を変更するためにwrangler CLIとwrangler.tomlの両方を使用する場合、最も最近の構成変更が適用されます。
メッセージの遅延と再試行遅延をプログラムで設定する方法については、Queues REST APIドキュメントを参照してください。
メッセージはキューのレベルでデフォルトで遅延させることができますが、メッセージ(またはバッチ)ごとに遅延させることもできます。
- メッセージ/バッチの遅延設定は、キューのレベルの設定よりも優先されます。
- 送信または再試行時にメッセージに
delaySeconds: 0を設定すると、キューのレベルの遅延は無視され、メッセージは次のバッチで配信されます。 delaySeconds: <任意の正の整数>で送信または再試行されたメッセージは、短いデフォルトの遅延を持つキューに対しても、メッセージレベルの設定を尊重します。
メッセージの配信試行回数に基づいて、メッセージの遅延を増加させるバックオフアルゴリズムを適用できます。
コンシューマーに配信される各メッセージには、配信試行回数を追跡するattemptsプロパティが含まれています。
たとえば、メッセージの指数バックオフ ↗を生成するには、これを計算するヘルパー関数を作成できます。
const calculateExponentialBackoff = ( attempts: number, baseDelaySeconds: number,) => { return baseDelaySeconds ** attempts;};コンシューマー内で、msg.attemptsの値と希望する遅延係数を引数としてretry()を呼び出す際にdelaySecondsに渡します:
const BASE_DELAY_SECONDS = 30;
export default { async queue(batch: MessageBatch, env: Env, ctx: ExecutionContext) { for (const msg of batch.messages) { // 単一のメッセージを再試行としてマークし、3600秒(1時間)遅延させる msg.retry({ delaySeconds: calculateExponentialBackoff( msg.attempts, BASE_DELAY_SECONDS, ), }); } },};- キューのJavaScript APIドキュメントを確認してください。
- キューの動作について詳しく学んでください。
- キューのメトリクスを理解し、バックログや遅延メッセージのカウントを確認してください。