FlaskアプリケーションにおけるCeleryを活用した非同期SMS送信実装

Celeryの基本構造

Celeryはリアルタイム処理とタスクスケジューリングを専門とする分散タスクキューです。メールやSMS送信などの処理時間がかかる処理を非同期で処理するために使用されます。

処理フローは以下の通りです:

  1. プロデューサがタスクを生成し、ブローカーにキューイング
  2. ワーカーがブローカーからタスクを取得し実行
  3. 結果が必要な場合はバックエンドに返却値を保存

Flaskとの連携設定

Celeryオブジェクトの作成

from flask import Flask
from celery import Celery

app = Flask(__name__)
app.config.update({
    'CELERY_BROKER': 'redis://redis-server:6379/3',
    'CELERY_BACKEND': 'redis://redis-server:6379/4'
})

celery_app = Celery(
    app.name,
    broker=app.config['CELERY_BROKER']
)
celery_app.conf.update(app.config)

タスク定義

@celery_app.task
def generate_verification_code(user_id, phone):
    # ユーザーIDと電話番号を用いたSMS送信処理
    return f"Verification code for {phone}: {random.randint(1000, 9999)}"

タスクの呼び出し

# Flaskルーティング処理内で非同期実行
verification_task = generate_verification_code.delay(user_id, phone_number)

実装時の課題と解決策

循環インポートの回避

初期実装では、タスクモジュールのインポートがルートファイルの読み込み時に発生し、循環依存が発生していました。これを解決するため、タスク呼び出し箇所に遅延インポートを適用します。

@api.route('/verification/<phone>')
def send_verification(phone):
    # タスクインポートを関数内に遅延
    from tasks import generate_verification_code
    generate_verification_code.delay(user_id, phone)

モジュール構造の最適化

単一のタスクファイルではなく、機能別にモジュールを分割します。

ihome/
├── celery/
│   ├── __init__.py
│   ├── config.py
│   └── tasks/
│       ├── __init__.py
│       └── sms.py
└── main.py

config.pyでの設定例:

from celery import Celery

def create_celery():
    celery = Celery('app', broker='redis://redis-server:6379/3')
    celery.conf.update({
        'result_backend': 'redis://redis-server:6379/4',
        'task_serializer': 'json'
    })
    return celery

ワーカー起動コマンド

celery -A ihome.celery.config.celery_app worker --loglevel=info

タグ: Celery flask redis 非同期処理 SMS送信

6月15日 00:01 投稿