Celeryの概要とアーキテクチャ
Celeryは、Pythonで実装された分散タスクキューシステムであり、大量のメッセージをリアルタイムに処理することに特化しています。主に非同期タスクの実行やスケジューリング機能を提供し、Webアプリケーションのレスポンス向上に貢献します。
システムは以下の3つの主要コンポーネントで構成されています:
- メッセージブローカー(Broker):RabbitMQやRedisなどのミドルウェアを使用し、タスクのメッセージを管理します。Celery単体ではメッセージサービスを提供しないため、外部サービスとの連携が必要です。
- ワーカー(Worker):実際にタスクを実行するプロセスで、分散環境の複数ノードで並列動作が可能です。
- 結果ストア(Backend):タスクの実行結果を保存し、後から結果を取得できるようにします。RedisやRabbitMQなどが利用可能です。
主な活用シーン
Celeryは時間のかかる処理をメインプログラムから切り離すために利用されます。代表的なユースケースとして以下が挙げられます:
- メール送信やプッシュ通知などの非同期通信処理
- 画像・動画のエンコード処理
- データ集計やバックアップなどの定期的なバッチ処理
インストール方法
pipコマンドで簡単にインストールできます:
$ pip install celery redis
基本的な非同期タスクの実装
まず、シンプルなプロジェクト構造を作成します。
tasks.py(タスク定義)
from celery import Celery
import time
# Redisをブローカーとバックエンドに指定
app = Celery(
'worker_app',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1'
)
@app.task(bind=True)
def process_notification(self, user_id):
"""ユーザーへの通知処理をシミュレート"""
print(f"ユーザーID: {user_id} の処理を開始")
time.sleep(3) # 重い処理を想定
print(f"ユーザーID: {user_id} の処理が完了")
return {'status': 'success', 'user_id': user_id}
producer.py(タスクの呼び出し)
from tasks import process_notification
# タスクを非同期でキューに送信
task_1 = process_notification.delay(user_id=1001)
print(f"タスクID: {task_1.id}")
task_2 = process_notification.delay(user_id=1002)
print(f"タスクID: {task_2.id}")
results.py(結果の確認)
from celery.result import AsyncResult
from tasks import app
# 特定のタスクIDの結果を取得
task_id = '取得したいタスクIDを指定'
result = AsyncResult(id=task_id, app=app)
if result.ready():
print(f"実行結果: {result.get()}")
elif result.failed():
print("タスクの実行に失敗しました")
else:
print(f"現在の状態: {result.state}")
ワーカーを起動するコマンド:
$ celery -A tasks worker --loglevel=info
プロジェクト構造のモジュール化
実用的なプロジェクトでは、タスクをカテゴリ別に分割して管理します。
プロジェクト構成
celery_project/
├── celery_config.py
├── __init__.py
└── task_modules/
├── __init__.py
├── email_tasks.py
└── report_tasks.py
celery_config.py(設定ファイル)
from celery import Celery
from celery.schedules import crontab
app = Celery(
'celery_project',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1',
include=['task_modules.email_tasks', 'task_modules.report_tasks']
)
app.conf.timezone = 'Asia/Tokyo'
app.conf.enable_utc = False
task_modules/email_tasks.py
import time
from celery_config import app
@app.task(name='send_welcome_email')
def send_welcome_email(recipient):
time.sleep(2)
return f"{recipient} 宛てにウェルカムメールを送信完了"
task_modules/report_tasks.py
import time
from celery_config import app
@app.task(name='generate_daily_report')
def generate_daily_report(report_date):
time.sleep(5)
return f"{report_date} の日次レポートを生成完了"
スケジュール設定による定期実行
特定の時刻や間隔でタスクを自動実行する設定方法です。
指定時刻での実行
from datetime import datetime, timedelta
from task_modules.email_tasks import send_welcome_email
# 現在時刻から10秒後に実行
current_time = datetime.utcnow()
execution_time = current_time + timedelta(seconds=10)
task = send_welcome_email.apply_async(
args=['user@example.com'],
eta=execution_time
)
print(f"予約されたタスクID: {task.id}")
周期実行の設定(Beat)
celery_config.pyにスケジュール設定を追加します:
from celery.schedules import crontab
app.conf.beat_schedule = {
'every-minute-check': {
'task': 'send_welcome_email',
'schedule': 60.0, # 60秒ごと
'args': ('scheduled_user@example.com',)
},
'daily-report': {
'task': 'generate_daily_report',
'schedule': crontab(hour=7, minute=30), # 毎日7:30に実行
'args': ('auto-generated',)
}
}
Beatスケジューラとワーカーの起動:
# Beatプロセスの起動
$ celery -A celery_config beat --loglevel=info
# ワーカープロセスの起動(別ターミナルで実行)
$ celery -A celery_config worker --loglevel=info
# または統合して起動
$ celery -A celery_config worker -B --loglevel=info
Djangoフレームワークとの連携
DjangoプロジェクトでCeleryを利用する場合の構成例です。
ディレクトリ構成
myproject/
├── manage.py
├── myproject/
│ ├── settings.py
│ └── ...
└── celery_app/
├── __init__.py
├── settings.py
└── tasks.py
celery_app/settings.py
broker_url = 'redis://localhost:6379/10'
result_backend = 'redis://localhost:6379/11'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Asia/Tokyo'
celery_app/__init__.py
import os
from celery import Celery
# Django設定の読み込み
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')
app.config_from_object('celery_app.settings')
app.autodiscover_tasks()
celery_app/tasks.py
from celery_app import app
import time
@app.task(bind=True)
def async_data_export(self, export_format):
"""データエクスポート処理"""
time.sleep(3)
return f"{export_format}形式でのエクスポート完了"
Djangoビューからの呼び出し
from django.http import HttpResponse
from celery_app.tasks import async_data_export
def trigger_export(request):
# 非同期タスクを実行
task = async_data_export.delay('CSV')
return HttpResponse(f'エクスポート処理を開始しました。タスクID: {task.id}')
Django環境での起動コマンド:
$ celery -A celery_app worker -B --loglevel=info