Javaにおける高スレッド並行処理とスレッドプールの実装技術

JavaのスレッドプールはExecutorフレームワークインターフェースを通じて実現されており、Executor、Executorsユーティリティクラス、ExecutorService、ThreadPoolExecutorといったコンポーネントが含まれます。

Executorsによるスレッド作成の三つの方法:

ExecutorService pool = Executors.newFixedThreadPool(5);    // 固定サイズ
ExecutorService pool = Executors.newSingleThreadExecutor();     // シングルスレッドプール
ExecutorService pool = Executors.newCachedThreadPool();     // キャッシュ型、必要に応じてスレッドを作成

以下ではこれら三つの違いについて説明します:

固定サイズのスレッドプール

最初に見ていくのは固定サイズのスレッドプールExecutors.newFixedThreadPool(5)です:

コードを見てみましょう:

/**
 * 主な特徴:スレッドの再利用、最大並行数の制御、スレッド管理
 *
 * @author example
 */
public class CustomThreadPoolExample {
    public static void main(String[] args) {
        // 5つのワーカースレッドを持つプール
        ExecutorService pool = Executors.newFixedThreadPool(5);
        try {
            // 10人の顧客をシミュレートし、5つのサービス窓口のみが存在
            for (int index = 0; index < 10; index++) {
                final int customerNumber = index;
                pool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t" + "サービスを処理中 - 顧客番号: " + customerNumber);
                });
                Thread.sleep(300);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            pool.shutdown();
        }
    }
}

結果は以下のようになります:

次に少しスレッドの待機時間を追加したコードを見てみましょう:

public class CustomThreadPoolExample {
    public static void main(String[] args) {
        // 5つのワーカースレッドを持つプール
        ExecutorService pool = Executors.newFixedThreadPool(5);
        try {
            for (int index = 0; index < 10; index++) {
                final int customerNumber = index;
                pool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t" + "サービスを処理中 - 顧客番号: " + customerNumber);
                });
                Thread.sleep(300);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            pool.shutdown();
        }
    }
}

各タスクが順番に処理されていることが確認できます。これは固定サイズのスレッドプールであり、常にこのプールからスレッドが取得されていることがわかります。

シングルスレッドプール

二つ目はシングルスレッドプールです:

ExecutorService pool = Executors.newSingleThreadExecutor(); // 1つのワーカースレッド
public class CustomThreadPoolExample {
    public static void main(String[] args) {
        // ExecutorService pool = Executors.newFixedThreadPool(5);
        ExecutorService pool = Executors.newSingleThreadExecutor();

        try {
            for (int index = 0; index < 10; index++) {
                final int customerNumber = index;
                pool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t" + "サービスを処理中 - 顧客番号: " + customerNumber);
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            pool.shutdown();
        }
    }
}

すべてのタスクが同じスレッドで処理されていることが確認できます。

キャッシュ型スレッドプール

三つ目のスレッドプール:ExecutorService pool = Executors.newCachedThreadPool();

ExecutorService pool = Executors.newCachedThreadPool(); // 動的にスレッド数を調整可能

コード例:

public class CustomThreadPoolExample {
    public static void main(String[] args) {
        // ExecutorService pool = Executors.newFixedThreadPool(5);
        // ExecutorService pool = Executors.newSingleThreadExecutor();
        ExecutorService pool = Executors.newCachedThreadPool();

        try {
            for (int index = 0; index < 10; index++) {
                final int customerNumber = index;
                pool.execute(() -> {
                    try {
                        Thread.sleep(300);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + "\t" + "サービスを処理中 - 顧客番号: " + customerNumber);
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            pool.shutdown();
        }
    }
}

効果を見てみましょう:

上記コードではスレッドプールから取得したスレッドに対して0.3秒の待機時間を設定していますが、多数の異なるスレッドが生成されていることが確認できます。

待機時間が短すぎるため、処理要求の頻度が高くなり、多くのスレッドが作成されたのです。

ヒント:スリープ時間を調整することでプール内のスレッド数を制御できます。スリープ時間を長くすると、作成されるスレッド数が自然と減少し、処理要求の頻度が低下することを示しています。

このことから、リクエストが多すぎたり頻繁すぎたりする場合、キャッシュ型スレッドプールnewCachedThreadPoolを使用すると、より多くのスレッドが作成されることがわかります。

スレッドプールの内部実装

まずnewFixedThreadPool()の内部実装を見てみましょう:

次にnewSingleThreadExecutor()の内部実装:

そしてnewCachedThreadPool()の内部実装:

まとめると、実際にはThreadPoolExecutorが返され(継承図を確認してください)、異なるパラメータをコンストラクタに渡しているだけであることがわかります。また、内部ではブロッキングキューが使用されていることも分かります。 つまり、ThreadPoolExecutorを使用して直接スレッドプールを作成することもでき、Executorsは単なるユーティリティクラスであり、実際にはThreadPoolExecutorを返しています。

ThreadPoolExecutorの七つのパラメータ

さらにThreadPoolExecutorの中を見てみましょう:

このthisをクリックすると、7つのパラメータがあることが確認できます:corePoolSizemaximumPoolSizekeepAliveTimeunitworkQueuethreadFactoryhandler

以下の図はこれらの7つのパラメータの説明です:

スレッドプールの内部動作原理図

以下の図を参考にして、上記の7つのパラメータを理解しましょう: まずスレッドプールの内部動作原理図を見てみましょう:

上記の図とパラメータの説明を照らし合わせることで、maximumPoolcorePoolを含んでいることがわかります。maximumPoolは最大スレッド数を表し、corePoolは常駐スレッド数を表します。銀行に例えると、最大で5つのサービス窓口がありますが、通常使用するのは2つだけです。

待機領域はブロッキングキュー(BlockingQueue)に相当し、ブロッキングキューが満杯になると、handlerの拒否戦略が適用されます。これは銀行の入り口に「これ以上の業務は受け付けません」と書かれた看板があるようなものです。

顧客の業務がほぼ終了したとき、(corePoolに基づいて拡張された)余分な窓口はkeepAliveTime後に閉鎖され、元のcorePool個のサービス窓口に戻ります。

スレッドプールの作業フローのまとめ:

まずスレッドプールがタスクを受け取ると、コアスレッド数が満杯かどうかを確認します。もしcorePoolが満杯でなければ、コア(常駐)スレッドが処理を行います。

常駐スレッドが満杯であれば、ブロッキングキューに格納します。ブロッキングキューが満杯でなければ、これらのタスクをキューに配置します。

ブロッキングキューも満杯であれば、最大スレッド数までスレッド数を拡張します。

最大スレッド数も満杯であれば、拒否戦略が適用されます。

これがスレッドプールの四つのステップです。受付、キューへの格納、スレッドの拡張、拒否戦略!

以下のフロー図も参考にしてください:

実際の開発での適切なスレッドプールの使用方法

なぜExecutorsユーティリティクラスを使用してスレッドプールを作成しないことを推奨するのか?

例として、以前説明したnewSingleThreadExecutor()およびExecutors.newCachedThreadPool()で作成されたスレッドプールの内部実装を見てみましょう:

内部実装で見られるように:

Executorsを使用して作成する場合、デフォルトのInteger.MAX_VALUEのサイズは21億であり、メモリを極端に消費します。スレッドプールは決して遅れることなく、メモリを圧迫してしまいます。

以下の内部実装も参照してください:

     // Single用
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    
    // LinkedBlockQueueをクリック
        public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

   // Cached用
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

newSingleThreadExecutor()の場合、LinkedBlockQueueの長さはInteger.MAX_VALUEです。 newCachedThreadPool()の場合、maximumPoolの値がInteger.MAX_VALUEになっています! 両方ともOOM例外を引き起こす可能性があります!

カスタムスレッドプール

public class CustomThreadPoolExample {
    public static void main(String[] args) {
        // カスタムスレッドプール
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2,
                5,
                2L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(3), // 指定しない場合はデフォルトでInteger.MAX_VALUE
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());// デフォルトの拒否戦略

        try {
            // 10人の顧客をシミュレートし、5つのサービス窓口のみが存在
            for (int index = 0; index < 9; index++) {
                pool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t" + "サービスを処理中");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            pool.shutdown();
        }
    }

poolはカスタムスレッドプールであり、前述のパラメータを理解している必要があります。

このスレッドプールがサポートできる最大同時実行数はmaximumPool+Queueのサイズ、つまり5+3=8です。このサイズを超えるとjava.util.concurrent.RejectedExecutionExceptionという拒否実行例外が発生します。

スレッドプールの四つの拒否戦略

次にスレッドプールの四つの拒否戦略を見てみましょう。上記はJDKのデフォルト拒否戦略です:

他の三つの戦略の実行結果を見てみましょう

上記コードの拒否戦略を二番目のnew ThreadPoolExecutor.CallerRunsPolicy()に変更すると、呼び出し元に制御が戻り、ここではmainスレッドになります。

三番目のnew ThreadPoolExecutor.DiscardOldestPolicy():エラーは発生しません。

四番目のnew ThreadPoolExecutor.DiscardPolicy():同様にエラーは発生しません。

これらの戦略はすべてRejectedExecutionHandlerインターフェースを継承しています。

maximumPoolSizeを適切に設定する方法

最後にmaximumPoolSizeを適切に設定する方法について少し触れます。

IOバウンドとCPUバウンドの理解:(チューニング)

  1. CPUバウンド型では、通常CPUコア数+1に設定することでCPUの効率を最大限に保つことができます!
System.out.println(Runtime.getRuntime().availableProcessors()); // CPUコア数を取得、8コア
  1. IOバウンド型では、プログラム内でIO処理が重いスレッドを判断します

タグ: Java thread-pool concurrent-programming executor-framework Multithreading

6月25日 19:00 投稿