ReentrantLock の内部実装と AQS に基づくロック機構の詳細

Java の ReentrantLock は、AbstractQueuedSynchronizer(AQS)を基盤として実装されており、排他ロック(排他モード)を提供する同期ツールである。AQS では、スレッドの待ち状態を管理するために双方向連結リストが使用される。そのノード構造は以下の通り:

static final class Node {
    volatile int waitStatus;
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
    Node nextWaiter;
}

各フィールドの役割は次のとおり:

  • waitStatus:ノードの待機状態を示す。主な値は以下。
    • 1 (CANCELLED):スレッドがキャンセルされた
    • -1 (SIGNAL):後続ノードが unpark を必要としている
    • -2 (CONDITION):Condition キューで待機中
    • -3 (PROPAGATE):共有モードでの伝播を示す
    • 0:通常の同期キュー内での待機状態
  • prev / next:双方向リンク
  • thread:このノードに関連付けられたスレッド
  • nextWaiter:Condition キューにおける次のノード(または排他モードを示すマーカー)

非公平ロックの取得処理

ReentrantLock のデフォルトコンストラクタは非公平ロックを生成する:

public ReentrantLock() {
    sync = new NonfairSync();
}

NonfairSync は AQS のサブクラスであり、lock() メソッドは以下のように実装される:

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

まず CAS(Compare-And-Swap)で状態値(state)を 0 から 1 に更新し、成功すれば現在スレッドがロックを獲得する。失敗した場合は acquire(1) を呼び出す。

acquire は AQS のファイナルメソッドで、以下のように動作する:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire はサブクラスで実装され、非公平ロックでは nonfairTryAcquire が呼ばれる:

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    } else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

ロック未取得時は再試行、既に所有している場合は再入可能(reentrant)としてカウントを増加させる。

ロック取得に失敗すると、スレッドは同期キューに追加される:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

キュー末尾への追加が失敗した場合、enq で無限ループにより確実に追加される:

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) {
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

その後、acquireQueued でロック再取得を試みる:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null;
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

shouldParkAfterFailedAcquire は前駆ノードの状態を見て、現在ノードを休止すべきかを判断する:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

必要に応じて LockSupport.park() でスレッドを休止し、中断フラグを返す:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

ロックの解放処理

unlock() は以下のように実装される:

public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

tryRelease は排他ロックの所有者チェックと状態更新を行う:

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

ロック完全解放後、後続ノードを起床させる:

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

起床されたスレッドは再度 acquireQueued のループに入り、ロック取得を試みる。

公平ロックの違い

公平ロックでは、tryAcquire 内で hasQueuedPredecessors() を呼び出し、自分より前に待機中のスレッドがいないかを確認する:

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    } else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

hasQueuedPredecessors の実装:

public final boolean hasQueuedPredecessors() {
    Node t = tail;
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

これにより、常に FIFO 順でロックが割り当てられ、新規スレッドが既存待機スレッドを飛び越えてロックを取得することがない。

ロックの解放処理は公平・非公平で共通である。

タグ: Java ReentrantLock AQS concurrency LockSupport

6月8日 16:35 投稿