分散タスクスケジューリングとコンテンツ審査システムの実装

# 分散タスクスケジューリングとコンテンツ審査システムの実装

1. 分散タスクスケジューリングの概要

1.1 タスクスケジューリングとは

タスクスケジューリングとは、システムが特定のタスクを自動的に実行するために、指定された時間にタスクを実行するプロセスです。これにより、人手を解放し、システムが自動的にタスクを実行できます。

一般的な実装方法:

  • マルチスレッド方式 + sleep
  • JDK提供のAPI(Timer、ScheduledExecutorなど)
  • フレームワーク(Quartzなど)
  • Spring Task

1.2 Cron式

Cron式は、タスクの実行タイミングを設定するための文字列形式です。7つの部分から構成され、各部分はスペースで区切られます。

構成要素意味値の範囲第一部分Seconds (秒)0-59第二部分Minutes(分)0-59第三部分Hours(時)0-23第四部分Day-of-Month(日)1-31第五部分Month(月)0-11またはJAN-DEC第六部分Day-of-Week(曜日)1-7(1は日曜日)またはSUN-SAT第七部分Year(年) オプション1970-2099

特殊記号も利用可能:

  • ?: 不確定値
  • *: すべての値
  • ,: 複数値設定
  • -: 範囲設定
  • /: 間隔設定
  • L: 月の最終日
  • W: 最も近い営業日
  • #: 週の第何曜日

1.3 分散タスクスケジューリングの特徴

分散タスクスケジューリングは、以下の利点があります:

  • 並列処理によるパフォーマンス向上
  • 高可用性(障害時のフェイルオーバー)
  • 弾力的なスケールアウト
  • タスクの統一管理と監視

1.4 XXL-Jobの紹介

XXL-Jobは、大众点评が開発した軽量分散タスクスケジューリングプラットフォームです。主要な特徴:

  • シンプルで柔軟な設計
  • 丰富的なタスク管理機能
  • 高パフォーマンス(非同期処理)
  • 高可用性(クラスタ展開対応)
  • モニタリングと運用の容易性

2. XXL-Jobのセットアップと使用

2.1 環境準備

2.1.1 スケジューリングセンターの要件

  • Maven3+
  • JDK1.8+
  • MySQL5.7+

2.1.2 データベース初期化

以下のSQLスクリプトを実行してデータベースを初期化します:

-- xxl_job_lock:タスクスケジューリングロックテーブル
-- xxl_job_group:実行者情報テーブル
-- xxl_job_info:スケジューリング拡張情報テーブル
-- xxl_job_log:スケジューリングログテーブル
-- xxl_job_registry:実行者登録テーブル

2.1.3 スケジューリングセンターの設定

# データベース接続情報
spring.datasource.url=jdbc:mysql://localhost:3306/xxl_job?Unicode=true&serverTimezone=Asia/Shanghai&characterEncoding=UTF-8
spring.datasource.username=root
spring.datasource.password=root

# サーバーポート
server.port=8888
server.servlet.context-path=/xxl-job-admin

2.2 実行者の設定

実行者は、スケジューリングリクエストを受け取りタスクを実行するコンポーネントです。

2.2.1 プロジェクトの作成

<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
    <version>2.2.0</version>
</dependency>

2.2.2 設定ファイル

# スケジューリングセンターのアドレス
xxl.job.admin.addresses=http://localhost:8888/xxl-job-admin

# 実行者のアプリ名
xxl.job.executor.appname=content-review-executor

# 実行者のポート
xxl.job.executor.port=9999

# ログパス
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler

2.2.3 タスクの実装

@Component
public class ContentReviewJob {

    @XxlJob("contentReviewTask")
    public ReturnT<String> execute(String param) throws Exception {
        System.out.println("コンテンツ審査タスクを実行中: " + LocalDateTime.now());
        
        // 審査ロジックを実装
        reviewPendingContent();
        
        return ReturnT.SUCCESS;
    }
    
    private void reviewPendingContent() {
        // 審査待ちのコンテンツを取得して処理
    }
}

3. 自媒体コンテンツ審査システム

3.1 需求分析

  • 自媒体記事が自動審査に合格した場合、ステータスを「8(審査通過待発行)」に設定
  • 定時タスクが1分ごとに審査通過待発行の記事をスキャン
  • 発行時間が現在時刻より前の記事を即時発行

3.2 データ準備

3.2.1 APIインターフェースの定義

@GetMapping("/findPendingPublish")
List<Integer> findPendingPublish();

3.2.2 サービス層の実装

@Override
public List<Integer> findPendingPublish() {
    List<WmNews> pendingList = list(Wrappers.<WmNews>lambdaQuery()
        .eq(WmNews::getStatus, 8)
        .lt(WmNews::getPublishTime, new Date()));
    
    return pendingList.stream()
        .map(WmNews::getId)
        .collect(Collectors.toList());
}

3.3 定時タスクの設定

3.3.1 スケジューリングセンターでのタスク作成

  • 実行者:content-review-executor
  • Cron式:0 * * * * ?(毎分実行)
  • ルーティング戦略:ROUND(ラウンドロビン)

3.3.2 タスクの実装

@Component
public class PublishScheduler {

    @Autowired
    private WemediaFeign wemediaFeign;
    
    @Autowired
    private ContentPublishService publishService;
    
    @XxlJob("publishPendingContent")
    public ReturnT<String> execute(String param) throws Exception {
        log.info("発行待ちコンテンツのスキャンを開始");
        
        List<Integer> pendingIds = wemediaFeign.findPendingPublish();
        if (pendingIds != null && !pendingIds.isEmpty()) {
            for (Integer id : pendingIds) {
                publishService.publishContent(id);
            }
        }
        
        log.info("発行処理が完了しました");
        return ReturnT.SUCCESS;
    }
}

4. 人工審査機能

4.1 需求分析

  • 自媒体記事が自動審査に合格しなかった場合、ステータスを「3(人工審査待ち)」に設定
  • 管理者は待機中の記事を確認し、承認または却下できる
  • 承認時はステータスを「4」に、却下時はステータスを「2」に変更

4.2 自媒体側の実装

4.2.1 記事リストの取得

@PostMapping("/findReviewList")
public PageResponseResult findReviewList(ReviewDto dto) {
    // ページングと検索ロジックを実装
    return wmNewsService.findReviewList(dto);
}

4.2.2 記事詳細の取得

@GetMapping("/findReviewDetail/{id}")
public WmNewsVo findReviewDetail(@PathVariable Integer id) {
    return wmNewsService.findReviewDetail(id);
}

4.3 管理者側の実装

4.3.1 審査アクションの処理

@PostMapping("/approve")
public ResponseResult approveContent(@RequestBody ReviewActionDto dto) {
    return contentReviewService.approveContent(dto);
}

@PostMapping("/reject")
public ResponseResult rejectContent(@RequestBody ReviewActionDto dto) {
    return contentReviewService.rejectContent(dto);
}

4.3.2 サービス層の実装

@Override
public ResponseResult approveContent(ReviewActionDto dto) {
    WmNews news = wemediaFeign.findById(dto.getId());
    if (news == null) {
        return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST);
    }
    
    // ステータスを4(公開済み)に更新
    updateWmNews(news, (short) 4, "人工審査通過");
    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}

@Override
public ResponseResult rejectContent(ReviewActionDto dto) {
    WmNews news = wemediaFeign.findById(dto.getId());
    if (news == null) {
        return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST);
    }
    
    // ステータスを2(却下)に更新
    updateWmNews(news, (short) 2, dto.getReason());
    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}

5. 記事の公開/非公開機能

5.1 ロジックの実装

5.1.1 記事の公開/非公開処理

@Override
public ResponseResult togglePublishStatus(ToggleDto dto) {
    WmNews news = getById(dto.getId());
    if (news == null || !news.getStatus().equals(4)) {
        return ResponseResult.errorResult(AppHttpCodeEnum.INVALID_OPERATION);
    }
    
    // 公開ステータスを切り替え
    boolean isPublish = dto.getEnable() == 1;
    update(Wrappers.<WmNews>lambdaUpdate()
        .eq(WmNews::getId, dto.getId())
        .set(WmNews::getEnable, isPublish));
    
    // アプリ側に通知
    notifyAppService(news.getArticleId(), isPublish);
    
    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}

5.1.2 アプリ側のメッセージ受信

@Component
public class ArticleStatusListener {

    @KafkaListener(topics = "article.status.update")
    public void handleStatusUpdate(ConsumerRecord<?, ?> record) {
        String message = (String) record.value();
        Map<String, Object> data = JSON.parseObject(message, Map.class);
        
        Integer articleId = (Integer) data.get("articleId");
        Boolean isPublish = (Boolean) data.get("isPublish");
        
        articleConfigService.updateStatus(articleId, isPublish);
    }
}

この実装により、分散タスクスケジューリングとコンテンツ審査システムが統合され、効率的なコンテンツ管理が実現されます。

タグ: XXL-Job 分散タスクスケジューリング コンテンツ審査 Spring Boot Kafka

5月15日 15:50 投稿