ブロッキングキューとは
ブロッキングキューは特殊な種類のキューで、「先入れ先出し」(FIFO)の原則に従います。これはスレッドセーフなデータ構造であり、以下の特性を持っています:
- キューが満杯の場合、追加の要素を挿入しようとするとブロックされ、他のスレッドが要素を取り出すまで待機します。
- キューが空の場合、要素を取り出そうとするとブロックされ、他のスレッドが要素を挿入するまで待機します。
生産者-消費者モデル
ブロッキングキューの典型的な応用例は「生産者-消費者モデル」です。これは非常に一般的な開発モデルです。
生産者-消費者モデルとは
生産者-消費者モデルは、コンテナ(ブロッキングキュー)を使用して生産者と消費者の強い結合問題を解決するパターンです。生産者と消費者は直接通信せず、ブロッキングキューを介して通信します。これにより、生産者はデータを生成した後、消費者が処理するのを待たずに直接キューにデータを入れ、消費者は生産者にデータを要求するのではなく、直接キューからデータを取得します。
結合と分離について
結合とは、2つ以上のモジュール間の相互依存関係のことです。ソフトウェア工学において、モジュール間の結合度が高いほど、保守コストが高くなります。そのため、システムアーキテクチャの設計プロセスでは、アプリケーションの保守性を高めるために、各モジュール間の結合度を減らす必要があります。
結合には、強い結合(密結合)と弱い結合(疎結合)があります。
強い結合
強い結合アーキテクチャは本質的にクライアント/サーバーモデルです。利点は、アーキテクチャがシンプルで、設計が簡単で、開発サイクルが短く、迅速な開発、投入、展開、アプリケーションが可能です。
しかし、クラスター規模の拡大に伴い、システムの安定性は徐々に低下します。主な理由は以下の通りです:
- 同期操作によるネットワークリソースの消費が大きい。
- サーバーがクライアントに直接公開されるため、ネットワーク攻撃を引き起こしやすい。
- プログラムコード間の関連性が高すぎ、モジュール化処理に適していない。
弱い結合(分離)
弱い結合アーキテクチャは、クライアント/サーバーモデル間にプロキシを追加し、CSモデルをCASモデルに変換するものです。この新しいアーキテクチャでは、クライアントの役割は変わらず、プロキシサーバーがクライアントとの通信とクライアントの識別判断を担当し、サーバーはプロキシサーバーの後ろに位置し、クライアントからは見えず、データ処理のみを担当します。
利点は以下の通りです:
- マルチタスク並列処理能力が大幅に向上。
- 負荷適応メカニズムの実現。
- サーバーへのネットワーク攻撃をほぼ排除。
- 非同期操作によるネットワークリソース消費と操作関連性の削減。
- システムの保守性の向上。
Java標準ライブラリのブロッキングキュー
Java標準ライブラリにはブロッキングキューが組み込まれています。プログラムでブロッキングキューを使用する必要がある場合は、標準ライブラリのものを直接使用できます。
- BlockingQueueはインターフェースです。実際の実装クラスはLinkedBlockingQueueです。
- putメソッドはブロッキング方式でのキューへの挿入に使用され、takeはブロッキング方式でのキューからの取り出しに使用されます。
- BlockingQueueにはoffer、poll、peekなどのメソッドもありますが、これらのメソッドにはブロッキング特性がありません。
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
// キューに要素を挿入
queue.put("要素");
// キューから要素を取り出す。putせずにtakeするとブロックされる
String element = queue.take();
ブロッキングキューを使用した生産者-消費者モデルのシミュレーション
public class ProducerConsumerDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();
Thread consumerThread = new Thread(() -> {
while (true) {
try {
int value = blockingQueue.take();
System.out.println("消費した要素: " + value);
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "消費者");
Thread producerThread = new Thread(() -> {
Random random = new Random();
while (true) {
try {
int num = random.nextInt(1000);
System.out.println("生成した要素: " + num);
blockingQueue.put(num);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "生産者");
consumerThread.start();
producerThread.start();
}
}
ブロッキングキューのカスタム実装
実装の考え方:
- 「循環キュー」の方法を使用して実装します。
- synchronizedを使用してロック制御を行います。
- putで要素を挿入する際、キューが満杯の場合はwaitします(注意:ループ内でwaitする必要があります。目覚めたときに必ずしもキューが空いているとは限らないため、複数のスレッドが同時に目覚める可能性があるため)。
- takeで要素を取り出す際、キューが空の場合はwaitします(これもループwaitです)。
ここでは配列を使用して実装します。
public class CustomBlockingQueue {
private int[] container = new int[1000];
private int frontIndex = 0;
private int rearIndex = 0;
private int count = 0;
// 要素をキューに挿入
public void insert(int item) throws InterruptedException {
synchronized (this) {
while (count == container.length) {
// キューが満杯の場合、ブロックする
this.wait();
}
container[rearIndex] = item;
rearIndex++;
if (rearIndex >= container.length) {
rearIndex = 0;
}
count++;
// take内のwaitを起こす
this.notify();
}
}
// キューから要素を取り出す
public int remove() throws InterruptedException {
int result = 0;
synchronized (this) {
while (count == 0) {
// キューが空の場合もブロックする
this.wait();
}
result = container[frontIndex];
frontIndex++;
if (frontIndex >= container.length) {
frontIndex = 0;
}
count--;
// insert内のwaitを起こす
this.notify();
}
return result;
}
}
テストコード
public class QueueTest {
public static void main(String[] args) {
CustomBlockingQueue queue = new CustomBlockingQueue();
Thread consumer = new Thread(() -> {
while (true) {
try {
int value = queue.remove();
System.out.println("消費: " + value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread producer = new Thread(() -> {
int counter = 0;
while (true) {
try {
System.out.println("生産: " + counter);
queue.insert(counter);
counter++;
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
consumer.start();
producer.start();
}
}