コンテンツにスキップ

プルコンシューマー

プルベースのコンシューマーは、Cloudflare Workersの外部の任意の環境やプログラミング言語からHTTPを介してキューからメッセージを取得することを可能にします。プルベースのコンシューマーは、メッセージの消費率が上流のインフラストラクチャや長時間実行されるタスクによって制限されている場合に便利です。

プッシュまたはプルコンシューマーの選択方法

プッシュベースのコンシューマーを設定するか、プルベースのコンシューマーを設定するかは、キューの使用方法や、キューコンシューマーの上流にあるインフラストラクチャの構成によって異なります。

  • プッシュベースのコンシューマーから始めるのが、キューからメッセージを消費する最も簡単な方法です。プッシュベースのコンシューマーはWorkers上で実行され、デフォルトでは自動的にスケールアップし、キューに書き込まれるメッセージを消費します。
  • Cloudflare Workersの外部にある既存のインフラストラクチャからメッセージを消費する必要がある場合や、メッセージの消費速度を慎重に制御する必要がある場合は、プルベースのコンシューマーを使用します。プルベースのコンシューマーは、準備が整ったときにのみキューからメッセージをプル(取得)し、確認するために明示的に呼び出す必要があります。

要件が変更された場合、いつでもキューに新しいコンシューマーを追加したり削除したりできるため、プルベースからプッシュベースのコンシューマーに変更することが可能です。

プルベースのコンシューマーを設定し、キューからメッセージを受信するには、次の手順が必要です。

  1. キューのHTTPプルを有効にします。
  2. HTTPクライアント用の有効な認証トークンを作成します。
  3. キューからメッセージバッチをプルします。
  4. バッチ内のメッセージを確認および/または再試行します。

1. HTTPプルの有効化

wrangler.tomlwrangler CLI、またはCloudflareダッシュボードを介して、HTTPプルを有効にするか、キューをプッシュベースからプルベースに変更できます。

wrangler.toml

HTTPコンシューマーは、コンシューマー設定でtype = "http_pull"を設定することでwrangler.tomlで構成できます:

[[queues.consumers]]
# 必須
queue = "QUEUE-NAME"
type = "http_pull"
# オプション
visibility_timeout_ms = 5000
max_retries = 5
dead_letter_queue = "SOME-OTHER-QUEUE"

typeプロパティを省略すると、キューはプッシュベースにデフォルト設定されます。

wrangler CLI

wrangler queues consumer httpサブコマンドを使用して、既存のキューにプルベースのコンシューマーを有効にし、キュー名を指定できます。

Terminal window
npx wrangler queues consumer http add $QUEUE-NAME

既存のプッシュベースのコンシューマーがある場合は、最初にそれを削除する必要があります。既存のコンシューマー設定があるキューに対してconsumer http addを呼び出そうとすると、wranglerはエラーを返します:

Terminal window
wrangler queues consumer worker remove $QUEUE-NAME $SCRIPT_NAME

2. コンシューマー認証

HTTPプルコンシューマーには、com.cloudflare.api.account.queues_readおよびcom.cloudflare.api.account.queues_writeの権限を持つAPIトークンが必要です。

プルベースのコンシューマーは、受信したメッセージを確認するためにキューの状態に書き込む必要があるため、読み取りと書き込みの両方が必要です。メッセージを消費すると、キューが変更されます。

APIトークンは、HTTPリクエストのAuthorizationヘッダーにAuthorization: Bearer $YOUR_TOKEN_HEREの形式で提示されます。以下の例は、curl HTTPクライアントを使用してAPIトークンを渡す方法を示しています:

Terminal window
curl "https://api.cloudflare.com/client/v4/accounts/${CF_ACCOUNT_ID}/queues/${QUEUE_ID}/messages/pull" \
--header "Authorization: Bearer ${QUEUES_TOKEN}" \
--header "Content-Type: application/json" \
--data '{ "visibility_timeout": 10000, "batch_size": 2 }'

単一のキューに対して複数の同時プルベースのコンシューマーを認証して実行できますが、すべてのコンシューマーはCloudflare APIに対して同じレート制限を共有します。

APIトークンの作成

APIトークンを作成するには:

  1. CloudflareダッシュボードのAPIトークンページに移動します。
  2. トークンを作成を選択します。
  3. ページの下部までスクロールし、カスタムトークンを作成を選択します。
  4. トークンに名前を付けます。例えば、queue-pull-token
  5. 権限セクションで、アカウントを選択し、次にキューを選択します。編集(読み取り+書き込み)が選択されていることを確認します。
  6. (オプション)すべてのアカウント(デフォルト)または特定のアカウントを選択してトークンのスコープを設定します。
  7. 要約に進むを選択し、次にトークンを作成を選択します。

トークンは一度だけ表示されるため、メモしておく必要があります。

3. メッセージをプルする

メッセージをプルするには、visibility_timeoutbatch_sizeをオプションで指定するJSONエンコードされたボディを持つHTTP POSTリクエストをQueues REST APIに送信します。空のJSONオブジェクト({})を指定することもできます:

// POST /accounts/${CF_ACCOUNT_ID}/queues/${QUEUE_ID}/messages/pull with the timeout & batch size
let resp = await fetch(
`https://api.cloudflare.com/client/v4/accounts/${CF_ACCOUNT_ID}/queues/${QUEUE_ID}/messages/pull`,
{
method: "POST",
headers: {
"content-type": "application/json",
authorization: `Bearer ${QUEUES_API_TOKEN}`,
},
// オプション - 空のオブジェクト'{}'を提供すると、デフォルトが適用されます。
body: JSON.stringify({ visibility_timeout: 6000, batch_size: 50 }),
},
);

これにより、以下の形式でメッセージの配列(指定されたbatch_sizeまで)が返されます:

{
"success": true,
"errors": [],
"messages": [],
"result": {
"messages": [
{
"body": "hello",
"id": "1ad27d24c83de78953da635dc2ea208f",
"timestamp_ms": 1689615013586,
"attempts": 2,
"metadata":{
"CF-sourceMessageSource":"dash",
"CF-Content-Type":"json"
},
"lease_id": "eyJhbGciOiJkaXIiLCJlbmMiOiJBMjU2Q0JDLUhTNTEyIn0..NXmbr8h6tnKLsxJ_AuexHQ.cDt8oBb_XTSoKUkVKRD_Jshz3PFXGIyu7H1psTO5UwI.smxSvQ8Ue3-ymfkV6cHp5Va7cyUFPIHuxFJA07i17sc"
},
{
"body": "world",
"id": "95494c37bb89ba8987af80b5966b71a7",
"timestamp_ms": 1689615013586,
"attempts": 2,
"metadata":{
"CF-sourceMessageSource":"dash",
"CF-Content-Type":"json"
},
"lease_id": "eyJhbGciOiJkaXIiLCJlbmMiOiJBMjU2Q0JDLUhTNTEyIn0..QXPgHfzETsxYQ1Vd-H0hNA.mFALS3lyouNtgJmGSkTzEo_imlur95EkSiH7fIRIn2U.PlwBk14CY_EWtzYB-_5CR1k30bGuPFPUx1Nk5WIipFU"
}
]
}
}

プルコンシューマーは「ショートポーリング」アプローチに従います:配信可能なメッセージがある場合、Queuesは即座にメッセージを返します(指定されたbatch_sizeまで)。配信するメッセージがない場合、Queuesは空の応答を返します。メッセージを配信するために、Queuesはオープン接続を保持しません(しばしば「ロングポーリング」と呼ばれます)。

各メッセージオブジェクトには5つのフィールドがあります:

  1. body - これは、メッセージが公開されたコンテンツタイプに基づいてbase64エンコードされている場合があります。
  2. id - メッセージの一意の読み取り専用の一時識別子。
  3. timestamp_ms - メッセージがキューに公開された時刻(Unixエポックからのミリ秒)。これにより、現在のタイムスタンプから引き算することでメッセージの古さを判断できます。
  4. attempts - メッセージが完全に配信されるために試みられた回数。この値がmax_retriesに達すると、メッセージは再配信されず、キューから永久に削除されます。
  5. lease_id - メッセージのエンコードされたリースID。lease_idは、メッセージを明示的に確認または再試行するために使用されます。

lease_idを使用すると、プルコンシューマーはバッチ内の一部、またはすべてのメッセージを明示的に確認したり、再試行のためにマークしたりできます。メッセージが確認されない場合や再試行のためにマークされない場合、visibility_timeoutが達成されると再配信のためにマークされます。このタイムアウトが達成されると、lease_idはもはや有効ではありません。

キューからプルする際に、batch_sizevisibility_timeoutの両方を設定できます:

  • batch_size(デフォルトは5、最大100) - 各プルでコンシューマーに返されるメッセージの数。
  • visibility_timeout(デフォルトは30秒、最大12時間) - バッチ内で配信されたメッセージを明示的に確認するためにコンシューマーが持つ時間を定義します。このタイムアウトが切れると、メッセージは未確認と見なされ、再配信のためにキューに戻されます。

同時コンシューマー

同じキューから複数のHTTPクライアントが同時にプルすることができます:各クライアントは一意のメッセージバッチを受け取り、visibility_timeoutが切れるまでそれらのメッセージに対する「リース」を保持します。または、メッセージが再試行のためにマークされるまで保持します。

再試行のためにマークされたメッセージはキューに戻され、任意のコンシューマーに配信される可能性があります。メッセージは特定のコンシューマーに結びついていないため、コンシューマーにはアイデンティティがなく、遅いまたはスタックしたコンシューマーがキュー内のメッセージの処理を妨げないようにします。

複数のコンシューマーは、複数の上流リソース(例えば、GPUインフラストラクチャ)がある場合や、キューのバックログに基づいて自動スケールしたい場合、またはコストの観点から便利です。

4. メッセージの確認

コンシューマーによってプルされたメッセージは、確認されるか再試行のためにマークされる必要があります。

メッセージを確認および/または再試行のためにマークするには、Queues REST APIに従って、確認および/または再試行するlease_idオブジェクトの配列を提供してキューの/ackエンドポイントにHTTP POSTリクエストを送信します:

// POST /accounts/${CF_ACCOUNT_ID}/queues/${QUEUE_ID}/messages/ack with the lease_ids
let resp = await fetch(
`https://api.cloudflare.com/client/v4/accounts/${CF_ACCOUNT_ID}/queues/${QUEUE_ID}/messages/ack`,
{
method: "POST",
headers: {
"content-type": "application/json",
authorization: `Bearer ${QUEUES_API_TOKEN}`,
},
// 再試行するメッセージがない場合は、空の配列を指定できます - retries: []
body: JSON.stringify({
acks: [
{ lease_id: "lease_id1" },
{ lease_id: "lease_id2" },
{ lease_id: "etc" },
],
retries: [{ lease_id: "lease_id4" }],
}),
},
);

再試行のためにマークする際にメッセージの遅延秒数を指定することもできます。これは、retries配列に{ lease_id: string, delay_seconds: number }オブジェクトを提供することで行います:

{
"acks": [
{ "lease_id": "lease_id1" },
{ "lease_id": "lease_id2" },
{ "lease_id": "lease_id3" }
],
"retries": [{ "lease_id": "lease_id4", "delay_seconds": 600 }]
}

さらに:

  • コンシューマーでメッセージを処理している場合は、/ackエンドポイントへのリクエストにすべてのlease_idを提供する必要があります。メッセージを確認しない場合、それは再配信のためにマークされます(キューに戻されます)。
  • メッセージを再試行のためにマークすることもできます。たとえば、メッセージの処理中にエラーが発生した場合や、上流リソースの圧力がある場合です。メッセージを再試行のために明示的にマークすると、(潜在的に長い)visibility_timeoutが達成されるのを待つことなく、すぐにキューに戻されます。
  • メッセージのバッチを処理する進捗に応じて、/ackエンドポイントに複数回呼び出すことができますが、APIレート制限に達しないように確認をグループ化することをお勧めします。

QueuesはリースIDに関して寛容であることを目指しています:コンシューマーが可視性タイムアウトが達成された後にリースIDでメッセージを確認しても、Queuesはその確認を受け入れます。メッセージがその間に別のコンシューマーに配信された場合、そのコンシューマーもエラーなしでメッセージを確認できます。

コンテンツタイプ

キューに外部コンシューマーがいる場合、特定のコンテンツタイプがJSONオブジェクト内で安全にシリアライズできるようにエンコードされる可能性があることに注意する必要があります。

jsonおよびbytesコンテンツタイプの場合、これはそれらがbase64エンコードされることを意味します(RFC 4648)。textタイプは、プレーンなUTF-8エンコードされた文字列として送信されます。

コンシューマーは、データを操作する前にjsonおよびbytesタイプをデコードする必要があります。

次のステップ