WorkersからR2マルチパートAPIを使用する
このガイドに従うことで、アプリケーションがマルチパートアップロードを実行できるWorkerを作成します。 この例のWorkerは、Workerに認証を追加したり、各パートをアップロードする際に追加のバリデーションロジックを追加したりするための基盤として使用できます。 このガイドには、このWorkerにファイルをアップロードする例のPythonアプリケーションも含まれています。
このガイドでは、WorkerのR2バインディングが設定されていることを前提としています。R2バインディングの設定手順については、WorkersからR2を使用するを参照してください。
以下の例のWorkerは、アプリケーションがWorkerを介してマルチパートAPIを使用できるHTTP APIを公開します。
この例では、各リクエストはHTTPメソッドとアクションリクエストパラメータに基づいてルーティングされます。Workerが複雑になるにつれて、Hono ↗のようなサーバーレスWebフレームワークを利用してルーティングを処理することを検討してください。
以下の例のWorkerは、各リクエストに対する応答にマルチパートアップロードの状態に関する新しい情報を含めています。マルチパートアップロードを作成するリクエストでは、uploadIdが返されます。パートをアップロードするリクエストでは、パート番号とetagが返されます。クライアントはこの状態を追跡し、次のリクエストにuploadIdを含め、マルチパートアップロードを完了する際には各パートのetagとパート番号を含めます。
以下のコードをプロジェクトのindex.jsファイルに追加し、MY_BUCKETをバケットの名前に置き換えてください:
interface Env { MY_BUCKET: R2Bucket;}
export default { async fetch( request, env, ctx ): Promise<Response> { const bucket = env.MY_BUCKET;
const url = new URL(request.url); const key = url.pathname.slice(1); const action = url.searchParams.get("action");
if (action === null) { return new Response("アクションタイプが欠落しています", { status: 400 }); }
// HTTPメソッドとアクションタイプに基づいてリクエストをルーティング switch (request.method) { case "POST": switch (action) { case "mpu-create": { const multipartUpload = await bucket.createMultipartUpload(key); return new Response( JSON.stringify({ key: multipartUpload.key, uploadId: multipartUpload.uploadId, }) ); } case "mpu-complete": { const uploadId = url.searchParams.get("uploadId"); if (uploadId === null) { return new Response("uploadIdが欠落しています", { status: 400 }); }
const multipartUpload = env.MY_BUCKET.resumeMultipartUpload( key, uploadId );
interface completeBody { parts: R2UploadedPart[]; } const completeBody: completeBody = await request.json(); if (completeBody === null) { return new Response("ボディが欠落しているか不完全です", { status: 400, }); }
// マルチパートアップロードがもはや存在しない場合のエラーハンドリング try { const object = await multipartUpload.complete(completeBody.parts); return new Response(null, { headers: { etag: object.httpEtag, }, }); } catch (error: any) { return new Response(error.message, { status: 400 }); } } default: return new Response(`POSTのための不明なアクション ${action}`, { status: 400, }); } case "PUT": switch (action) { case "mpu-uploadpart": { const uploadId = url.searchParams.get("uploadId"); const partNumberString = url.searchParams.get("partNumber"); if (partNumberString === null || uploadId === null) { return new Response("partNumberまたはuploadIdが欠落しています", { status: 400, }); } if (request.body === null) { return new Response("リクエストボディが欠落しています", { status: 400 }); }
const partNumber = parseInt(partNumberString); const multipartUpload = env.MY_BUCKET.resumeMultipartUpload( key, uploadId ); try { const uploadedPart: R2UploadedPart = await multipartUpload.uploadPart(partNumber, request.body); return new Response(JSON.stringify(uploadedPart)); } catch (error: any) { return new Response(error.message, { status: 400 }); } } default: return new Response(`PUTのための不明なアクション ${action}`, { status: 400, }); } case "GET": if (action !== "get") { return new Response(`GETのための不明なアクション ${action}`, { status: 400, }); } const object = await env.MY_BUCKET.get(key); if (object === null) { return new Response("オブジェクトが見つかりません", { status: 404 }); } const headers = new Headers(); object.writeHttpMetadata(headers); headers.set("etag", object.httpEtag); return new Response(object.body, { headers }); case "DELETE": switch (action) { case "mpu-abort": { const uploadId = url.searchParams.get("uploadId"); if (uploadId === null) { return new Response("uploadIdが欠落しています", { status: 400 }); } const multipartUpload = env.MY_BUCKET.resumeMultipartUpload( key, uploadId );
try { multipartUpload.abort(); } catch (error: any) { return new Response(error.message, { status: 400 }); } return new Response(null, { status: 204 }); } case "delete": { await env.MY_BUCKET.delete(key); return new Response(null, { status: 204 }); } default: return new Response(`DELETEのための不明なアクション ${action}`, { status: 400, }); } default: return new Response("メソッドが許可されていません", { status: 405, headers: { Allow: "PUT, POST, GET, DELETE" }, }); } },} satisfies ExportedHandler<Env>;上記のコードでWorkerを更新した後、npx wrangler deployを実行します。
これで、このWorkerを使用してマルチパートアップロードを実行できるようになります。既存のアプリケーションからこのWorkerにリクエストを送信してアップロードを実行するか、スクリプトを使用してこのWorkerを介してファイルをアップロードできます。
次のセクションはオプションであり、選択したファイルをあなたのマシンからWorkerにアップロードするPythonスクリプトの例を示します。
この例のアプリケーションは、ローカルファイルをWorkerに複数のパートでアップロードします。Pythonの組み込みThreadPoolExecutorを使用して、パートのアップロードをWorkerに並行して行い、アップロード速度を向上させます。WorkerへのHTTPリクエストは、requests ↗ライブラリを使用して行われます。
このようにマルチパートAPIを利用することで、Workerを使用してWorkersリクエストボディサイズ制限を超えるファイルをアップロードできます。個々のパートのアップロードはこの制限の対象となります。
以下のコードをローカルマシンのmpuscript.pyという名前のファイルに保存します。worker_endpoint変数をWorkerがデプロイされている場所に変更します。このスクリプトを実行する際にアップロードしたいファイルを引数として渡します:python3 mpuscript.py myfile。これにより、あなたのマシンからバケットにmyfileがアップロードされます。
import mathimport osimport requestsfrom requests.adapters import HTTPAdapter, Retryimport sysimport concurrent.futures
# アップロードするファイルを引数として取得filename = sys.argv[1]# Workerのエンドポイント、これをWorkerがデプロイされている場所に変更worker_endpoint = "https://myworker.myzone.workers.dev/"# パートサイズを10MBに設定。5MBが最小パートサイズで、最後のパートを除くpartsize = 10 * 1024 * 1024
def upload_file(worker_endpoint, filename, partsize): url = f"{worker_endpoint}{filename}"
# マルチパートアップロードを作成 uploadId = requests.post(url, params={"action": "mpu-create"}).json()["uploadId"]
part_count = math.ceil(os.stat(filename).st_size / partsize) # 最大25の同時アップロードのためのエグゼキュータを作成 executor = concurrent.futures.ThreadPoolExecutor(25) # 各パートをアップロードするためのタスクをエグゼキュータに提出 futures = [ executor.submit(upload_part, filename, partsize, url, uploadId, index) for index in range(part_count) ] concurrent.futures.wait(futures) # futuresからパートを取得 uploaded_parts = [future.result() for future in futures]
# マルチパートアップロードを完了 response = requests.post( url, params={"action": "mpu-complete", "uploadId": uploadId}, json={"parts": uploaded_parts}, ) if response.status_code == 200: print("🎉 マルチパートアップロードが成功裏に完了しました") else: print(response.text)
def upload_part(filename, partsize, url, uploadId, index): # ファイルをrbモードで開き、utf-8を解析しようとせずに生のバイトとして扱う with open(filename, "rb") as file: file.seek(partsize * index) part = file.read(partsize)
# パートのアップロードが失敗した場合のリトライポリシー s = requests.Session() retries = Retry(total=3, status_forcelist=[400, 500, 502, 503, 504]) s.mount("https://", HTTPAdapter(max_retries=retries))
return s.put( url, params={ "action": "mpu-uploadpart", "uploadId": uploadId, "partNumber": str(index + 1), }, data=part, ).json()
upload_file(worker_endpoint, filename, partsize)マルチパートアップロードの状態を管理することは、Workersの使用モデルにうまくマッピングされません。Workersは本質的にステートレスだからです。通常のマルチパートアップロードでは、マルチパートアップロードは通常、クライアントアプリケーションの1回の連続実行で行われます。これは、Worker内でのマルチパートアップロードとは異なり、Workerの複数の呼び出しを通じて完了することがよくあります。これにより、状態管理がより困難になります。
これを克服するために、マルチパートアップロードに関連する状態、すなわちuploadIdとどのパートがアップロードされたかを、Workerの外部のどこかで追跡する必要があります。
このガイドで説明した例のWorkerとPythonアプリケーションでは、マルチパートアップロードの状態は、Workerにリクエストを送信するクライアントアプリケーションで追跡され、必要な状態が各リクエストに含まれています。クライアントアプリケーションでマルチパートの状態を追跡することで、最大限の柔軟性が得られ、各パートの並行かつ順不同のアップロードが可能になります。
クライアントでこの状態を追跡することが不可能な場合は、代替設計を検討できます。たとえば、uploadIdとどのパートがアップロードされたかをDurable Objectや他のデータベースで追跡することができます。