コンテンツにスキップ

Alarms APIの使用

Durable Objects Alarms APIを使用して、Durable Objectへのリクエストをバッチ処理します。

この例では、Durable Objectを10秒ごとに起こして、単一のDurable Objectへのリクエストをバッチ処理するalarm()ハンドラーを実装しています。alarm()ハンドラーは、キューに十分な作業があるまで処理を遅延させます。

// Worker
export default {
async fetch(request, env) {
let id = env.BATCHER.idFromName("foo");
return await env.BATCHER.get(id).fetch(request);
},
};
const SECONDS = 10;
// Durable Object
export class Batcher {
constructor(state, env) {
this.state = state;
this.storage = state.storage;
this.state.blockConcurrencyWhile(async () => {
let vals = await this.storage.list({ reverse: true, limit: 1 });
this.count = vals.size == 0 ? 0 : parseInt(vals.keys().next().value);
});
}
async fetch(request) {
this.count++;
// 現在アラームが設定されていない場合、10秒後にアラームを設定します
// 次の10秒間の追加のPOSTは、このバッチの一部になります。
let currentAlarm = await this.storage.getAlarm();
if (currentAlarm == null) {
this.storage.setAlarm(Date.now() + (1000 * SECONDS));
}
// リクエストをバッチに追加します。
await this.storage.put(this.count, await request.text());
return new Response(JSON.stringify({ queued: this.count }), {
headers: {
"content-type": "application/json;charset=UTF-8",
},
});
}
async alarm() {
let vals = await this.storage.list();
await fetch("http://example.com/some-upstream-service", {
method: "POST",
body: Array.from(vals.values()),
});
await this.storage.deleteAll();
this.count = 0;
}
}

alarm()ハンドラーは10秒ごとに呼び出されます。予期しないエラーでDurable Objectが終了した場合、alarm()ハンドラーは別のマシンで再インスタンス化されます。短い遅延の後、alarm()ハンドラーは別のマシンで最初から実行されます。