asyncio.wait_for() を用いると、指定した時間内に完了しない非同期処理を自動的にキャンセルし、例外を送出させることができます。これは、外部API呼び出しやI/O待ちなどの不確実な遅延を持つ操作に対して、応答性と信頼性を確保するための基本的な手法です。
wait_for の動作原理
asyncio.wait_for(coro_or_task, timeout) は、与えられたコルーチンまたはタスクの完了を待機しますが、timeout(秒単位、float または int)が経過すると、対象タスクを即座にキャンセルし、asyncio.TimeoutError を送出します。タイムアウト値が None の場合、無限待機になります。
重要な点として、wait_for は「待機中のタスクを監視するラッパー」であり、内部でタスク化(asyncio.create_task() 相当)を行います。したがって、渡すオブジェクトが既存のタスクでも、新規タスクでも動作は同一です。
基本的な使用パターン
以下は、タイムアウト処理を安全に実装する典型的な構文です:
import asyncio
async def fetch_data():
await asyncio.sleep(3) # 意図的に長めの遅延
return {"status": "success"}
async def run_with_timeout():
try:
result = await asyncio.wait_for(fetch_data(), timeout=1.5)
print("Success:", result)
except asyncio.TimeoutError:
print("Operation timed out — task was canceled.")
except Exception as e:
print(f"Unexpected error: {type(e).__name__}: {e}")
この例では、fetch_data() が1.5秒以内に完了しなければ、実行中のタスクは強制終了され、TimeoutError がキャッチされます。また、タスク内で発生したその他の例外(例:ネットワークエラー)も上位へ伝播されるため、適切な例外ハンドリングが必要です。
実践サンプル:可変遅延と明示的なキャンセルフロー
次のコードでは、ランダムな処理時間を持つタスクを0.3秒のタイムアウトで実行し、キャンセル後の状態遷移を明示的に確認します:
import asyncio
import random
async def intensive_worker(identifier: int) -> str:
duration = round(0.5 + random.uniform(0.0, 1.2), 2)
print(f"[{identifier}] Starting (expected {duration}s)...")
await asyncio.sleep(duration)
return f"Completed by worker {identifier}"
async def orchestrate_with_deadline():
task = intensive_worker(42)
try:
outcome = await asyncio.wait_for(task, timeout=0.3)
print(f"✅ Result: {outcome}")
except asyncio.TimeoutError:
print("❌ Timeout triggered — task was automatically canceled.")
# 実行
asyncio.run(orchestrate_with_deadline())
出力例(実行ごとに異なる):
[42] Starting (expected 0.87s)...
❌ Timeout triggered — task was automatically canceled.
このように、wait_for は単なる「待機」ではなく、「時間制約付き実行管理」のためのコアプリミティブです。タイムアウト後、対象タスクは確実にキャンセルされ、イベントループからクリーンアップされます。