コンテンツにスキップ

WorkersからR2マルチパートAPIを使用する

このガイドに従うことで、アプリケーションがマルチパートアップロードを実行できるWorkerを作成します。 この例のWorkerは、Workerに認証を追加したり、各パートをアップロードする際に追加のバリデーションロジックを追加したりするための基盤として使用できます。 このガイドには、このWorkerにファイルをアップロードする例のPythonアプリケーションも含まれています。

このガイドでは、WorkerのR2バインディングが設定されていることを前提としています。R2バインディングの設定手順については、WorkersからR2を使用するを参照してください。

マルチパートAPIを使用した例のWorker

以下の例のWorkerは、アプリケーションがWorkerを介してマルチパートAPIを使用できるHTTP APIを公開します。

この例では、各リクエストはHTTPメソッドとアクションリクエストパラメータに基づいてルーティングされます。Workerが複雑になるにつれて、HonoのようなサーバーレスWebフレームワークを利用してルーティングを処理することを検討してください。

以下の例のWorkerは、各リクエストに対する応答にマルチパートアップロードの状態に関する新しい情報を含めています。マルチパートアップロードを作成するリクエストでは、uploadIdが返されます。パートをアップロードするリクエストでは、パート番号とetagが返されます。クライアントはこの状態を追跡し、次のリクエストにuploadIdを含め、マルチパートアップロードを完了する際には各パートのetagとパート番号を含めます。

以下のコードをプロジェクトのindex.jsファイルに追加し、MY_BUCKETをバケットの名前に置き換えてください:

interface Env {
MY_BUCKET: R2Bucket;
}
export default {
async fetch(
request,
env,
ctx
): Promise<Response> {
const bucket = env.MY_BUCKET;
const url = new URL(request.url);
const key = url.pathname.slice(1);
const action = url.searchParams.get("action");
if (action === null) {
return new Response("アクションタイプが欠落しています", { status: 400 });
}
// HTTPメソッドとアクションタイプに基づいてリクエストをルーティング
switch (request.method) {
case "POST":
switch (action) {
case "mpu-create": {
const multipartUpload = await bucket.createMultipartUpload(key);
return new Response(
JSON.stringify({
key: multipartUpload.key,
uploadId: multipartUpload.uploadId,
})
);
}
case "mpu-complete": {
const uploadId = url.searchParams.get("uploadId");
if (uploadId === null) {
return new Response("uploadIdが欠落しています", { status: 400 });
}
const multipartUpload = env.MY_BUCKET.resumeMultipartUpload(
key,
uploadId
);
interface completeBody {
parts: R2UploadedPart[];
}
const completeBody: completeBody = await request.json();
if (completeBody === null) {
return new Response("ボディが欠落しているか不完全です", {
status: 400,
});
}
// マルチパートアップロードがもはや存在しない場合のエラーハンドリング
try {
const object = await multipartUpload.complete(completeBody.parts);
return new Response(null, {
headers: {
etag: object.httpEtag,
},
});
} catch (error: any) {
return new Response(error.message, { status: 400 });
}
}
default:
return new Response(`POSTのための不明なアクション ${action}`, {
status: 400,
});
}
case "PUT":
switch (action) {
case "mpu-uploadpart": {
const uploadId = url.searchParams.get("uploadId");
const partNumberString = url.searchParams.get("partNumber");
if (partNumberString === null || uploadId === null) {
return new Response("partNumberまたはuploadIdが欠落しています", {
status: 400,
});
}
if (request.body === null) {
return new Response("リクエストボディが欠落しています", { status: 400 });
}
const partNumber = parseInt(partNumberString);
const multipartUpload = env.MY_BUCKET.resumeMultipartUpload(
key,
uploadId
);
try {
const uploadedPart: R2UploadedPart =
await multipartUpload.uploadPart(partNumber, request.body);
return new Response(JSON.stringify(uploadedPart));
} catch (error: any) {
return new Response(error.message, { status: 400 });
}
}
default:
return new Response(`PUTのための不明なアクション ${action}`, {
status: 400,
});
}
case "GET":
if (action !== "get") {
return new Response(`GETのための不明なアクション ${action}`, {
status: 400,
});
}
const object = await env.MY_BUCKET.get(key);
if (object === null) {
return new Response("オブジェクトが見つかりません", { status: 404 });
}
const headers = new Headers();
object.writeHttpMetadata(headers);
headers.set("etag", object.httpEtag);
return new Response(object.body, { headers });
case "DELETE":
switch (action) {
case "mpu-abort": {
const uploadId = url.searchParams.get("uploadId");
if (uploadId === null) {
return new Response("uploadIdが欠落しています", { status: 400 });
}
const multipartUpload = env.MY_BUCKET.resumeMultipartUpload(
key,
uploadId
);
try {
multipartUpload.abort();
} catch (error: any) {
return new Response(error.message, { status: 400 });
}
return new Response(null, { status: 204 });
}
case "delete": {
await env.MY_BUCKET.delete(key);
return new Response(null, { status: 204 });
}
default:
return new Response(`DELETEのための不明なアクション ${action}`, {
status: 400,
});
}
default:
return new Response("メソッドが許可されていません", {
status: 405,
headers: { Allow: "PUT, POST, GET, DELETE" },
});
}
},
} satisfies ExportedHandler<Env>;

上記のコードでWorkerを更新した後、npx wrangler deployを実行します。

これで、このWorkerを使用してマルチパートアップロードを実行できるようになります。既存のアプリケーションからこのWorkerにリクエストを送信してアップロードを実行するか、スクリプトを使用してこのWorkerを介してファイルをアップロードできます。

次のセクションはオプションであり、選択したファイルをあなたのマシンからWorkerにアップロードするPythonスクリプトの例を示します。

Workerを使用してマルチパートアップロードを実行する(オプション)

この例のアプリケーションは、ローカルファイルをWorkerに複数のパートでアップロードします。Pythonの組み込みThreadPoolExecutorを使用して、パートのアップロードをWorkerに並行して行い、アップロード速度を向上させます。WorkerへのHTTPリクエストは、requestsライブラリを使用して行われます。

このようにマルチパートAPIを利用することで、Workerを使用してWorkersリクエストボディサイズ制限を超えるファイルをアップロードできます。個々のパートのアップロードはこの制限の対象となります。

以下のコードをローカルマシンのmpuscript.pyという名前のファイルに保存します。worker_endpoint変数をWorkerがデプロイされている場所に変更します。このスクリプトを実行する際にアップロードしたいファイルを引数として渡します:python3 mpuscript.py myfile。これにより、あなたのマシンからバケットにmyfileがアップロードされます。

import math
import os
import requests
from requests.adapters import HTTPAdapter, Retry
import sys
import concurrent.futures
# アップロードするファイルを引数として取得
filename = sys.argv[1]
# Workerのエンドポイント、これをWorkerがデプロイされている場所に変更
worker_endpoint = "https://myworker.myzone.workers.dev/"
# パートサイズを10MBに設定。5MBが最小パートサイズで、最後のパートを除く
partsize = 10 * 1024 * 1024
def upload_file(worker_endpoint, filename, partsize):
url = f"{worker_endpoint}{filename}"
# マルチパートアップロードを作成
uploadId = requests.post(url, params={"action": "mpu-create"}).json()["uploadId"]
part_count = math.ceil(os.stat(filename).st_size / partsize)
# 最大25の同時アップロードのためのエグゼキュータを作成
executor = concurrent.futures.ThreadPoolExecutor(25)
# 各パートをアップロードするためのタスクをエグゼキュータに提出
futures = [
executor.submit(upload_part, filename, partsize, url, uploadId, index)
for index in range(part_count)
]
concurrent.futures.wait(futures)
# futuresからパートを取得
uploaded_parts = [future.result() for future in futures]
# マルチパートアップロードを完了
response = requests.post(
url,
params={"action": "mpu-complete", "uploadId": uploadId},
json={"parts": uploaded_parts},
)
if response.status_code == 200:
print("🎉 マルチパートアップロードが成功裏に完了しました")
else:
print(response.text)
def upload_part(filename, partsize, url, uploadId, index):
# ファイルをrbモードで開き、utf-8を解析しようとせずに生のバイトとして扱う
with open(filename, "rb") as file:
file.seek(partsize * index)
part = file.read(partsize)
# パートのアップロードが失敗した場合のリトライポリシー
s = requests.Session()
retries = Retry(total=3, status_forcelist=[400, 500, 502, 503, 504])
s.mount("https://", HTTPAdapter(max_retries=retries))
return s.put(
url,
params={
"action": "mpu-uploadpart",
"uploadId": uploadId,
"partNumber": str(index + 1),
},
data=part,
).json()
upload_file(worker_endpoint, filename, partsize)

状態管理

マルチパートアップロードの状態を管理することは、Workersの使用モデルにうまくマッピングされません。Workersは本質的にステートレスだからです。通常のマルチパートアップロードでは、マルチパートアップロードは通常、クライアントアプリケーションの1回の連続実行で行われます。これは、Worker内でのマルチパートアップロードとは異なり、Workerの複数の呼び出しを通じて完了することがよくあります。これにより、状態管理がより困難になります。

これを克服するために、マルチパートアップロードに関連する状態、すなわちuploadIdとどのパートがアップロードされたかを、Workerの外部のどこかで追跡する必要があります。

このガイドで説明した例のWorkerとPythonアプリケーションでは、マルチパートアップロードの状態は、Workerにリクエストを送信するクライアントアプリケーションで追跡され、必要な状態が各リクエストに含まれています。クライアントアプリケーションでマルチパートの状態を追跡することで、最大限の柔軟性が得られ、各パートの並行かつ順不同のアップロードが可能になります。

クライアントでこの状態を追跡することが不可能な場合は、代替設計を検討できます。たとえば、uploadIdとどのパートがアップロードされたかをDurable Objectや他のデータベースで追跡することができます。