AQS(AbstractQueuedSynchronizer)の仕組みを徹底解説

AQS(AbstractQueuedSynchronizer)は、Javaにおけるロックやその他の同期コンポーネントを構築するための基盤フレームワークです。このクラスは、int型のstate変数で同期状態を管理し、内部にFIFO(先入れ先出し)の同期キューを持ちます。

AQSを利用するには、サブクラスが継承し、抽象メソッドを実装します。同期状態の操作に使う主要なメソッドは以下の通りです。

  • getState():同期状態を取得
  • setState(int newState):同期状態を設定
  • compareAndSetState(int expect, int update):CAS(Compare-And-Swap)操作で同期状態を設定し、原子性を保証

一、AQSの使い方

AQSを利用するには、サブクラスで以下の抽象メソッドを実装する必要があります。

  • protected boolean tryAcquire(int arg):排他モードで同期状態を取得。成功したらtrue、失敗したらfalseを返す。
  • protected boolean tryRelease(int arg):排他モードで同期状態を解放。待機中の他のスレッドが状態を取得できるようになる。
  • protected int tryAcquireShared(int arg):共有モードで同期状態を取得。戻り値が0以上なら成功、負なら失敗。
  • protected boolean tryReleaseShared(int arg):共有モードで同期状態を解放。成功はtrue、失敗はfalse。
  • protected boolean isHeldExclusively():現在のスレッドが排他モードで状態を保持しているかどうかを返す。

また、AQSはサブクラス向けに多数のテンプレートメソッドを提供します。これらは3種類に分類されます。

  • acquire(int arg):排他モードで同期状態を取得。成功すればそのまま戻り、失敗すれば同期キューで待機。内部でtryAcquire(int arg)を呼び出す。
  • acquireInterruptibly(int arg)acquire(int arg)と同様だが、割り込みに応答。待機中に割り込まれるとInterruptedExceptionをスロー。
  • tryAcquireNanos(int arg, long nanos):タイムアウト付きで同期状態を取得。指定時間内に取得できなければfalse。
  • acquireShared(int arg):共有モードで同期状態を取得。排他モードとは異なり、複数スレッドが同時に状態を取得できる。
  • acquireSharedInterruptibly(int arg):共有モードで取得、割り込みに応答。
  • tryAcquireSharedNanos(int arg, long nanosTimeout):共有モードでタイムアウト付き取得。
  • release(int arg):排他モードで同期状態を解放。解放後に同期キューの先頭ノードのスレッドを起床。
  • releaseShared(int arg):共有モードで同期状態を解放。
  • Collection<Thread> getQueuedThreads():同期キューで待機中のスレッドのコレクションを取得。

二、AQSの実装原理

AQSは内部的にFIFOの双方向キューに依存して同期状態を管理します。スレッドが同期状態の取得に失敗すると、現在のスレッドと待機情報をNodeオブジェクトにカプセル化し、同期キューに追加します。同時にそのスレッドをブロックし、同期状態が解放されると先頭ノードのスレッドを起床して再試行させます。

同期キューのNodeは主に以下のプロパティを持ちます:同期状態取得に失敗したスレッドの参照、待機状態、前のノード、次のノード。同期器は先頭ノードと末尾ノードを保持し、同期状態を取得できなかったスレッドはNodeとなってキューの末尾に追加されます。

同期キューはheadとtailの2つのノード参照を持ちます。スレッドがロックを取得すると、他のスレッドはNodeとして構築され、キューの末尾に追加されます。末尾ノードの追加は、CAS操作(compareAndSetTail(Node expect, Node update))でスレッドセーフに行われます。

  1. キューの先頭ノードは、現在同期ロックを取得しているスレッドです。先頭ノードがロックを解放すると、後続ノードを起床し、後続ノードはロック取得を試み、成功すれば自身を先頭ノードに設定します。
  2. 先頭ノードの設定は、ロック取得に成功したスレッドのみが行います。同時に1スレッドしか成功しないため、CAS操作は不要で、古い先頭ノードのnext参照を切るだけで済みます。

三、ロックの取得と解放

ここでNodeについて詳しく説明します。Nodeは、リソース取得を待機する各スレッドをカプセル化したもので、同期が必要なスレッド自体とその待機状態(ブロック中、起床待機中、キャンセル済みなど)を含みます。変数waitStatusはNodeの待機状態を示し、以下の5つの値を取ります。

  • CANCELLED(1):現在のNodeがスケジュールをキャンセルされた状態。タイムアウトや割り込み(割り込み応答ありの場合)でこの状態になり、以降は変化しない。
  • SIGNAL(-1):後続のNodeが現在のNodeによる起床を待っている状態。後続Nodeがキューに追加される際、前のNodeの状態をSIGNALに更新する。
  • CONDITION(-2):NodeがCondition上で待機している状態。他のスレッドがCondition.signal()を呼ぶと、この状態のNodeは待機キューから同期キューに移動する。
  • PROPAGATE(-3):共有モードで、前のNodeが後続Nodeだけでなく、さらにその先のNodeも起床する可能性があることを示す。
  • 0:新しいNodeがキューに追加されたときのデフォルト状態。

負の値は有効な待機状態、正の値はキャンセル済みを表します。そのため、ソースコードでは>0や<0で状態を判定することがよくあります。

3.1 排他モードでの同期状態取得・解放(ソースコード解説)

排他モードでの同期状態取得はacquire(int arg)メソッドで行います。

1 public final void acquire(int arg) {
2     if (!tryAcquire(arg) &&
3         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4         selfInterrupt();
5 }

最初にサブクラスで実装したtryAcquireでロック取得を試み、成功すれば終了。失敗するとaddWaiterでNodeを作成し、キュー末尾に追加します。

1 private Node addWaiter(Node mode) {
2     Node node = new Node(Thread.currentThread(), mode);
3     Node pred = tail;
4     if (pred != null) {
5         node.prev = pred;
6         if (compareAndSetTail(pred, node)) {
7             pred.next = node;
8             return node;
9         }
10     }
11     enq(node);
12     return node;
13 }
1 private Node enq(final Node node) {
2     for (;;) {
3         Node t = tail;
4         if (t == null) {
5             if (compareAndSetHead(new Node()))
6                 tail = head;
7         } else {
8             node.prev = t;
9             if (compareAndSetTail(t, node)) {
10                 t.next = node;
11                 return t;
12             }
13         }
14     }
15 }

Nodeを作成して末尾に追加した後、acquireQueuedでそのNodeがスピン(ループ)しながら同期状態の取得を試み、取得できなければブロックされます。ブロックされたスレッドは、前のNodeがキューから出るか、割り込みが発生することで起床します。

1 final boolean acquireQueued(final Node node, int arg) {
2     boolean failed = true;
3     try {
4         boolean interrupted = false;
5         for (;;) {
6             final Node p = node.predecessor();
7             if (p == head && tryAcquire(arg)) {
8                 setHead(node);
9                 p.next = null;
10                 failed = false;
11                 return interrupted;
12             }
13             if (shouldParkAfterFailedAcquire(p, node) &&
14                 parkAndCheckInterrupt())
15                 interrupted = true;
16         }
17     } finally {
18         if (failed)
19             cancelAcquire(node);
20     }
21 }

7行目の条件は、現在のNodeの前のNodeが先頭ノードである場合のみ、ロック取得を試みることを意味します。これによりFIFOが保証されます。ロック獲得に成功するとacquireからリターンします。

ロック獲得後に処理が終われば、ロックを解放して後続Nodeを起床します。解放はreleaseメソッドで行います。

1 public final boolean release(int arg) {
2     if (tryRelease(arg)) {
3         Node h = head;
4         if (h != null && h.waitStatus != 0)
5             unparkSuccessor(h);
6         return true;
7     }
8     return false;
9 }

3.2 共有モードでの同期状態取得・解放(ソースコード解説)

共有モードでの取得はacquireSharedで行います。まずtryAcquireSharedを呼び、戻り値が0以上なら成功。負ならdoAcquireSharedを呼びます。

1 public final void acquireShared(int arg) {
2     if (tryAcquireShared(arg) < 0)
3         doAcquireShared(arg);
4 }
1 private void doAcquireShared(int arg) {
2     final Node node = addWaiter(Node.SHARED);
3     boolean failed = true;
4     try {
5         boolean interrupted = false;
6         for (;;) {
7             final Node p = node.predecessor();
8             if (p == head) {
9                 int r = tryAcquireShared(arg);
10                 if (r >= 0) {
11                     setHeadAndPropagate(node, r);
12                     p.next = null;
13                     if (interrupted)
14                         selfInterrupt();
15                     failed = false;
16                     return;
17                 }
18             }
19             if (shouldParkAfterFailedAcquire(p, node) &&
20                 parkAndCheckInterrupt())
21                 interrupted = true;
22         }
23     } finally {
24         if (failed)
25             cancelAcquire(node);
26     }
27 }

共有モードと排他モードの流れは似ています。どちらもサブクラスのメソッドを呼び、成功すればそのまま戻り、失敗すればスピンしながら前のNodeが先頭ノードになるのを待って再試行します。

共有モードの解放はreleaseSharedで行います。

1 public final boolean releaseShared(int arg) {
2     if (tryReleaseShared(arg)) {
3         doReleaseShared();
4         return true;
5     }
6     return false;
7 }
1 private void doReleaseShared() {
2     for (;;) {
3         Node h = head;
4         if (h != null && h != tail) {
5             int ws = h.waitStatus;
6             if (ws == Node.SIGNAL) {
7                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
8                     continue;
9                 unparkSuccessor(h);
10             } else if (ws == 0 &&
11                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
12                 continue;
13         }
14         if (h == head)
15             break;
16     }
17 }

共有モードと排他モードの主な違いは、同時に複数スレッドがロックを取得できるかどうかです。例えばファイルの書き込みは排他、読み取りは共有で扱うことができます。

四、AQSの待機・通知メカニズム

AQS内部には同期ブロッキングキューだけでなく、複数のConditionオブジェクトも存在します。各Conditionは実際には双方向キューであり、待機・通知メカニズムを実現します。任意のJavaオブジェクトには同期キューとブロッキングキューがあり、wait()を呼ぶと待機キューに入ります。Conditionはこの待機キューを実装したものです。

Condition待機キュー内のスレッドはロックを競合せず、他のスレッドからの起床を待ちます。起床されると、そのスレッドは同期キューの末尾に移動し、ロックを競合し始めます。

Javaのwait()synchronizedブロック内でのみ呼び出せますが、Conditionのawait()も同様に、Lockを取得したスレッドのみが呼び出せます。

Conditionの主なメソッドはawait()signal()signalAll()で、Objectのwait()notify()notifyAll()に対応します。

Conditionの動作フローは以下の通りです。

  1. Lock.newCondition()でConditionオブジェクト(実際はAQSの内部クラスConditionObject)を作成。
  2. スレッドがLock.lock()でロックを取得後、Condition.await()を呼ぶと待機状態に入りロックを解放。内部でLockSupport.park()によりスレッドが待機。
  3. 待機中のスレッドはNodeとしてConditionの待機キュー末尾に追加。この操作はCAS不要(ロック取得スレッドのみが操作するため)。
  4. 他のスレッドがsignal()signalAll()を呼ぶと、待機キューの先頭Nodeを起床。実際にはそのNodeをCASで同期キューの末尾に移動。
  5. 各Lockオブジェクトは複数のConditionを持てるため、Nodeが待機キューから同期キューへ移動する際、CASでスレッドセーフを確保。
  6. 同期キューに入った後、LockSupport.unpark()でスレッドを起床。
  7. signalAll()は待機キュー内の全Nodeを順次起床。

五、まとめ

AQSは抽象同期キューであり、テンプレートメソッドパターンを用いて同期コンポーネントの基本実装を提供します。サブクラスはAQSを実装して具体的な同期コンポーネントを作成できます。

AQS内部はvolatile int stateとFIFOブロッキングキューで構成されます。主な動作は、CASでstateの値を操作して同期状態を取得しようとすることです。成功すればそのまま、失敗すればスレッドがNodeとして同期キューの末尾に追加されます。

末尾への追加はCASが必要です。なぜなら、ロック獲得に失敗したスレッドやCondition待機キューから起床したスレッドが同時に追加される可能性があるからです。

同期キューの先頭ノードは現在同期状態を取得しているNodeです。先頭ノードのnextノードは、継続的にCASでロック取得を試みます。他のノードは自分の前のノードがheadかどうかのみをチェックします。headの次のノードだけがロック取得を試みることで、FIFOが保証されます。

AQSは排他モードと共有モードの取得・解放メソッドを提供します。それぞれの流れを以下にまとめます。

5.1 排他モードでの同期状態取得フロー

  1. acquireを呼び、tryAcquireを実行。成功すれば戻る。
  2. tryAcquireが失敗した場合、排他モードのNodeを作成し、スピンとCASで同期キューの末尾に追加。
  3. キューが空の場合、新しいNodeの前に空のheadノードを初期化。
  4. キューが空でない場合、新しいNodeが末尾に追加され、新しいtailとなる。
  5. スピン内で、自分の前のノードがheadかどうかを繰り返し確認。
  6. 前のノードがheadなら、tryAcquireを再試行。失敗すればスピン継続。
  7. tryAcquireが成功すると、現在のNodeをheadに設定。古いheadのnextはnullに(CAS不要)。
  8. 取得成功を返す。

補足:headは現在ロックを保持するNodeを指しますが、キュー初期化時は最初のNodeとは別に新規Nodeオブジェクトがheadとなります。headのnextノードのみがスピンでtryAcquireを繰り返し、他のノードは前のノードがheadかどうかのみをチェックします。スピンによるCPU消費を避けるため、取得失敗時はLockSupport.park()でスレッドを待機させ、headノードが起床します。

5.2 排他モードでの同期状態解放フロー

  1. releaseを呼び、tryReleaseを実行。失敗すればfalseを返す。
  2. 成功した場合、unparkSuccessorで次のノードを起床。
  3. まず現在のheadの状態を初期化し、LockSupport.unpark()でnextノードを起床。

補足:排他モードの解放は単純で、nextノードを起床するだけです。nextノードが待機状態でなければ何もせず、そのノード自身のスピンでロックを取得します。

5.3 共有モードでの同期状態取得フロー

排他モードとほぼ同じですが、取得成功の条件が異なります。排他モードではtryAcquireの戻り値がtrueかfalseかで判断するのに対し、共有モードではtryAcquireSharedの戻り値が0以上かどうかで判断します。0以上なら成功、負なら失敗です。それ以外の流れは同一です。

5.4 共有モードでの同期状態解放フロー

  1. releaseSharedを呼び、tryReleaseSharedを実行。失敗すればfalseを返す。
  2. 成功した場合、doReleaseSharedを呼び、後続ノードを起床。
  3. 起床処理はスピンとCASを繰り返す。共有モードでは複数スレッドが同時に解放を行う可能性があるため、CASでスレッドセーフを確保する。

タグ: Java AbstractQueuedSynchronizer AQS 並行処理 ロック

6月17日 16:32 投稿