Django と Celery を用いた非同期・定期実行型ウェブスクレイピングとメール通知監視の実装

1. Django プロジェクトへの Celery 統合(標準的なアプローチ)

1.1 必要なパッケージのインストール

プロジェクトディレクトリに移動し、以下のコマンドで依存モジュールをインストールします:

pip install django==3.2.22
pip install celery redis eventlet
pip install django-celery-beat django-celery-results django-simpleui

1.2 Celery 設定ファイルの作成

プロジェクトルートに celery.py を作成します:

import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

1.3 Django 設定への Celery 設定追加

settings.py に以下を追記します:

# Celery 設定
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_RESULT_EXPIRES = 86400  # 24時間
CELERY_TIMEZONE = 'Asia/Tokyo'

1.4 プロジェクト初期化ファイルの修正

__init__.py(プロジェクトルート)に以下を記述します:

from .celery import app as celery_app

__all__ = ('celery_app',)

1.5 タスク定義

アプリケーションディレクトリ内に tasks.py を作成し、非同期タスクを定義します:

from celery import shared_task
import time
import smtplib
from email.mime.text import MIMEText

@shared_task
def scrape_website(url):
    time.sleep(2)  # シミュレーション
    return f"Scraped content from {url}"

@shared_task(bind=True)
def send_alert_email(self, recipient, subject, body):
    try:
        msg = MIMEText(body)
        msg['Subject'] = subject
        msg['From'] = 'alert@example.com'
        msg['To'] = recipient

        with smtplib.SMTP('localhost') as server:
            server.send_message(msg)
        return "Email sent successfully"
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60, max_retries=3)

1.6 ビューでのタスク呼び出し

views.py でタスクを非同期実行します:

from django.http import JsonResponse
from .tasks import scrape_website, send_alert_email

def trigger_scrape(request):
    url = request.GET.get('url', 'https://example.com')
    task = scrape_website.delay(url)
    return JsonResponse({'task_id': task.id})

def notify_user(request):
    email = request.GET.get('email')
    if email:
        send_alert_email.delay(email, "Task Alert", "Your scraping task has completed.")
    return JsonResponse({'status': 'Notification queued'})

1.7 URL 設定

urls.py にエンドポイントを追加します:

from django.urls import path
from . import views

urlpatterns = [
    path('scrape/', views.trigger_scrape),
    path('notify/', views.notify_user),
]

1.8 Worker の起動

別ターミナルで以下を実行してワーカーを起動します(Windows の場合は -P eventlet を追加):

celery -A myproject worker --loglevel=info
# Windows の場合:
# celery -A myproject worker --pool=eventlet --loglevel=info

1.9 定期タスクの設定(オプション)

django-celery-beat を使用して定期実行を設定するには、settings.py に以下を追加し、管理画面からスケジュールを登録します:

INSTALLED_APPS += ['django_celery_beat']

その後、python manage.py migrate を実行してマイグレーションを適用します。

1.10 Flower による監視

タスクの状態を可視化するために Flower を起動します:

celery -A myproject flower

ブラウザで http://localhost:5555 にアクセスすると、リアルタイムのタスク監視が可能です。

タグ: Django Celery redis Flower django-celery-beat

6月21日 19:33 投稿