カスタムスレッドプールによる非同期タスクマネージャーの実装

バックエンドで時間のかかる処理を行う際、スレッドが長時間ブロックされることがよくあります。このような場合、非同期スレッドを使用してタスクを処理することが一般的です。毎回新しいスレッドを作成する方法はコードの重複を招き、スレッド管理が煩雑になり、システムパフォーマンスに悪影響を与える可能性があります。Springフレームワークでは@Asyncアノテーションを用いた非同期処理が提供されていますが、この方法ではスレッドプールの詳細な制御やタスクの柔軟なスケジューリングが困難です。そのため、効率的なカスタム非同期タスクマネージャーを構築し、すべてのタスクを一元的に管理することが望ましいです。

1. 前提設定

カスタムスレッドプールを定義し、IoCコンテナに登録します。

/**
 * カスタムスレッドプールの設定
 * @Author GuihaoLv
 **/
@Configuration
public class ThreadPoolConfig
{
    // コアスレッド数
    private int corePoolSize = 50;

    // 最大スレッド数
    private int maxPoolSize = 200;

    // キューの最大容量
    private int queueCapacity = 1000;

    // スレッドのアイドル時間(秒)
    private int keepAliveSeconds = 300;

    /**
     * 共通タスク用スレッドプール
     * @return
     */
    @Bean(name = "threadPoolTaskExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor()
    {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setMaxPoolSize(maxPoolSize);
        executor.setCorePoolSize(corePoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setKeepAliveSeconds(keepAliveSeconds);
        // リジェクトされたタスクの処理ポリシー
        // キューが満杯の場合、CallerRunsPolicyにより呼び出し元スレッドでタスクを実行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }

    /**
     * 定期実行またはスケジュールタスク用
     */
    @Bean(name = "scheduledExecutorService")
    protected ScheduledExecutorService scheduledExecutorService()
    {
        // このスレッドプールには最大スレッド数の概念がない
        return new ScheduledThreadPoolExecutor(corePoolSize,
                new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d")
                        .daemon(true).build(),
                new ThreadPoolExecutor.CallerRunsPolicy())
        {
            // タスク完了後に例外をキャッチ・ログ出力
            @Override
            protected void afterExecute(Runnable r, Throwable t)
            {
                super.afterExecute(r, t);
                Threads.printException(r, t);
            }
        };
    }
}

2. 非同期タスクマネージャーの設定

/**
 * 非同期タスクマネージャー
 * AsyncManager は非同期タスクのスケジューリングと管理の中心です。
 * @Author GuihaoLv
 */
public class AsyncManager
{
    /**
     * 処理遅延時間(ミリ秒)
     */
    private final int OPERATE_DELAY_TIME = 10;

    /**
     * 非同期タスクのスケジューリング用スレッドプール
     * executor は ScheduledExecutorService を使用し、定期実行をサポート
     */
    private ScheduledExecutorService executor = SpringUtils.getBean("scheduledExecutorService");

    /**
     * シングルトンパターン
     * 全体で1つの AsyncManager インスタンスのみを維持し、タスク管理を統一
     */
    private AsyncManager(){}

    // シングルトンインスタンス
    private static AsyncManager me = new AsyncManager();

    public static AsyncManager me()
    {
        return me;
    }

    /**
     * タスクをスケジューリング実行
     * @param task タスク
     */
    public void execute(TimerTask task)
    {
        executor.schedule(task, OPERATE_DELAY_TIME, TimeUnit.MILLISECONDS);
    }

    /**
     * スレッドプールをシャットダウン
     */
    public void shutdown()
    {
        // グレースフルシャットダウン
        Threads.shutdownAndAwaitTermination(executor);
    }
}
/**
 * アプリケーション終了時にバックグラウンドスレッドを停止
 * @Author GuihaoLv
 */
@Component
public class ShutdownManager
{
    private static final Logger logger = LoggerFactory.getLogger("sys-user");

    @PreDestroy
    public void destroy()
    {
        shutdownAsyncManager();
        HttpUtils.shutdown();
    }

     /**
     * 非同期タスクの停止
     */
    private void shutdownAsyncManager()
    {
        try
        {
            logger.info("====バックグラウンドタスクスレッドプールを停止中====");
            AsyncManager.me().shutdown();
        }
        catch (Exception e)
        {
            logger.error(e.getMessage(), e);
        }
    }
}

3. 非同期ファクトリーの設定

非同期タスクの起動ロジックを非同期ファクトリーに集約します。

/**
 * 非同期ファクトリー(タスク生成用)
 * AsyncFactory は非同期タスクの作成を担当し、異なる用途に対応したタスクを生成します。
 *
 * @Author GuihaoLv
 */
public class AsyncFactory
{
    private static final Logger sys_user_logger = LoggerFactory.getLogger("sys-user");

    // ホット記事計算タスク
    public static TimerTask calculateHotArticlesTask() {
        return new TimerTask() {
            @Override
            public void run() {
                try {
                    HotArticleAsycTask hotArticleAsycTask = SpringUtils.getBean(HotArticleAsycTask.class);
                    hotArticleAsycTask.calculateHotArticles();
                } catch (Exception e) {
                    sys_user_logger.error("ホット記事の計算に失敗", e);
                }
            }
        };
    }

    // 旧キーワードクリーンアップタスク
    public static TimerTask cleanupOldKeywords() {
        return new TimerTask() {
            @Override
            public void run() {
                try {
                    HotSearchCleanupTask hotArticleAsycTask = SpringUtils.getBean(HotSearchCleanupTask.class);
                    hotArticleAsycTask.cleanupOldKeywords();
                } catch (Exception e) {
                    sys_user_logger.error("ホット検索キーワードのクリーンアップに失敗", e);
                }
            }
        };
    }
}

4. 非同期ファクトリーの利用

/**
 * ホット記事のリアルタイム計算
 * @Author GuihaoLv
 */
@Component
public class HotArticleAsycTask {

    @Autowired
    private StringRedisTemplate redisTemplate;

    private static final String LIKES_KEY = "blog:likes";
    private static final String FAVORITES_KEY = "blog:favorites";
    private static final String HOT_ARTICLES_KEY = "blog:hot";

    public void calculateHotArticles() {
        Set<String> blogIds = redisTemplate.opsForZSet().range(LIKES_KEY, 0, -1);
        if (blogIds == null || blogIds.isEmpty()) return;

        for (String blogId : blogIds) {
            Double likes = redisTemplate.opsForZSet().score(LIKES_KEY, blogId);
            Double favorites = redisTemplate.opsForZSet().score(FAVORITES_KEY, blogId);

            // 計算式
            double hotScore = (likes != null ? likes * 5.0 : 0) +
                    (favorites != null ? favorites * 8.0 : 0);

            // 熱度閾値を超えた場合はホット記事として保存
            if (hotScore >= 500) {
                redisTemplate.opsForZSet().add(HOT_ARTICLES_KEY, blogId, hotScore);
                System.out.println("🔥 記事 " + blogId + " がホットリストに追加されました。熱度:" + hotScore);
            }
        }
    }

    /**
     * 5分ごとに実行される定期タスク
     */
    @Scheduled(fixedRate = 100000)
    public void scheduleHotArticlesCalculation() {
        AsyncManager.me().execute(AsyncFactory.calculateHotArticlesTask());
    }
}

タグ: Java Spring 非同期処理 スレッドプール ScheduledExecutorService

5月24日 04:56 投稿