Celeryの基本構造
Celeryはリアルタイム処理とタスクスケジューリングを専門とする分散タスクキューです。メールやSMS送信などの処理時間がかかる処理を非同期で処理するために使用されます。
処理フローは以下の通りです:
- プロデューサがタスクを生成し、ブローカーにキューイング
- ワーカーがブローカーからタスクを取得し実行
- 結果が必要な場合はバックエンドに返却値を保存
Flaskとの連携設定
Celeryオブジェクトの作成
from flask import Flask
from celery import Celery
app = Flask(__name__)
app.config.update({
'CELERY_BROKER': 'redis://redis-server:6379/3',
'CELERY_BACKEND': 'redis://redis-server:6379/4'
})
celery_app = Celery(
app.name,
broker=app.config['CELERY_BROKER']
)
celery_app.conf.update(app.config)
タスク定義
@celery_app.task
def generate_verification_code(user_id, phone):
# ユーザーIDと電話番号を用いたSMS送信処理
return f"Verification code for {phone}: {random.randint(1000, 9999)}"
タスクの呼び出し
# Flaskルーティング処理内で非同期実行
verification_task = generate_verification_code.delay(user_id, phone_number)
実装時の課題と解決策
循環インポートの回避
初期実装では、タスクモジュールのインポートがルートファイルの読み込み時に発生し、循環依存が発生していました。これを解決するため、タスク呼び出し箇所に遅延インポートを適用します。
@api.route('/verification/<phone>')
def send_verification(phone):
# タスクインポートを関数内に遅延
from tasks import generate_verification_code
generate_verification_code.delay(user_id, phone)
モジュール構造の最適化
単一のタスクファイルではなく、機能別にモジュールを分割します。
ihome/
├── celery/
│ ├── __init__.py
│ ├── config.py
│ └── tasks/
│ ├── __init__.py
│ └── sms.py
└── main.py
config.pyでの設定例:
from celery import Celery
def create_celery():
celery = Celery('app', broker='redis://redis-server:6379/3')
celery.conf.update({
'result_backend': 'redis://redis-server:6379/4',
'task_serializer': 'json'
})
return celery
ワーカー起動コマンド
celery -A ihome.celery.config.celery_app worker --loglevel=info