コンテンツにスキップ

始める

Cloudflare Queuesは、非同期処理のためにメッセージをキューに追加できる柔軟なメッセージキューです。このガイドに従うことで、最初のキューを作成し、そのキューにメッセージを公開するためのWorkerと、そのキューからメッセージを消費するための消費者Workerを作成します。

前提条件

Queuesを使用するには、以下が必要です:

  1. Sign up for a Cloudflare account.
  2. Install npm.
  3. Install Node.js.

Node.js version manager

Use a Node version manager like Volta or nvm to avoid permission issues and change Node.js versions. Wrangler, discussed later in this guide, requires a Node version of 16.17.0 or later.

1. Queuesを有効にする

Queues is currently in Public Beta and is included in the monthly subscription cost of your Workers Paid plan, and charges based on operations against your queues. Refer to Pricing for more details.

Before you can use Queues, you must enable it via the Cloudflare dashboard. You need a Workers Paid plan to enable Queues.

To enable Queues:

  1. Log in to the Cloudflare dashboard.
  2. Go to Workers & Pages > Queues.
  3. Select Enable Queues Beta.

2. Workerプロジェクトを作成する

あなたは、プロデューサーWorkerからキューにアクセスします。メッセージをキューに公開するためには、少なくとも1つのプロデューサーWorkerを作成する必要があります。

プロデューサーWorkerを作成するには、次のコマンドを実行します:

Terminal window
npm create cloudflare@latest -- producer-worker

For setup, select the following options:

  • For What would you like to start with?, choose Hello Worldの例.
  • For Which template would you like to use?, choose Hello World Worker.
  • For Which language do you want to use?, choose TypeScript.
  • For Do you want to use git for version control?, choose Yes.
  • For Do you want to deploy your application?, choose No (we will be making some changes before deploying).

これにより、新しいディレクトリが作成され、src/index.ts Workerスクリプトとwrangler.toml設定ファイルが含まれます。Workerを作成した後、アクセスするためのQueueを作成します。

新しく作成されたディレクトリに移動します:

Terminal window
cd producer-worker

3. キューを作成する

キューを使用するには、メッセージを公開し、消費するために少なくとも1つのキューを作成する必要があります。

キューを作成するには、次のコマンドを実行します:

Terminal window
npx wrangler queues create <MY-QUEUE-NAME>

このキューで使用するメッセージの種類に関連する、説明的な名前を選択してください。説明的なキュー名は、debug-logsuser-clickstream-data、またはpassword-reset-prodのようになります。

キュー名は1から63文字の長さでなければなりません。キュー名にはダッシュ(-)以外の特殊文字を含めることはできず、文字または数字で始まり、終わる必要があります。

キュー名を設定した後に変更することはできません。キューを作成した後、プロデューサーWorkerを設定してアクセスします。

4. プロデューサーWorkerを設定する

Worker内のコードにキューを公開するには、バインディングを作成してキューをWorkerに接続する必要があります。Bindingsを使用すると、WorkerがCloudflare開発プラットフォーム上のリソース(例えばQueues)にアクセスできます。

バインディングを作成するには、新しく生成されたwrangler.toml設定ファイルを開き、次の内容を追加します:

[[queues.producers]]
queue = "MY-QUEUE-NAME"
binding = "MY_QUEUE"

MY-QUEUE-NAMEステップ3で作成したキューの名前に置き換えます。次に、MY_QUEUEをバインディングのために希望する名前に置き換えます。バインディングは有効なJavaScript変数名でなければなりません。これは、Worker内でこのキューを参照するために使用する変数です。

プロデューサーWorkerを書く

これで、キューに公開するメッセージを作成するためにプロデューサーWorkerを設定します。プロデューサーWorkerは以下のことを行います:

  1. ブラウザから受け取ったリクエストを取得します。
  2. リクエストをJSON形式に変換します。
  3. リクエストを直接キューに書き込みます。

Workerプロジェクトディレクトリ内で、srcフォルダを開き、index.tsファイルに次の内容を追加します:

export default {
async fetch(request, env, ctx): Promise<Response> {
let log = {
url: request.url,
method: request.method,
headers: Object.fromEntries(request.headers),
};
await env.<MY_QUEUE>.send(log);
return new Response('成功!');
},
} satisfies ExportedHandler<Env>;

MY_QUEUEwrangler.tomlから設定したバインディングの名前に置き換えます。

また、index.tsEnvインターフェースにキューを追加します。

export interface Env {
<MY_QUEUE>: Queue<any>;
}

この書き込みが失敗した場合、Workerはエラーを返します(例外を発生させます)。この書き込みが成功した場合、ブラウザにHTTP 200ステータスコードで成功を返します。

本番アプリケーションでは、例外をキャッチして直接処理するためにtry...catch文を使用することが一般的です(例えば、カスタムエラーを返すか、再試行するなど)。

プロデューサーWorkerを公開する

wrangler.tomlファイルとindex.tsファイルが設定されたので、プロデューサーWorkerを公開する準備が整いました。プロデューサーWorkerを公開するには、次のコマンドを実行します:

Terminal window
npx wrangler deploy

以下のような出力が表示され、デフォルトで*.workers.devのURLが表示されるはずです。

Uploaded <YOUR-WORKER-NAME> (0.76 sec)
Published <YOUR-WORKER-NAME> (0.29 sec)
https://<YOUR-WORKER-NAME>.<YOUR-ACCOUNT>.workers.dev

*.workers.devサブドメインをコピーして、新しいブラウザタブに貼り付けます。ページを数回更新して、キューにリクエストを公開し始めます。ブラウザは、リクエストをキューに書き込むたびに成功の応答を返すはずです。

キューとメッセージをキューに公開するためのプロデューサーWorkerを構築しました。次に、キューに公開されたメッセージを消費するための消費者Workerを作成します。消費者Workerがないと、メッセージはデフォルトで4日間キューに留まります。

5. 消費者Workerを作成する

消費者Workerは、キューからメッセージを受信します。消費者Workerがキューのメッセージを受信すると、それをログコンソールやストレージオブジェクトなどの別のソースに書き込むことができます。

このガイドでは、消費者Workerを作成し、wrangler tailを使用してメッセージをログに記録し、検査します。消費者Workerは、プロデューサーWorkerを作成したのと同じWorkerプロジェクト内で作成します。

消費者Workerを作成するには、index.tsファイルを開き、既存のfetchハンドラーに次のqueueハンドラーを追加します:

export default {
async fetch(request, env, ctx): Promise<Response> {
let log = {
url: request.url,
method: request.method,
headers: Object.fromEntries(request.headers),
};
await env.<MY_QUEUE>.send(log);
return new Response('成功!');
},
async queue(batch, env): Promise<void> {
let messages = JSON.stringify(batch.messages);
console.log(`キューから消費しました: ${messages}`);
},
} satisfies ExportedHandler<Env>;

MY_QUEUEwrangler.tomlから設定したバインディングの名前に置き換えます。

メッセージがキューに公開されるたびに、消費者Workerのqueueハンドラー(async queue)が呼び出され、1つ以上のメッセージが渡されます。

この例では、消費者WorkerはキューのJSON形式のメッセージを文字列に変換し、その出力をログに記録します。実際のアプリケーションでは、消費者Workerはメッセージをオブジェクトストレージ(R2など)に書き込んだり、データベース(D1など)に書き込んだり、外部API(メールAPIなど)やレガシークラウドプロバイダーのデータウェアハウスに呼び出す前にメッセージをさらに処理したりするように構成できます。

消費者ハンドラー内で非同期タスクを実行する場合は、waitUntil()を使用して関数の応答が処理されるようにします。このメソッドのスコープ内では他の非同期メソッドはサポートされていません。

消費者Workerをキューに接続する

消費者Workerを設定した後、キューに接続する準備が整いました。

各キューには、1つの消費者Workerしか接続できません。同じキューに複数の消費者を接続しようとすると、そのWorkerを公開しようとしたときにエラーが発生します。

キューを消費者Workerに接続するには、wrangler.tomlファイルを開き、次の内容を追加します:

[[queues.consumers]]
queue = "<MY-QUEUE-NAME>"
# 必須: これはステップ3で作成したキューの名前と一致する必要があります。
# 名前を間違えると、Workerを公開しようとしたときにエラーが発生します。
max_batch_size = 10 # オプション: デフォルトは10
max_batch_timeout = 5 # オプション: デフォルトは5秒

MY-QUEUE-NAMEステップ3で作成したキューに置き換えます。

消費者Workerでは、max_batch_sizeオプションとmax_batch_timeoutオプションを使用してメッセージを自動的にバッチ処理しています。消費者Workerは、10のバッチまたは5秒ごとにメッセージを受信します。

max_batch_size(デフォルトは10)は、消費者Workerが呼び出される回数を減らすのに役立ちます。メッセージごとに呼び出されるのではなく、10メッセージがキューに入った後にのみ呼び出されます。

max_batch_timeout(デフォルトは5秒)は、待機時間を減らすのに役立ちます。プロデューサーWorkerが消費者Workerを呼び出すためにキューに10メッセージを送信していない場合、消費者Workerは待機中のメッセージを受信するために5秒ごとに呼び出されます。

消費者Workerを公開する

wrangler.tomlファイルとindex.tsファイルが設定されたので、次のコマンドを実行して消費者Workerを公開します:

Terminal window
npx wrangler deploy

6. キューからメッセージを読み取る

消費者Workerを設定した後、キューからメッセージを読み取ることができます。

wrangler tailを実行して、消費者が受信したメッセージをログに記録するのを待ちます:

Terminal window
npx wrangler tail

wrangler tailが実行中の状態で、ステップ4で開いたWorkerのURLを開きます。

ブラウザウィンドウに成功メッセージが表示されるはずです。

成功メッセージが表示された場合、URLを数回更新してメッセージを生成し、キューにプッシュします。

wrangler tailが実行中の状態で、消費者Workerは更新によって生成されたリクエストをログに記録し始めます。

10回未満の更新を行った場合、バッチタイムアウトが10秒に設定されているため、メッセージが表示されるまでに数秒かかることがあります。10秒後には、メッセージがターミナルに到着するはずです。

更新時にエラーが発生した場合は、ステップ3で作成したキュー名とwrangler.tomlファイルで参照したキューが同じであることを確認してください。プロデューサーWorkerが成功を返し、エラーを返していないことを確認する必要があります。

このガイドを完了することで、キュー、キューにメッセージを公開するプロデューサーWorker、およびそのメッセージを消費する消費者Workerを作成しました。

関連リソース

  • Cloudflare Workersと、Cloudflare上で構築できるアプリケーションについて詳しく学びましょう。