Celeryを活用した非同期タスク処理の実装

Celeryの概要とアーキテクチャ

Celeryは、Pythonで実装された分散タスクキューシステムであり、大量のメッセージをリアルタイムに処理することに特化しています。主に非同期タスクの実行やスケジューリング機能を提供し、Webアプリケーションのレスポンス向上に貢献します。

システムは以下の3つの主要コンポーネントで構成されています:

  • メッセージブローカー(Broker):RabbitMQやRedisなどのミドルウェアを使用し、タスクのメッセージを管理します。Celery単体ではメッセージサービスを提供しないため、外部サービスとの連携が必要です。
  • ワーカー(Worker):実際にタスクを実行するプロセスで、分散環境の複数ノードで並列動作が可能です。
  • 結果ストア(Backend):タスクの実行結果を保存し、後から結果を取得できるようにします。RedisやRabbitMQなどが利用可能です。

主な活用シーン

Celeryは時間のかかる処理をメインプログラムから切り離すために利用されます。代表的なユースケースとして以下が挙げられます:

  • メール送信やプッシュ通知などの非同期通信処理
  • 画像・動画のエンコード処理
  • データ集計やバックアップなどの定期的なバッチ処理

インストール方法

pipコマンドで簡単にインストールできます:

$ pip install celery redis

基本的な非同期タスクの実装

まず、シンプルなプロジェクト構造を作成します。

tasks.py(タスク定義)

from celery import Celery
import time

# Redisをブローカーとバックエンドに指定
app = Celery(
    'worker_app',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'
)

@app.task(bind=True)
def process_notification(self, user_id):
    """ユーザーへの通知処理をシミュレート"""
    print(f"ユーザーID: {user_id} の処理を開始")
    time.sleep(3)  # 重い処理を想定
    print(f"ユーザーID: {user_id} の処理が完了")
    return {'status': 'success', 'user_id': user_id}

producer.py(タスクの呼び出し)

from tasks import process_notification

# タスクを非同期でキューに送信
task_1 = process_notification.delay(user_id=1001)
print(f"タスクID: {task_1.id}")

task_2 = process_notification.delay(user_id=1002)
print(f"タスクID: {task_2.id}")

results.py(結果の確認)

from celery.result import AsyncResult
from tasks import app

# 特定のタスクIDの結果を取得
task_id = '取得したいタスクIDを指定'
result = AsyncResult(id=task_id, app=app)

if result.ready():
    print(f"実行結果: {result.get()}")
elif result.failed():
    print("タスクの実行に失敗しました")
else:
    print(f"現在の状態: {result.state}")

ワーカーを起動するコマンド:

$ celery -A tasks worker --loglevel=info

プロジェクト構造のモジュール化

実用的なプロジェクトでは、タスクをカテゴリ別に分割して管理します。

プロジェクト構成

celery_project/
├── celery_config.py
├── __init__.py
└── task_modules/
    ├── __init__.py
    ├── email_tasks.py
    └── report_tasks.py

celery_config.py(設定ファイル)

from celery import Celery
from celery.schedules import crontab

app = Celery(
    'celery_project',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1',
    include=['task_modules.email_tasks', 'task_modules.report_tasks']
)

app.conf.timezone = 'Asia/Tokyo'
app.conf.enable_utc = False

task_modules/email_tasks.py

import time
from celery_config import app

@app.task(name='send_welcome_email')
def send_welcome_email(recipient):
    time.sleep(2)
    return f"{recipient} 宛てにウェルカムメールを送信完了"

task_modules/report_tasks.py

import time
from celery_config import app

@app.task(name='generate_daily_report')
def generate_daily_report(report_date):
    time.sleep(5)
    return f"{report_date} の日次レポートを生成完了"

スケジュール設定による定期実行

特定の時刻や間隔でタスクを自動実行する設定方法です。

指定時刻での実行

from datetime import datetime, timedelta
from task_modules.email_tasks import send_welcome_email

# 現在時刻から10秒後に実行
current_time = datetime.utcnow()
execution_time = current_time + timedelta(seconds=10)

task = send_welcome_email.apply_async(
    args=['user@example.com'],
    eta=execution_time
)
print(f"予約されたタスクID: {task.id}")

周期実行の設定(Beat)

celery_config.pyにスケジュール設定を追加します:

from celery.schedules import crontab

app.conf.beat_schedule = {
    'every-minute-check': {
        'task': 'send_welcome_email',
        'schedule': 60.0,  # 60秒ごと
        'args': ('scheduled_user@example.com',)
    },
    'daily-report': {
        'task': 'generate_daily_report',
        'schedule': crontab(hour=7, minute=30),  # 毎日7:30に実行
        'args': ('auto-generated',)
    }
}

Beatスケジューラとワーカーの起動:

# Beatプロセスの起動
$ celery -A celery_config beat --loglevel=info

# ワーカープロセスの起動(別ターミナルで実行)
$ celery -A celery_config worker --loglevel=info

# または統合して起動
$ celery -A celery_config worker -B --loglevel=info

Djangoフレームワークとの連携

DjangoプロジェクトでCeleryを利用する場合の構成例です。

ディレクトリ構成

myproject/
├── manage.py
├── myproject/
│   ├── settings.py
│   └── ...
└── celery_app/
    ├── __init__.py
    ├── settings.py
    └── tasks.py

celery_app/settings.py

broker_url = 'redis://localhost:6379/10'
result_backend = 'redis://localhost:6379/11'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Asia/Tokyo'

celery_app/__init__.py

import os
from celery import Celery

# Django設定の読み込み
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')
app.config_from_object('celery_app.settings')
app.autodiscover_tasks()

celery_app/tasks.py

from celery_app import app
import time

@app.task(bind=True)
def async_data_export(self, export_format):
    """データエクスポート処理"""
    time.sleep(3)
    return f"{export_format}形式でのエクスポート完了"

Djangoビューからの呼び出し

from django.http import HttpResponse
from celery_app.tasks import async_data_export

def trigger_export(request):
    # 非同期タスクを実行
    task = async_data_export.delay('CSV')
    return HttpResponse(f'エクスポート処理を開始しました。タスクID: {task.id}')

Django環境での起動コマンド:

$ celery -A celery_app worker -B --loglevel=info

タグ: Celery Python redis Django 非同期処理

6月7日 20:21 投稿