Java スレッドプールの仕組みと Executor フレームワークの活用

Executor フレームワークの概要

Java 5 以降、java.util.concurrent パッケージに標準搭載された Executor フレームワークは、タスクの提交と実行を分離する設計パターンを提供します。開発者は業務ロジックであるタスクを定義し、それをスレッドプールに提交するだけでよく、スレッドの生成、割り当て、ライフサイクル管理といった低レベルの処理を意識する必要がなくなります。

Executors ファクトリクラスによる初期化

スレッドプールを簡単に生成するためのファクトリクラスとして Executors が用意されています。主に以下の 4 種類のプールを作成可能です。

1. キャッシュ型スレッドプール (newCachedThreadPool)

必要に応じてスレッドを生成し、空闲状態が 60 秒を超えたスレッドは自動的に终止されます。最大スレッド数は Integer.MAX_VALUE まで拡張可能であり、内部では SynchronousQueue を使用します。短期間の非同期タスクに適していますが、負荷が高い場合にスレッドが大量生成されるリスクがあるため、并发数の制御が必要です。

2. 固定サイズスレッドプール (newFixedThreadPool)

指定された数のスレッドを維持するプールです。コアスレッド数と最大スレッド数が同一であり、タスクキューには LinkedBlockingQueue が使用されます。スレッドが空闲になってもすぐに释放されず、再利用されます。

3. シングルスレッドプール (newSingleThreadExecutor)

単一のワーカー线程のみを使用するプールです。タスクは FIFO 順序で確実に直列実行されます。スレッドが異常终止した場合でも、新しいスレッドが生成され処理が継続されるため、タスクの実行順序保証が必要な場景で有用です。

4. 定时スレッドプール (newScheduledThreadPool)

遅延実行や周期性の実行をサポートするプールです。データ同期や定期バッチ処理などのユースケースに適しています。

実装例:スレッドプールの挙動比較

異なるプールタイプにおけるタスク実行の挙動を確認するためのコード例を示します。

キャッシュ型プールの例

public class WorkerTask implements Runnable {
    private final int taskId;

    public WorkerTask(int taskId) {
        this.taskId = taskId;
    }

    @Override
    public void run() {
        System.out.println("Task " + taskId + " executed by " + Thread.currentThread().getName());
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newCachedThreadPool();
        for (int count = 0; count < 5; count++) {
            pool.execute(new WorkerTask(count));
        }
        pool.shutdown();
    }
}

この場合、空闲スレッドがあれば再利用され、なければ新規スレッドが生成されます。

シングルスレッドプールの例

public class WorkerTask implements Runnable {
    private final int taskId;

    public WorkerTask(int taskId) {
        this.taskId = taskId;
    }

    @Override
    public void run() {
        System.out.println("Task " + taskId + " executed by " + Thread.currentThread().getName());
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        for (int count = 0; count < 5; count++) {
            pool.execute(new WorkerTask(count));
        }
        pool.shutdown();
    }
}

出力結果は常にタスク ID の順序通りになり、同時に実行されるスレッドは 1 つのみです。

定时任务の実行方法

ScheduledExecutorService を使用することで、遅延実行や定期実行が可能になります。

遅延実行の例

タスク提交から 5 秒後に一度だけ実行します。

public class ScheduledDemo {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3);
        scheduler.schedule(() -> System.out.println("Delayed execution after 5 seconds"), 5, TimeUnit.SECONDS);
        scheduler.shutdown();
    }
}

定期実行の例

2 秒の遅延後、4 秒間隔でタスクを繰り返し実行します。

public class ScheduledDemo {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3);
        scheduler.scheduleWithFixedDelay(() -> System.out.println("Repeated task"), 2, 4, TimeUnit.SECONDS);
        try {
            Thread.sleep(15000);
            scheduler.shutdown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

ThreadPoolExecutor の内部構造

上記のファクトリメソッドは、実際には ThreadPoolExecutor クラスをインスタンス化しています。コンストラクタの主要パラメータは以下の通りです。

  • corePoolSize: コアスレッド数。常に維持されるスレッド数。
  • maximumPoolSize: 最大スレッド数。キューが満杯になった際に生成可能な最大数。
  • keepAliveTime: 空闲スレッドが终止するまでの等待時間。
  • unit: 時間の単位。
  • workQueue: タスクを保持するブロッキングキュー。
  • threadFactory: スレッド生成用のファクトリ。
  • handler: キュー満杯時の拒否戦略。

ブロッキングキューの種類

  • ArrayBlockingQueue: 配列ベースの有界キュー。FIFO 順序。
  • LinkedBlockingQueue: リンクリストベースのキュー。吞吐量が高い。
  • SynchronousQueue: 要素を保持しないキュー。直接ハンドオフ方式。
  • PriorityBlockingQueue: 優先度付きの非阻塞キュー。

飽和時の拒否戦略 (RejectedExecutionHandler)

キューが満杯かつスレッド数が最大に達した場合の動作定義です。

  • AbortPolicy: 例外 RejectedExecutionException をスローする(デフォルト)。
  • DiscardPolicy: タスクを silently 丢弃する。
  • DiscardOldestPolicy: キュー内の最も古いタスクを丢弃し、再提交する。
  • CallerRunsPolicy: 提交元のスレッド自体がタスクを実行する。

スレッドプールの状態遷移

プール内部では ctl という変数で状態とスレッド数を管理しています。主な状態は以下の 5 つです。

  1. RUNNING: 新タスクを受け付け、キュー内のタスクも処理する。
  2. SHUTDOWN: 新タスクは受け付けないが、キュー内のタスクは処理する。
  3. STOP: 新タスク不接受,キュー内のタスクも丢弃し、実行中のタスクを中断する。
  4. TIDYING: 全タスクが終了し、スレッド数が 0 になった後の状態。
  5. TERMINATED: 终止処理が完了した状態。

タスク提交メソッドの違い

タスクをプールに送る方法として executesubmit があります。

  • execute(Runnable): 戻り値はなく、Executor インターフェースで定義。
  • submit(Callable/Runnable): 実行結果を保持する Future オブジェクトを返す。ExecutorService インターフェースで定義。

submit を使用すると、内部で RunnableFutureTask にラップされ、execute メソッド経由で処理されます。

プールの终止方法

リソース解放のために以下の 2 つのメソッドが提供されています。

  • shutdown(): 徐々に终止する。既存タスクは完了させるが、新タスクは受け付けない。
  • shutdownNow(): 即座に终止する。実行中のタスクを中断し、等待中のタスクをリストとして返す。

スレッド数の動的調整

稼働中のプールに対して、setCorePoolSizesetMaximumPoolSize メソッドを呼び出すことで、スレッド数の上限や基準を動的に変更することが可能です。これにより、負荷変動に応じたリソース制御が行えます。

タグ: Java concurrency thread-pool executor-service

5月19日 15:11 投稿