認可キーの一元管理構造
外部クライアントに対するアクセス制御を開始するには、まず有効な認証情報の格納レイヤーを定義します。本格的な運用では永続ストアとの連携が必須ですが、ここでは検証ロジックの骨架を作成するためにメモリ上の集合(set)型を採用します。ハッシュ値ベースの探索となるため、キーの照合コストがO(1)に収まり、大量のリクエストでも検証オーバーヘッドを最小限に抑えられます。
PERMITTED_CLIENT_KEYS: set[str] = {
"prod_tok_a1b2c3d4e5f6g7h8i9j0",
"stag_tok_k1l2m3n4o5p6q7r8s9t0",
"dev_tok_u1v2w3x3y4z5a6b7c8d9"
}
認証パイプラインの実装
FastAPIの依存性注入エンジンを利用し、リクエスト情報からトークンを安全に取り出す関数を作成します。APIKeyHeaderおよびAPIKeyQueryインスタンスを用意し、初期化時にauto_error=Falseを渡すことで、フレームワーク標準のエラー処理をバイパスし、カスタムレスポンスへのフォールバックを実現します。
from fastapi import Depends, HTTPException, status
from fastapi.security import APIKeyHeader, APIKeyQuery
hdr_extractor = APIKeyHeader(name="X-Client-Secret", auto_error=False)
qry_extractor = APIKeyQuery(name="security_token", auto_error=False)
async def resolve_and_verify_client(
hdr_token: str | None = Depends(hdr_extractor),
qry_token: str | None = Depends(qry_extractor)
) -> str:
""" リクエストからトークンを取得し、許可リストに対して正当性を検査する </s>"""
active_candidate = hdr_token or qry_token
if not active_candidate:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="認可パラメータが要求に含まれていません"
)
if active_candidate not in PERMITTED_CLIENT_KEYS:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="提供されたAPIキーは無効です"
)
return active_candidate
ルーティングの保護適用
作成した検証関数をエンドポイントに組み込みます。認証が必要な経路にはDepends()を介して依存関係を宣言し、不要な経路はそのまま公開状態に保ちます。この分離により、検証ロジックは保護対象のリクエストに対してのみ実行され、システムの全般的なレスポンスタイム劣化を防げます。
from fastapi import FastAPI
gateway_app = FastAPI(docs_url="/api-docs")
@gateway_app.get("/health")
async def public_liveness_probe():
return {"service_state": "running", "version": "1.0.0"}
@gateway_app.get("/internal/analytics")
async def fetch_protected_metrics(authorized_key: str = Depends(resolve_and_verify_client)):
return {
"endpoint_status": "access_granted",
"validated_identifier_hash": hash(authorized_key)
}
完全な実装例
# server.py
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import APIKeyHeader, APIKeyQuery
app = FastAPI(title="Key-Protected Service")
ACCESS_WHITELIST: set[str] = {
"key_main_9z8y7x6w5v4u3t2s1r",
"key_backup_q1p2o3n4m5l6k7j8i9",
"key_guest_h9g8f7e6d5c4b3a2z1"
}
extract_header = APIKeyHeader(name="Authorization-Bearer", auto_error=False)
extract_query = APIKeyQuery(name="access_key", auto_error=False)
async def validate_access_route(
h_param: str | None = Depends(extract_header),
q_param: str | None = Depends(extract_query)
) -> str:
token_input = h_param or q_param
if not token_input:
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "認証資格情報が不足しています")
if token_input not in ACCESS_WHITELIST:
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "不正なアクセストークンです")
return token_input
@app.get("/open/ping")
async def unguarded_health():
return {"msg": "public_check_ok"}
@app.get("/secured/report")
async def guarded_report(key: str = Depends(validate_access_route)):
return {"data": "protected_content", "key_prefix": key[:4]}