FastAPIアプリケーションへのAPIキー認証実装ガイド

認可キーの一元管理構造

外部クライアントに対するアクセス制御を開始するには、まず有効な認証情報の格納レイヤーを定義します。本格的な運用では永続ストアとの連携が必須ですが、ここでは検証ロジックの骨架を作成するためにメモリ上の集合(set)型を採用します。ハッシュ値ベースの探索となるため、キーの照合コストがO(1)に収まり、大量のリクエストでも検証オーバーヘッドを最小限に抑えられます。

PERMITTED_CLIENT_KEYS: set[str] = {
    "prod_tok_a1b2c3d4e5f6g7h8i9j0",
    "stag_tok_k1l2m3n4o5p6q7r8s9t0",
    "dev_tok_u1v2w3x3y4z5a6b7c8d9"
}

認証パイプラインの実装

FastAPIの依存性注入エンジンを利用し、リクエスト情報からトークンを安全に取り出す関数を作成します。APIKeyHeaderおよびAPIKeyQueryインスタンスを用意し、初期化時にauto_error=Falseを渡すことで、フレームワーク標準のエラー処理をバイパスし、カスタムレスポンスへのフォールバックを実現します。

from fastapi import Depends, HTTPException, status
from fastapi.security import APIKeyHeader, APIKeyQuery

hdr_extractor = APIKeyHeader(name="X-Client-Secret", auto_error=False)
qry_extractor = APIKeyQuery(name="security_token", auto_error=False)

async def resolve_and_verify_client(
    hdr_token: str | None = Depends(hdr_extractor),
    qry_token: str | None = Depends(qry_extractor)
) -> str:
    """ リクエストからトークンを取得し、許可リストに対して正当性を検査する </s>"""
    active_candidate = hdr_token or qry_token

    if not active_candidate:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="認可パラメータが要求に含まれていません"
        )

    if active_candidate not in PERMITTED_CLIENT_KEYS:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="提供されたAPIキーは無効です"
        )

    return active_candidate

ルーティングの保護適用

作成した検証関数をエンドポイントに組み込みます。認証が必要な経路にはDepends()を介して依存関係を宣言し、不要な経路はそのまま公開状態に保ちます。この分離により、検証ロジックは保護対象のリクエストに対してのみ実行され、システムの全般的なレスポンスタイム劣化を防げます。

from fastapi import FastAPI

gateway_app = FastAPI(docs_url="/api-docs")

@gateway_app.get("/health")
async def public_liveness_probe():
    return {"service_state": "running", "version": "1.0.0"}

@gateway_app.get("/internal/analytics")
async def fetch_protected_metrics(authorized_key: str = Depends(resolve_and_verify_client)):
    return {
        "endpoint_status": "access_granted",
        "validated_identifier_hash": hash(authorized_key)
    }

完全な実装例

# server.py
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import APIKeyHeader, APIKeyQuery

app = FastAPI(title="Key-Protected Service")

ACCESS_WHITELIST: set[str] = {
    "key_main_9z8y7x6w5v4u3t2s1r",
    "key_backup_q1p2o3n4m5l6k7j8i9",
    "key_guest_h9g8f7e6d5c4b3a2z1"
}

extract_header = APIKeyHeader(name="Authorization-Bearer", auto_error=False)
extract_query = APIKeyQuery(name="access_key", auto_error=False)

async def validate_access_route(
    h_param: str | None = Depends(extract_header),
    q_param: str | None = Depends(extract_query)
) -> str:
    token_input = h_param or q_param
    
    if not token_input:
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "認証資格情報が不足しています")
        
    if token_input not in ACCESS_WHITELIST:
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "不正なアクセストークンです")
        
    return token_input

@app.get("/open/ping")
async def unguarded_health():
    return {"msg": "public_check_ok"}

@app.get("/secured/report")
async def guarded_report(key: str = Depends(validate_access_route)):
    return {"data": "protected_content", "key_prefix": key[:4]}

タグ: fastapi Python APIKeyAuthentication DependencyInjection HTTPSecurity

6月13日 22:32 投稿