ブロッキングキューの仕組みと実装

生産者・消費者パターン

生産者と消費者は設計パターンの一つであり、このパターンでは両者が直接通信せず、共通のコンテナ(例:キュー)を介してやりとりを行うことによって、強結合を回避する。

生産者はデータを生成し、それをコンテナに投入するだけで、消費側の処理完了を待つ必要がない。

一方で、消費者はコンテナからデータを取り出すのみで、生産者と直接連携することはない。

このようなコンテナとしてよく使われる構造がキューである。

JUCにおけるブロッキングキューの操作メソッド

データの挿入方法(プロデューサー)

add(E)         // キューにデータを追加。容量が満杯なら例外をスロー(内部的にはofferを使用)
offer(E)       // キューにデータを追加。容量が満杯ならfalseを返す
offer(E, timeout, unit)  // キューにデータを追加。満杯の場合、指定時間だけ待機し、それでも追加できなければfalse
put(E)         // キューにデータを追加。容量が満杯ならスレッドを待機させ、空きが出るまで待つ

データの取り出し方法(コンシューマー)

remove()       // キューからデータを削除。空の場合例外をスロー(内部的にはpollを使用)
poll()         // キューからデータを削除。空の場合nullを返す
poll(timeout, unit)  // キューからデータを削除。空の場合指定時間だけ待機し、データが入るまで待つ
take()         // キューからデータを削除。空の場合スレッドを待機させ、データが入るまで待つ

ArrayBlockingQueueの基本的な使い方

ArrayBlockingQueueは配列による実装であり、初期サイズは固定で設定が必要である。

主な内部構造

ArrayBlockingQueueの重要なメンバ変数:

lock = 再入可能ロック(ReentrantLock)
count = 現在の要素数
items = 配列本体
putIndex = 要素を格納するインデックス
takeIndex = 要素を取り出すインデックス
notEmpty = 消費者用の条件変数(wait/notify相当)
notFull = 生産者用の条件変数(wait/notify相当)

offerメソッドの実装

public boolean offer(E e) {
    checkNotNull(e);  // nullチェック
    final ReentrantLock lock = this.lock;
    lock.lock();  // 排他制御
    try {
        if (count == items.length)  // キューが満杯か?
            return false;
        else {
            enqueue(e);  // 要素を追加
            return true;
        }
    } finally {
        lock.unlock();  // 解放
    }
}

private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;  // 指定位置に格納
    if (++putIndex == items.length)  // インデックスのリセット
        putIndex = 0;
    count++;  // 要素数を増やす
    notEmpty.signal();  // 消費者を起床
}

putメソッドの実装

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();  // 中断可能なロック
    try {
        while (count == items.length)  // キューが満杯なら待機
            notFull.await();
        enqueue(e);  // 要素を追加
    } finally {
        lock.unlock();
    }
}

消費者の処理実装

pollメソッドの実装

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();  // 要素があれば取り出し
    } finally {
        lock.unlock();
    }
}

private E dequeue() {
    final Object[] items = this.items;
    E x = (E) items[takeIndex];  // 要素取得
    items[takeIndex] = null;  // クリア
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;  // 要素数減少
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();  // 生産者を起床
    return x;
}

LinkedBlockingQueueの概要

AtomicIntegerで長さを管理

主な内部構造

private final AtomicInteger count = new AtomicInteger();  // 要素数
private transient Node<E> head;  // 先頭ノード
private transient Node<E> last;  // 末尾ノード
private final ReentrantLock takeLock = new ReentrantLock();  // 消費用ロック
private final Condition notEmpty = takeLock.newCondition();  // 消費用条件変数
private final ReentrantLock putLock = new ReentrantLock();  // 生産用ロック
private final Condition notFull = putLock.newCondition();  // 生産用条件変数

生産者の処理実装

offerメソッド

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)  // 容量確認
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();  // 生産者を起床
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)  // キューが空だった場合
        signalNotEmpty();  // 消費者を起床
    return c >= 0;
}

private void enqueue(Node<E> node) {
    last = last.next = node;  // 新しい要素を末尾に追加
}

private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();  // 消費者を起床
    } finally {
        takeLock.unlock();
    }
}

putメソッド

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            notFull.await();  // 容量が満杯なら待機
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}

消費者の処理実装

pollメソッド

public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)  // 空なら即終了
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        if (count.get() > 0) {
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();  // まだ要素があるなら消費者を起床
        }
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)  // 空きができたなら生産者を起床
        signalNotFull();
    return x;
}

private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h;  // ガベージコレクション用
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();  // 生産者を起床
    } finally {
        putLock.unlock();
    }
}

タグ: Java BlockingQueue ConcurrentHashMap ReentrantLock AtomicInteger

6月13日 17:23 投稿