コンテンツにスキップ

FastAPIでアクセストークンを検証する

Last reviewed: over 1 year ago

このチュートリアルでは、FastAPIアプリに対して行われたリクエストにAccess JWTが含まれているかどうかを検証する方法を説明します。

所要時間: 15分

前提条件

1. 検証関数を作成する

  1. FastAPIプロジェクト内に、以下のコードを含むcloudflare.pyという新しいファイルを作成します。
from fastapi import Request, HTTPException
# アプリケーションのオーディエンス (AUD) タグ
POLICY_AUD = "XXXXX"
# あなたのCF Accessチームのドメイン
TEAM_DOMAIN = "https://<your-team-name>.cloudflareaccess.com"
CERTS_URL = "{}/cdn-cgi/access/certs".format(TEAM_DOMAIN)
async def validate_cloudflare(request: Request):
"""
リクエストがCloudflare Accessによって認証されていることを検証します。
"""
if verify_token(request) != True:
raise HTTPException(status_code=400, detail="適切に認証されていません!")
def _get_public_keys():
"""
戻り値:
PyJWTで使用可能なRSA公開鍵のリスト。
"""
r = requests.get(CERTS_URL)
public_keys = []
jwk_set = r.json()
for key_dict in jwk_set["keys"]:
public_key = jwt.algorithms.RSAAlgorithm.from_jwk(json.dumps(key_dict))
public_keys.append(public_key)
return public_keys
def verify_token(request):
"""
リクエスト内のトークンを検証します。
"""
token = ""
if "CF_Authorization" in request.cookies:
token = request.cookies["CF_Authorization"]
else:
raise HTTPException(status_code=400, detail="必要なcf認証トークンが欠落しています")
keys = _get_public_keys()
# デコーダにキーセットを渡せないため、キーをループします
valid_token = False
for key in keys:
try:
# decodeは必要に応じてメールを含むクレームを返します
jwt.decode(token, key=key, audience=POLICY_AUD, algorithms=["RS256"])
valid_token = True
break
except:
raise HTTPException(status_code=400, detail="トークンのデコード中にエラーが発生しました")
if not valid_token:
raise HTTPException(status_code=400, detail="無効なトークンです")
return True

2. アプリで検証関数を使用する

これで、FastAPIアプリに依存関係として検証関数を追加できます。これを行う一つの方法は、APIRouterインスタンスを作成することです。以下の例では、/adminで始まるパスへの各リクエストで検証関数が実行されます。

from fastapi import APIRouter, Depends, HTTPException
from cloudflare import validate_cloudflare
router = APIRouter(
prefix="/admin",
tags=["admin"],
dependencies=[Depends(validate_cloudflare)],
responses={404: {"description": "見つかりません"}},
)
@router.get("/")
async def root():
return {"message": "こんにちは、世界"}