生産者・消費者パターン
生産者と消費者は設計パターンの一つであり、このパターンでは両者が直接通信せず、共通のコンテナ(例:キュー)を介してやりとりを行うことによって、強結合を回避する。
生産者はデータを生成し、それをコンテナに投入するだけで、消費側の処理完了を待つ必要がない。
一方で、消費者はコンテナからデータを取り出すのみで、生産者と直接連携することはない。
このようなコンテナとしてよく使われる構造がキューである。
JUCにおけるブロッキングキューの操作メソッド
データの挿入方法(プロデューサー)
add(E) // キューにデータを追加。容量が満杯なら例外をスロー(内部的にはofferを使用)
offer(E) // キューにデータを追加。容量が満杯ならfalseを返す
offer(E, timeout, unit) // キューにデータを追加。満杯の場合、指定時間だけ待機し、それでも追加できなければfalse
put(E) // キューにデータを追加。容量が満杯ならスレッドを待機させ、空きが出るまで待つ
データの取り出し方法(コンシューマー)
remove() // キューからデータを削除。空の場合例外をスロー(内部的にはpollを使用)
poll() // キューからデータを削除。空の場合nullを返す
poll(timeout, unit) // キューからデータを削除。空の場合指定時間だけ待機し、データが入るまで待つ
take() // キューからデータを削除。空の場合スレッドを待機させ、データが入るまで待つ
ArrayBlockingQueueの基本的な使い方
ArrayBlockingQueueは配列による実装であり、初期サイズは固定で設定が必要である。
主な内部構造
ArrayBlockingQueueの重要なメンバ変数:
lock = 再入可能ロック(ReentrantLock)
count = 現在の要素数
items = 配列本体
putIndex = 要素を格納するインデックス
takeIndex = 要素を取り出すインデックス
notEmpty = 消費者用の条件変数(wait/notify相当)
notFull = 生産者用の条件変数(wait/notify相当)
offerメソッドの実装
public boolean offer(E e) {
checkNotNull(e); // nullチェック
final ReentrantLock lock = this.lock;
lock.lock(); // 排他制御
try {
if (count == items.length) // キューが満杯か?
return false;
else {
enqueue(e); // 要素を追加
return true;
}
} finally {
lock.unlock(); // 解放
}
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x; // 指定位置に格納
if (++putIndex == items.length) // インデックスのリセット
putIndex = 0;
count++; // 要素数を増やす
notEmpty.signal(); // 消費者を起床
}
putメソッドの実装
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 中断可能なロック
try {
while (count == items.length) // キューが満杯なら待機
notFull.await();
enqueue(e); // 要素を追加
} finally {
lock.unlock();
}
}
消費者の処理実装
pollメソッドの実装
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue(); // 要素があれば取り出し
} finally {
lock.unlock();
}
}
private E dequeue() {
final Object[] items = this.items;
E x = (E) items[takeIndex]; // 要素取得
items[takeIndex] = null; // クリア
if (++takeIndex == items.length)
takeIndex = 0;
count--; // 要素数減少
if (itrs != null)
itrs.elementDequeued();
notFull.signal(); // 生産者を起床
return x;
}
LinkedBlockingQueueの概要
AtomicIntegerで長さを管理
主な内部構造
private final AtomicInteger count = new AtomicInteger(); // 要素数
private transient Node<E> head; // 先頭ノード
private transient Node<E> last; // 末尾ノード
private final ReentrantLock takeLock = new ReentrantLock(); // 消費用ロック
private final Condition notEmpty = takeLock.newCondition(); // 消費用条件変数
private final ReentrantLock putLock = new ReentrantLock(); // 生産用ロック
private final Condition notFull = putLock.newCondition(); // 生産用条件変数
生産者の処理実装
offerメソッド
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity) // 容量確認
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal(); // 生産者を起床
}
} finally {
putLock.unlock();
}
if (c == 0) // キューが空だった場合
signalNotEmpty(); // 消費者を起床
return c >= 0;
}
private void enqueue(Node<E> node) {
last = last.next = node; // 新しい要素を末尾に追加
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal(); // 消費者を起床
} finally {
takeLock.unlock();
}
}
putメソッド
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await(); // 容量が満杯なら待機
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
消費者の処理実装
pollメソッド
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0) // 空なら即終了
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal(); // まだ要素があるなら消費者を起床
}
} finally {
takeLock.unlock();
}
if (c == capacity) // 空きができたなら生産者を起床
signalNotFull();
return x;
}
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // ガベージコレクション用
head = first;
E x = first.item;
first.item = null;
return x;
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal(); // 生産者を起床
} finally {
putLock.unlock();
}
}