FastAPIでアクセストークンを検証する
Last reviewed: over 1 year ago
このチュートリアルでは、FastAPIアプリに対して行われたリクエストにAccess JWTが含まれているかどうかを検証する方法を説明します。
所要時間: 15分
- FastAPIアプリ用のセルフホスト型Accessアプリケーション
- AccessアプリケーションのAUDタグ
- FastAPIプロジェクト内に、以下のコードを含む
cloudflare.pyという新しいファイルを作成します。
from fastapi import Request, HTTPException
# アプリケーションのオーディエンス (AUD) タグPOLICY_AUD = "XXXXX"
# あなたのCF AccessチームのドメインTEAM_DOMAIN = "https://<your-team-name>.cloudflareaccess.com"CERTS_URL = "{}/cdn-cgi/access/certs".format(TEAM_DOMAIN)
async def validate_cloudflare(request: Request): """ リクエストがCloudflare Accessによって認証されていることを検証します。 """ if verify_token(request) != True: raise HTTPException(status_code=400, detail="適切に認証されていません!")
def _get_public_keys(): """ 戻り値: PyJWTで使用可能なRSA公開鍵のリスト。 """ r = requests.get(CERTS_URL) public_keys = [] jwk_set = r.json() for key_dict in jwk_set["keys"]: public_key = jwt.algorithms.RSAAlgorithm.from_jwk(json.dumps(key_dict)) public_keys.append(public_key) return public_keys
def verify_token(request): """ リクエスト内のトークンを検証します。 """ token = ""
if "CF_Authorization" in request.cookies: token = request.cookies["CF_Authorization"] else: raise HTTPException(status_code=400, detail="必要なcf認証トークンが欠落しています")
keys = _get_public_keys()
# デコーダにキーセットを渡せないため、キーをループします valid_token = False for key in keys: try: # decodeは必要に応じてメールを含むクレームを返します jwt.decode(token, key=key, audience=POLICY_AUD, algorithms=["RS256"]) valid_token = True break except: raise HTTPException(status_code=400, detail="トークンのデコード中にエラーが発生しました") if not valid_token: raise HTTPException(status_code=400, detail="無効なトークンです")
return Trueこれで、FastAPIアプリに依存関係として検証関数を追加できます。これを行う一つの方法は、APIRouterインスタンス ↗を作成することです。以下の例では、/adminで始まるパスへの各リクエストで検証関数が実行されます。
from fastapi import APIRouter, Depends, HTTPExceptionfrom cloudflare import validate_cloudflare
router = APIRouter( prefix="/admin", tags=["admin"], dependencies=[Depends(validate_cloudflare)], responses={404: {"description": "見つかりません"}},)
@router.get("/")async def root(): return {"message": "こんにちは、世界"}