標準ThreadPoolExecutorとの動作比較
本実装はjava.util.concurrent.ThreadPoolExecutorの動作を拡張。主な差異:
- 標準スレッドプール:初期化時コアスレッド生成。キュー満杯で追加スレッド作成
- 本実装:要求時動的スレッド生成。タスク完了後全スレッド終了
利点:常駐スレッド削減によるメモリ節約(1スレッド≈1MB)とCPUコンテキスト切り替えコスト低減
基本実装例
public class DynamicThreadPool {
private final AtomicInteger activeThreads = new AtomicInteger(0);
private final Queue<Task> taskQueue = new ConcurrentLinkedQueue<>();
private final int maxThreads;
private final AtomicBoolean shutdownFlag = new AtomicBoolean(false);
public void submit(Task task) {
if (shutdownFlag.get()) throw new IllegalStateException("Pool shutdown");
activeThreads.getAndUpdate(current ->
current < maxThreads ? current + 1 : current
);
if (activeThreads.get() <= maxThreads) {
new Worker(task).start();
} else {
taskQueue.offer(task);
}
}
private class Worker extends Thread {
private final Task initialTask;
Worker(Task task) {
this.initialTask = task;
}
@Override
public void run() {
try {
initialTask.execute();
} finally {
processNext();
}
}
private void processNext() {
Task next = taskQueue.poll();
if (next != null) {
new Worker(next).start();
} else {
activeThreads.decrementAndGet();
}
}
}
@FunctionalInterface
public interface Task {
void execute() throws Exception;
}
}
ThreadPoolExecutorを利用した改良版
public class EnhancedThreadPool extends ThreadPoolExecutor {
public EnhancedThreadPool(int maxThreads, String prefix) {
super(maxThreads, maxThreads, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
r -> new Thread(r, prefix + "-" + r.hashCode()));
allowCoreThreadExpiration(true);
}
@Override
public void allowCoreThreadTimeOut(boolean enable) {
throw new UnsupportedOperationException();
}
}
重要設定: allowCoreThreadTimeOut(true)でコアスレッドのアイドル終了を許可