# 分散タスクスケジューリングとコンテンツ審査システムの実装
1. 分散タスクスケジューリングの概要
1.1 タスクスケジューリングとは
タスクスケジューリングとは、システムが特定のタスクを自動的に実行するために、指定された時間にタスクを実行するプロセスです。これにより、人手を解放し、システムが自動的にタスクを実行できます。
一般的な実装方法:
- マルチスレッド方式 + sleep
- JDK提供のAPI(Timer、ScheduledExecutorなど)
- フレームワーク(Quartzなど)
- Spring Task
1.2 Cron式
Cron式は、タスクの実行タイミングを設定するための文字列形式です。7つの部分から構成され、各部分はスペースで区切られます。
構成要素意味値の範囲第一部分Seconds (秒)0-59第二部分Minutes(分)0-59第三部分Hours(時)0-23第四部分Day-of-Month(日)1-31第五部分Month(月)0-11またはJAN-DEC第六部分Day-of-Week(曜日)1-7(1は日曜日)またはSUN-SAT第七部分Year(年) オプション1970-2099
特殊記号も利用可能:
?: 不確定値*: すべての値,: 複数値設定-: 範囲設定/: 間隔設定L: 月の最終日W: 最も近い営業日#: 週の第何曜日
1.3 分散タスクスケジューリングの特徴
分散タスクスケジューリングは、以下の利点があります:
- 並列処理によるパフォーマンス向上
- 高可用性(障害時のフェイルオーバー)
- 弾力的なスケールアウト
- タスクの統一管理と監視
1.4 XXL-Jobの紹介
XXL-Jobは、大众点评が開発した軽量分散タスクスケジューリングプラットフォームです。主要な特徴:
- シンプルで柔軟な設計
- 丰富的なタスク管理機能
- 高パフォーマンス(非同期処理)
- 高可用性(クラスタ展開対応)
- モニタリングと運用の容易性
2. XXL-Jobのセットアップと使用
2.1 環境準備
2.1.1 スケジューリングセンターの要件
- Maven3+
- JDK1.8+
- MySQL5.7+
2.1.2 データベース初期化
以下のSQLスクリプトを実行してデータベースを初期化します:
-- xxl_job_lock:タスクスケジューリングロックテーブル
-- xxl_job_group:実行者情報テーブル
-- xxl_job_info:スケジューリング拡張情報テーブル
-- xxl_job_log:スケジューリングログテーブル
-- xxl_job_registry:実行者登録テーブル
2.1.3 スケジューリングセンターの設定
# データベース接続情報
spring.datasource.url=jdbc:mysql://localhost:3306/xxl_job?Unicode=true&serverTimezone=Asia/Shanghai&characterEncoding=UTF-8
spring.datasource.username=root
spring.datasource.password=root
# サーバーポート
server.port=8888
server.servlet.context-path=/xxl-job-admin
2.2 実行者の設定
実行者は、スケジューリングリクエストを受け取りタスクを実行するコンポーネントです。
2.2.1 プロジェクトの作成
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.2.0</version>
</dependency>
2.2.2 設定ファイル
# スケジューリングセンターのアドレス
xxl.job.admin.addresses=http://localhost:8888/xxl-job-admin
# 実行者のアプリ名
xxl.job.executor.appname=content-review-executor
# 実行者のポート
xxl.job.executor.port=9999
# ログパス
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
2.2.3 タスクの実装
@Component
public class ContentReviewJob {
@XxlJob("contentReviewTask")
public ReturnT<String> execute(String param) throws Exception {
System.out.println("コンテンツ審査タスクを実行中: " + LocalDateTime.now());
// 審査ロジックを実装
reviewPendingContent();
return ReturnT.SUCCESS;
}
private void reviewPendingContent() {
// 審査待ちのコンテンツを取得して処理
}
}
3. 自媒体コンテンツ審査システム
3.1 需求分析
- 自媒体記事が自動審査に合格した場合、ステータスを「8(審査通過待発行)」に設定
- 定時タスクが1分ごとに審査通過待発行の記事をスキャン
- 発行時間が現在時刻より前の記事を即時発行
3.2 データ準備
3.2.1 APIインターフェースの定義
@GetMapping("/findPendingPublish")
List<Integer> findPendingPublish();
3.2.2 サービス層の実装
@Override
public List<Integer> findPendingPublish() {
List<WmNews> pendingList = list(Wrappers.<WmNews>lambdaQuery()
.eq(WmNews::getStatus, 8)
.lt(WmNews::getPublishTime, new Date()));
return pendingList.stream()
.map(WmNews::getId)
.collect(Collectors.toList());
}
3.3 定時タスクの設定
3.3.1 スケジューリングセンターでのタスク作成
- 実行者:content-review-executor
- Cron式:
0 * * * * ?(毎分実行) - ルーティング戦略:ROUND(ラウンドロビン)
3.3.2 タスクの実装
@Component
public class PublishScheduler {
@Autowired
private WemediaFeign wemediaFeign;
@Autowired
private ContentPublishService publishService;
@XxlJob("publishPendingContent")
public ReturnT<String> execute(String param) throws Exception {
log.info("発行待ちコンテンツのスキャンを開始");
List<Integer> pendingIds = wemediaFeign.findPendingPublish();
if (pendingIds != null && !pendingIds.isEmpty()) {
for (Integer id : pendingIds) {
publishService.publishContent(id);
}
}
log.info("発行処理が完了しました");
return ReturnT.SUCCESS;
}
}
4. 人工審査機能
4.1 需求分析
- 自媒体記事が自動審査に合格しなかった場合、ステータスを「3(人工審査待ち)」に設定
- 管理者は待機中の記事を確認し、承認または却下できる
- 承認時はステータスを「4」に、却下時はステータスを「2」に変更
4.2 自媒体側の実装
4.2.1 記事リストの取得
@PostMapping("/findReviewList")
public PageResponseResult findReviewList(ReviewDto dto) {
// ページングと検索ロジックを実装
return wmNewsService.findReviewList(dto);
}
4.2.2 記事詳細の取得
@GetMapping("/findReviewDetail/{id}")
public WmNewsVo findReviewDetail(@PathVariable Integer id) {
return wmNewsService.findReviewDetail(id);
}
4.3 管理者側の実装
4.3.1 審査アクションの処理
@PostMapping("/approve")
public ResponseResult approveContent(@RequestBody ReviewActionDto dto) {
return contentReviewService.approveContent(dto);
}
@PostMapping("/reject")
public ResponseResult rejectContent(@RequestBody ReviewActionDto dto) {
return contentReviewService.rejectContent(dto);
}
4.3.2 サービス層の実装
@Override
public ResponseResult approveContent(ReviewActionDto dto) {
WmNews news = wemediaFeign.findById(dto.getId());
if (news == null) {
return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST);
}
// ステータスを4(公開済み)に更新
updateWmNews(news, (short) 4, "人工審査通過");
return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}
@Override
public ResponseResult rejectContent(ReviewActionDto dto) {
WmNews news = wemediaFeign.findById(dto.getId());
if (news == null) {
return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST);
}
// ステータスを2(却下)に更新
updateWmNews(news, (short) 2, dto.getReason());
return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}
5. 記事の公開/非公開機能
5.1 ロジックの実装
5.1.1 記事の公開/非公開処理
@Override
public ResponseResult togglePublishStatus(ToggleDto dto) {
WmNews news = getById(dto.getId());
if (news == null || !news.getStatus().equals(4)) {
return ResponseResult.errorResult(AppHttpCodeEnum.INVALID_OPERATION);
}
// 公開ステータスを切り替え
boolean isPublish = dto.getEnable() == 1;
update(Wrappers.<WmNews>lambdaUpdate()
.eq(WmNews::getId, dto.getId())
.set(WmNews::getEnable, isPublish));
// アプリ側に通知
notifyAppService(news.getArticleId(), isPublish);
return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}
5.1.2 アプリ側のメッセージ受信
@Component
public class ArticleStatusListener {
@KafkaListener(topics = "article.status.update")
public void handleStatusUpdate(ConsumerRecord<?, ?> record) {
String message = (String) record.value();
Map<String, Object> data = JSON.parseObject(message, Map.class);
Integer articleId = (Integer) data.get("articleId");
Boolean isPublish = (Boolean) data.get("isPublish");
articleConfigService.updateStatus(articleId, isPublish);
}
}
この実装により、分散タスクスケジューリングとコンテンツ審査システムが統合され、効率的なコンテンツ管理が実現されます。