线程池の奥深さ

はじめに

前回の記事「CPU温度を下げた経験談」で、バイクの投入管理アプリケーションを担当していることをお伝えしました。大量のトラフィックが新アプリケーションに切り替わった後も、投入処理の複雑さから不安を感じていました。特に、線程池を用いて非同期処理を行っている部分は、深く理解できておらず、しばらくの間、その仕組みを「咀嚼できない」と感じていました。

線程池の3つの主要パラメータやバッキングキューの種類について、いつも行き当たりばcieで検索しては理解を装う日々が続いていました。しかし、この状況を変えたいと考え、チームの文博が主催する学習チーム「Thor」に参加しました。そこで学んだ構造化思考の重要性から、線程池を体系的に学び直す必要性を痛感しました。

線程池を利用すべき理由

近年、CPUのコア数が増加していることで、並列処理の重要性が高まっています。Javaの線程モデルは、プリエンプティブな協調型マルチスレッドを採用しており、多くの線程を扱う場合、以下のような課題が生じます:

  • 線程の作成と破棄にかかるコスト
  • コンテキ스트スイッチのオーバーヘッド

Javaの標準ライブラリ「java.util.concurrent.ThreadPoolExecutor」は、これらの課題を解決するための「プーリング」概念を実装しています。プーリングは以下のようなメリットを提供します:

  • リソースの再利用
  • 処理能力の効率化

線程池の内部仕組みを理解するため、ThreadPoolExecutorのソースコードを分析します。

ソースコードの分析

ThreadPoolExecutorの中心的なメソッドである「execute(Runnable command)」の仕組みを解剖します:

public void execute(Runnable task) {
    if (task == null) return;
    
    int ctlState = ctl.get();
    int currentThreadCount = workerCountOf(ctlState);
    
    if (currentThreadCount < coreThreadCount) {
        if (addWorker(task, true)) {
            return;
        }
        ctlState = ctl.get();
    }
    
    if (isRunning(ctlState) && workQueue.offer(task)) {
        int recheckState = ctl.get();
        if (!isRunning(recheckState) && remove(task)) {
            reject(task);
        } else if (workerCountOf(recheckState) == 0) {
            addWorker(null, false);
        }
    } else if (!addWorker(task, false)) {
        reject(task);
    }
}

このメソッドは、以下のような処理を実施します:

  1. 現在のスレッド数がコアスレッド数以下の場合、新規スレッドを作成します。
  2. 現在のスレッド数がコアスレッド数を超えた場合、タスクをバッキングキューに格納します。
  3. バッキングキューが満杯の場合、最大スレッド数まで新規スレッドを作成します。

ThreadPoolExecutorは、 ctl変数を用いてスレッド池の状態を管理します。ctlは以下のようなビットマップ形式で状態を保持しています:

private static int getThreadCount(int ctlValue) {
    return ctlValue & 0x1CFFFFFF;
}

ctlの上位3ビットはスレッド池の状態を表し、下位29ビットは現在稼働しているスレッド数を表します。

addWorkerメソッドは、スレッド池の状態とスレッド数を元に、新規スレッドの作成を決定します:

private boolean addWorker(Runnable task, boolean isCore) {
    retryLoop:
    for (;;) {
        int ctlValue = ctl.get();
        int runState = runStateOf(ctlValue);
        
        if (runState >= SHUTDOWN) {
            return false;
        }
        
        int maxThreadCount = isCore ? coreThreadCount : maxThreadCount;
        int currentThreadCount = workerCountOf(ctlValue);
        
        if (currentThreadCount >= maxThreadCount) {
            return false;
        }
        
        if (compareAndIncrementWorkerCount(ctlValue)) {
            break retryLoop;
        }
    }
    
    Worker worker = new Worker(task);
    Thread thread = worker.thread;
    
    if (thread != null) {
        try {
            thread.start();
        } catch (Exception e) {
            return false;
        }
    }
    return true;
}

Workerスレッドが作成された後、runWorkerメソッドが実行されます:

private void runWorker(Worker worker) {
    Thread currentThread = Thread.currentThread();
    Runnable task = worker.firstTask;
    worker.firstTask = null;
    
    while (task != null || (task = getTask()) != null) {
        try {
            task.run();
            worker.completedTaskCount++;
        } catch (Throwable t) {
            // エラーハンドリング
        } finally {
            task = null;
        }
    }
}

线程池の監視

大量のトラフィックが流入する際、スレッド池の状態を正確に把握する必要があります。ThreadPoolExecutorは、beforeExecuteとafterExecuteというフ勾ヤ点を提供しています。これらをオーバーライドすることで、スレッド池の状態を監視可能です。

public class CustomThreadPool extends ThreadPoolExecutor {
    private String threadPoolName;
    
    public CustomThreadPool(int coreThreads, int maxThreads, long keepAliveTime,
                           TimeUnit unit, BlockingQueue<Runnable> workQueue,
                           String name) {
        super(coreThreads, maxThreads, keepAliveTime, unit, workQueue);
        this.threadPoolName = name;
    }
    
    @Override
    protected void afterExecute(Runnable task, Throwable throwable) {
        super.afterExecute(task, throwable);
        
        log.info("{} スレッドプール状態: " +
                " 現在稼働スレッド数: {}, コアスレッド数: {}, 実行中のタスク数: {}, " +
                "完了タスク数: {}, キューサイズ: {}, 最大スレッド数: {}",
                this.threadPoolName,
                this.getPoolSize(),
                this.getCorePoolSize(),
                this.getActiveCount(),
                this.getCompletedTaskCount(),
                this.getQueue().size(),
                this.getMaximumPoolSize());
    }
}

上記の例は、スレッドプールの状態をログに出力する例です。稼働スレッド数、タスク数、キュー状態などを確認可能です。

タグ: ThreadPoolExecutor 线程池 スレッドプール java.util.concurrent

6月22日 22:32 投稿