はじめに
前回の記事「CPU温度を下げた経験談」で、バイクの投入管理アプリケーションを担当していることをお伝えしました。大量のトラフィックが新アプリケーションに切り替わった後も、投入処理の複雑さから不安を感じていました。特に、線程池を用いて非同期処理を行っている部分は、深く理解できておらず、しばらくの間、その仕組みを「咀嚼できない」と感じていました。
線程池の3つの主要パラメータやバッキングキューの種類について、いつも行き当たりばcieで検索しては理解を装う日々が続いていました。しかし、この状況を変えたいと考え、チームの文博が主催する学習チーム「Thor」に参加しました。そこで学んだ構造化思考の重要性から、線程池を体系的に学び直す必要性を痛感しました。
線程池を利用すべき理由
近年、CPUのコア数が増加していることで、並列処理の重要性が高まっています。Javaの線程モデルは、プリエンプティブな協調型マルチスレッドを採用しており、多くの線程を扱う場合、以下のような課題が生じます:
- 線程の作成と破棄にかかるコスト
- コンテキ스트スイッチのオーバーヘッド
Javaの標準ライブラリ「java.util.concurrent.ThreadPoolExecutor」は、これらの課題を解決するための「プーリング」概念を実装しています。プーリングは以下のようなメリットを提供します:
- リソースの再利用
- 処理能力の効率化
線程池の内部仕組みを理解するため、ThreadPoolExecutorのソースコードを分析します。
ソースコードの分析
ThreadPoolExecutorの中心的なメソッドである「execute(Runnable command)」の仕組みを解剖します:
public void execute(Runnable task) {
if (task == null) return;
int ctlState = ctl.get();
int currentThreadCount = workerCountOf(ctlState);
if (currentThreadCount < coreThreadCount) {
if (addWorker(task, true)) {
return;
}
ctlState = ctl.get();
}
if (isRunning(ctlState) && workQueue.offer(task)) {
int recheckState = ctl.get();
if (!isRunning(recheckState) && remove(task)) {
reject(task);
} else if (workerCountOf(recheckState) == 0) {
addWorker(null, false);
}
} else if (!addWorker(task, false)) {
reject(task);
}
}
このメソッドは、以下のような処理を実施します:
- 現在のスレッド数がコアスレッド数以下の場合、新規スレッドを作成します。
- 現在のスレッド数がコアスレッド数を超えた場合、タスクをバッキングキューに格納します。
- バッキングキューが満杯の場合、最大スレッド数まで新規スレッドを作成します。
ThreadPoolExecutorは、 ctl変数を用いてスレッド池の状態を管理します。ctlは以下のようなビットマップ形式で状態を保持しています:
private static int getThreadCount(int ctlValue) {
return ctlValue & 0x1CFFFFFF;
}
ctlの上位3ビットはスレッド池の状態を表し、下位29ビットは現在稼働しているスレッド数を表します。
addWorkerメソッドは、スレッド池の状態とスレッド数を元に、新規スレッドの作成を決定します:
private boolean addWorker(Runnable task, boolean isCore) {
retryLoop:
for (;;) {
int ctlValue = ctl.get();
int runState = runStateOf(ctlValue);
if (runState >= SHUTDOWN) {
return false;
}
int maxThreadCount = isCore ? coreThreadCount : maxThreadCount;
int currentThreadCount = workerCountOf(ctlValue);
if (currentThreadCount >= maxThreadCount) {
return false;
}
if (compareAndIncrementWorkerCount(ctlValue)) {
break retryLoop;
}
}
Worker worker = new Worker(task);
Thread thread = worker.thread;
if (thread != null) {
try {
thread.start();
} catch (Exception e) {
return false;
}
}
return true;
}
Workerスレッドが作成された後、runWorkerメソッドが実行されます:
private void runWorker(Worker worker) {
Thread currentThread = Thread.currentThread();
Runnable task = worker.firstTask;
worker.firstTask = null;
while (task != null || (task = getTask()) != null) {
try {
task.run();
worker.completedTaskCount++;
} catch (Throwable t) {
// エラーハンドリング
} finally {
task = null;
}
}
}
线程池の監視
大量のトラフィックが流入する際、スレッド池の状態を正確に把握する必要があります。ThreadPoolExecutorは、beforeExecuteとafterExecuteというフ勾ヤ点を提供しています。これらをオーバーライドすることで、スレッド池の状態を監視可能です。
public class CustomThreadPool extends ThreadPoolExecutor {
private String threadPoolName;
public CustomThreadPool(int coreThreads, int maxThreads, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
String name) {
super(coreThreads, maxThreads, keepAliveTime, unit, workQueue);
this.threadPoolName = name;
}
@Override
protected void afterExecute(Runnable task, Throwable throwable) {
super.afterExecute(task, throwable);
log.info("{} スレッドプール状態: " +
" 現在稼働スレッド数: {}, コアスレッド数: {}, 実行中のタスク数: {}, " +
"完了タスク数: {}, キューサイズ: {}, 最大スレッド数: {}",
this.threadPoolName,
this.getPoolSize(),
this.getCorePoolSize(),
this.getActiveCount(),
this.getCompletedTaskCount(),
this.getQueue().size(),
this.getMaximumPoolSize());
}
}
上記の例は、スレッドプールの状態をログに出力する例です。稼働スレッド数、タスク数、キュー状態などを確認可能です。