Javaの高並行処理:Callableインターフェースによるスレッド作成手法

これまでスレッドを作成する方法として、Threadクラスの継承やRunnableインターフェースの実装があることを学んできました。

ここでは、新しいスレッド作成手段としてCallableインターフェースについて説明します。まず、RunnableインターフェースとCallableインターフェースの違いを比較しましょう。

それぞれのインターフェースを実装したクラスを定義します:

// Runnableインターフェース
class TaskRunnable implements Runnable {

    @Override
    public void run() {
      
    }
}

// Callableインターフェース
class TaskCallable implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        System.out.println("******処理開始");
        return 1024;
    }
}

Callableインターフェースにはジェネリック型が存在し、戻り値を持つことが特徴です。これは従来の技術に対する拡張であり、戻り値によりより細かい制御が可能になります。

次に、Runnableインターフェースを使ったスレッド作成方法を見てみましょう:

// Runnableを使用
TaskRunnable task1 = new TaskRunnable();
Thread thread1 = new Thread(task1);

しかし、Callableインターフェースを使ってスレッドを作成しようとするとエラーになります。なぜでしょうか?

理由は、ThreadクラスにCallableを受け取るコンストラクタが存在しないからです。

Callableスレッドの作成方法

APIを確認すると、RunnableインターフェースのコンストラクタにはCallableインターフェースの実装クラスが必要であることがわかります。 そのため、スレッドを作成する手順は以下のようになります:

public class CallableExample {
    public static void main(String[] args) {
//        TaskCallable task = new TaskCallable();
        FutureTask futureTask = new FutureTask(new TaskCallable());
        new Thread(futureTask, "A").start();
        System.out.println(futureTask.get()); // 結果: 1024 getメソッドで戻り値を取得
    }
}

getメソッドのブロッキング特性

public class CallableExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
//        TaskCallable task = new TaskCallable();
        FutureTask futureTask = new FutureTask(new TaskCallable());
        new Thread(futureTask, "A").start();
        System.out.println(futureTask.get()); // 結果: 1024 getメソッドで戻り値を取得(ブロッキング)
        System.out.println(Thread.currentThread().getName() + "***計算完了");
    }
}
// Callableインターフェース
class TaskCallable implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        System.out.println("******処理開始");
        Thread.sleep(5000);
        return 1024;
    }
}

次に、メインスレッドとFutureTaskスレッドの実行順序を入れ替えてみます:

public class CallableExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
//        TaskCallable task = new TaskCallable();
        FutureTask futureTask = new FutureTask(new TaskCallable());
        new Thread(futureTask, "A").start();
        System.out.println(Thread.currentThread().getName() + "***計算完了");
        System.out.println(futureTask.get()); // 結果: 1024 getメソッドで戻り値を取得(ブロッキング)
    }
}
// Callableインターフェース
class TaskCallable implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        System.out.println("******処理開始");
        Thread.sleep(5000);
        return 1024;
    }
}

通常、futureTaskのgetメソッドはブロックされやすく、mainスレッドが完了する前にwait状態になることがあります。そのため、処理時間の無駄を避けるために、mainスレッドの終了を優先することが一般的です。

FutureTaskの一意性

新たにスレッドBを追加してみます:

public class CallableExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
//        TaskCallable task = new TaskCallable();
        FutureTask futureTask = new FutureTask(new TaskCallable());
        new Thread(futureTask, "A").start();
        new Thread(futureTask, "B").start();
        System.out.println(Thread.currentThread().getName() + "***計算完了");
        System.out.println(futureTask.get()); // 結果: 1024 getメソッドで戻り値を取得(ブロッキング)
    }
}

// Callableインターフェース
class TaskCallable implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        System.out.println("******処理開始");
        Thread.sleep(5000);
        return 1024;
    }
}

この場合、実際には一度しか実行されません。なぜなら、一つのFutureTaskオブジェクトに対して複数のスレッドがアクセスしても、同じインスタンスが使用されるためです。一方で、Runnableインターフェースでは別の動作となります:

public class CallableExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        TaskRunnable task = new TaskRunnable();
        Thread thread1 = new Thread(task);
        Thread thread2 = new Thread(task);
        thread1.start();
        thread2.start();
    }
}

// Runnableインターフェース
class TaskRunnable implements Runnable {

    @Override
    public void run() {
        System.out.println("******処理開始");
    }
}

以上です。

タグ: Java concurrency Callable FutureTask Thread

5月19日 13:06 投稿