AQS(AbstractQueuedSynchronizer)は、Javaにおけるロックやその他の同期コンポーネントを構築するための基盤フレームワークです。このクラスは、int型のstate変数で同期状態を管理し、内部にFIFO(先入れ先出し)の同期キューを持ちます。
AQSを利用するには、サブクラスが継承し、抽象メソッドを実装します。同期状態の操作に使う主要なメソッドは以下の通りです。
getState():同期状態を取得setState(int newState):同期状態を設定compareAndSetState(int expect, int update):CAS(Compare-And-Swap)操作で同期状態を設定し、原子性を保証
一、AQSの使い方
AQSを利用するには、サブクラスで以下の抽象メソッドを実装する必要があります。
protected boolean tryAcquire(int arg):排他モードで同期状態を取得。成功したらtrue、失敗したらfalseを返す。protected boolean tryRelease(int arg):排他モードで同期状態を解放。待機中の他のスレッドが状態を取得できるようになる。protected int tryAcquireShared(int arg):共有モードで同期状態を取得。戻り値が0以上なら成功、負なら失敗。protected boolean tryReleaseShared(int arg):共有モードで同期状態を解放。成功はtrue、失敗はfalse。protected boolean isHeldExclusively():現在のスレッドが排他モードで状態を保持しているかどうかを返す。
また、AQSはサブクラス向けに多数のテンプレートメソッドを提供します。これらは3種類に分類されます。
acquire(int arg):排他モードで同期状態を取得。成功すればそのまま戻り、失敗すれば同期キューで待機。内部でtryAcquire(int arg)を呼び出す。acquireInterruptibly(int arg):acquire(int arg)と同様だが、割り込みに応答。待機中に割り込まれるとInterruptedExceptionをスロー。tryAcquireNanos(int arg, long nanos):タイムアウト付きで同期状態を取得。指定時間内に取得できなければfalse。acquireShared(int arg):共有モードで同期状態を取得。排他モードとは異なり、複数スレッドが同時に状態を取得できる。acquireSharedInterruptibly(int arg):共有モードで取得、割り込みに応答。tryAcquireSharedNanos(int arg, long nanosTimeout):共有モードでタイムアウト付き取得。release(int arg):排他モードで同期状態を解放。解放後に同期キューの先頭ノードのスレッドを起床。releaseShared(int arg):共有モードで同期状態を解放。Collection<Thread> getQueuedThreads():同期キューで待機中のスレッドのコレクションを取得。
二、AQSの実装原理
AQSは内部的にFIFOの双方向キューに依存して同期状態を管理します。スレッドが同期状態の取得に失敗すると、現在のスレッドと待機情報をNodeオブジェクトにカプセル化し、同期キューに追加します。同時にそのスレッドをブロックし、同期状態が解放されると先頭ノードのスレッドを起床して再試行させます。
同期キューのNodeは主に以下のプロパティを持ちます:同期状態取得に失敗したスレッドの参照、待機状態、前のノード、次のノード。同期器は先頭ノードと末尾ノードを保持し、同期状態を取得できなかったスレッドはNodeとなってキューの末尾に追加されます。
同期キューはheadとtailの2つのノード参照を持ちます。スレッドがロックを取得すると、他のスレッドはNodeとして構築され、キューの末尾に追加されます。末尾ノードの追加は、CAS操作(compareAndSetTail(Node expect, Node update))でスレッドセーフに行われます。
- キューの先頭ノードは、現在同期ロックを取得しているスレッドです。先頭ノードがロックを解放すると、後続ノードを起床し、後続ノードはロック取得を試み、成功すれば自身を先頭ノードに設定します。
- 先頭ノードの設定は、ロック取得に成功したスレッドのみが行います。同時に1スレッドしか成功しないため、CAS操作は不要で、古い先頭ノードのnext参照を切るだけで済みます。
三、ロックの取得と解放
ここでNodeについて詳しく説明します。Nodeは、リソース取得を待機する各スレッドをカプセル化したもので、同期が必要なスレッド自体とその待機状態(ブロック中、起床待機中、キャンセル済みなど)を含みます。変数waitStatusはNodeの待機状態を示し、以下の5つの値を取ります。
- CANCELLED(1):現在のNodeがスケジュールをキャンセルされた状態。タイムアウトや割り込み(割り込み応答ありの場合)でこの状態になり、以降は変化しない。
- SIGNAL(-1):後続のNodeが現在のNodeによる起床を待っている状態。後続Nodeがキューに追加される際、前のNodeの状態をSIGNALに更新する。
- CONDITION(-2):NodeがCondition上で待機している状態。他のスレッドが
Condition.signal()を呼ぶと、この状態のNodeは待機キューから同期キューに移動する。 - PROPAGATE(-3):共有モードで、前のNodeが後続Nodeだけでなく、さらにその先のNodeも起床する可能性があることを示す。
- 0:新しいNodeがキューに追加されたときのデフォルト状態。
負の値は有効な待機状態、正の値はキャンセル済みを表します。そのため、ソースコードでは>0や<0で状態を判定することがよくあります。
3.1 排他モードでの同期状態取得・解放(ソースコード解説)
排他モードでの同期状態取得はacquire(int arg)メソッドで行います。
1 public final void acquire(int arg) {
2 if (!tryAcquire(arg) &&
3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4 selfInterrupt();
5 }
最初にサブクラスで実装したtryAcquireでロック取得を試み、成功すれば終了。失敗するとaddWaiterでNodeを作成し、キュー末尾に追加します。
1 private Node addWaiter(Node mode) {
2 Node node = new Node(Thread.currentThread(), mode);
3 Node pred = tail;
4 if (pred != null) {
5 node.prev = pred;
6 if (compareAndSetTail(pred, node)) {
7 pred.next = node;
8 return node;
9 }
10 }
11 enq(node);
12 return node;
13 }
1 private Node enq(final Node node) {
2 for (;;) {
3 Node t = tail;
4 if (t == null) {
5 if (compareAndSetHead(new Node()))
6 tail = head;
7 } else {
8 node.prev = t;
9 if (compareAndSetTail(t, node)) {
10 t.next = node;
11 return t;
12 }
13 }
14 }
15 }
Nodeを作成して末尾に追加した後、acquireQueuedでそのNodeがスピン(ループ)しながら同期状態の取得を試み、取得できなければブロックされます。ブロックされたスレッドは、前のNodeがキューから出るか、割り込みが発生することで起床します。
1 final boolean acquireQueued(final Node node, int arg) {
2 boolean failed = true;
3 try {
4 boolean interrupted = false;
5 for (;;) {
6 final Node p = node.predecessor();
7 if (p == head && tryAcquire(arg)) {
8 setHead(node);
9 p.next = null;
10 failed = false;
11 return interrupted;
12 }
13 if (shouldParkAfterFailedAcquire(p, node) &&
14 parkAndCheckInterrupt())
15 interrupted = true;
16 }
17 } finally {
18 if (failed)
19 cancelAcquire(node);
20 }
21 }
7行目の条件は、現在のNodeの前のNodeが先頭ノードである場合のみ、ロック取得を試みることを意味します。これによりFIFOが保証されます。ロック獲得に成功するとacquireからリターンします。
ロック獲得後に処理が終われば、ロックを解放して後続Nodeを起床します。解放はreleaseメソッドで行います。
1 public final boolean release(int arg) {
2 if (tryRelease(arg)) {
3 Node h = head;
4 if (h != null && h.waitStatus != 0)
5 unparkSuccessor(h);
6 return true;
7 }
8 return false;
9 }
3.2 共有モードでの同期状態取得・解放(ソースコード解説)
共有モードでの取得はacquireSharedで行います。まずtryAcquireSharedを呼び、戻り値が0以上なら成功。負ならdoAcquireSharedを呼びます。
1 public final void acquireShared(int arg) {
2 if (tryAcquireShared(arg) < 0)
3 doAcquireShared(arg);
4 }
1 private void doAcquireShared(int arg) {
2 final Node node = addWaiter(Node.SHARED);
3 boolean failed = true;
4 try {
5 boolean interrupted = false;
6 for (;;) {
7 final Node p = node.predecessor();
8 if (p == head) {
9 int r = tryAcquireShared(arg);
10 if (r >= 0) {
11 setHeadAndPropagate(node, r);
12 p.next = null;
13 if (interrupted)
14 selfInterrupt();
15 failed = false;
16 return;
17 }
18 }
19 if (shouldParkAfterFailedAcquire(p, node) &&
20 parkAndCheckInterrupt())
21 interrupted = true;
22 }
23 } finally {
24 if (failed)
25 cancelAcquire(node);
26 }
27 }
共有モードと排他モードの流れは似ています。どちらもサブクラスのメソッドを呼び、成功すればそのまま戻り、失敗すればスピンしながら前のNodeが先頭ノードになるのを待って再試行します。
共有モードの解放はreleaseSharedで行います。
1 public final boolean releaseShared(int arg) {
2 if (tryReleaseShared(arg)) {
3 doReleaseShared();
4 return true;
5 }
6 return false;
7 }
1 private void doReleaseShared() {
2 for (;;) {
3 Node h = head;
4 if (h != null && h != tail) {
5 int ws = h.waitStatus;
6 if (ws == Node.SIGNAL) {
7 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
8 continue;
9 unparkSuccessor(h);
10 } else if (ws == 0 &&
11 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
12 continue;
13 }
14 if (h == head)
15 break;
16 }
17 }
共有モードと排他モードの主な違いは、同時に複数スレッドがロックを取得できるかどうかです。例えばファイルの書き込みは排他、読み取りは共有で扱うことができます。
四、AQSの待機・通知メカニズム
AQS内部には同期ブロッキングキューだけでなく、複数のConditionオブジェクトも存在します。各Conditionは実際には双方向キューであり、待機・通知メカニズムを実現します。任意のJavaオブジェクトには同期キューとブロッキングキューがあり、wait()を呼ぶと待機キューに入ります。Conditionはこの待機キューを実装したものです。
Condition待機キュー内のスレッドはロックを競合せず、他のスレッドからの起床を待ちます。起床されると、そのスレッドは同期キューの末尾に移動し、ロックを競合し始めます。
Javaのwait()はsynchronizedブロック内でのみ呼び出せますが、Conditionのawait()も同様に、Lockを取得したスレッドのみが呼び出せます。
Conditionの主なメソッドはawait()、signal()、signalAll()で、Objectのwait()、notify()、notifyAll()に対応します。
Conditionの動作フローは以下の通りです。
Lock.newCondition()でConditionオブジェクト(実際はAQSの内部クラスConditionObject)を作成。- スレッドが
Lock.lock()でロックを取得後、Condition.await()を呼ぶと待機状態に入りロックを解放。内部でLockSupport.park()によりスレッドが待機。 - 待機中のスレッドはNodeとしてConditionの待機キュー末尾に追加。この操作はCAS不要(ロック取得スレッドのみが操作するため)。
- 他のスレッドが
signal()やsignalAll()を呼ぶと、待機キューの先頭Nodeを起床。実際にはそのNodeをCASで同期キューの末尾に移動。 - 各Lockオブジェクトは複数のConditionを持てるため、Nodeが待機キューから同期キューへ移動する際、CASでスレッドセーフを確保。
- 同期キューに入った後、
LockSupport.unpark()でスレッドを起床。 signalAll()は待機キュー内の全Nodeを順次起床。
五、まとめ
AQSは抽象同期キューであり、テンプレートメソッドパターンを用いて同期コンポーネントの基本実装を提供します。サブクラスはAQSを実装して具体的な同期コンポーネントを作成できます。
AQS内部はvolatile int stateとFIFOブロッキングキューで構成されます。主な動作は、CASでstateの値を操作して同期状態を取得しようとすることです。成功すればそのまま、失敗すればスレッドがNodeとして同期キューの末尾に追加されます。
末尾への追加はCASが必要です。なぜなら、ロック獲得に失敗したスレッドやCondition待機キューから起床したスレッドが同時に追加される可能性があるからです。
同期キューの先頭ノードは現在同期状態を取得しているNodeです。先頭ノードのnextノードは、継続的にCASでロック取得を試みます。他のノードは自分の前のノードがheadかどうかのみをチェックします。headの次のノードだけがロック取得を試みることで、FIFOが保証されます。
AQSは排他モードと共有モードの取得・解放メソッドを提供します。それぞれの流れを以下にまとめます。
5.1 排他モードでの同期状態取得フロー
acquireを呼び、tryAcquireを実行。成功すれば戻る。tryAcquireが失敗した場合、排他モードのNodeを作成し、スピンとCASで同期キューの末尾に追加。- キューが空の場合、新しいNodeの前に空のheadノードを初期化。
- キューが空でない場合、新しいNodeが末尾に追加され、新しいtailとなる。
- スピン内で、自分の前のノードがheadかどうかを繰り返し確認。
- 前のノードがheadなら、
tryAcquireを再試行。失敗すればスピン継続。 tryAcquireが成功すると、現在のNodeをheadに設定。古いheadのnextはnullに(CAS不要)。- 取得成功を返す。
補足:headは現在ロックを保持するNodeを指しますが、キュー初期化時は最初のNodeとは別に新規Nodeオブジェクトがheadとなります。headのnextノードのみがスピンでtryAcquireを繰り返し、他のノードは前のノードがheadかどうかのみをチェックします。スピンによるCPU消費を避けるため、取得失敗時はLockSupport.park()でスレッドを待機させ、headノードが起床します。
5.2 排他モードでの同期状態解放フロー
releaseを呼び、tryReleaseを実行。失敗すればfalseを返す。- 成功した場合、
unparkSuccessorで次のノードを起床。 - まず現在のheadの状態を初期化し、
LockSupport.unpark()でnextノードを起床。
補足:排他モードの解放は単純で、nextノードを起床するだけです。nextノードが待機状態でなければ何もせず、そのノード自身のスピンでロックを取得します。
5.3 共有モードでの同期状態取得フロー
排他モードとほぼ同じですが、取得成功の条件が異なります。排他モードではtryAcquireの戻り値がtrueかfalseかで判断するのに対し、共有モードではtryAcquireSharedの戻り値が0以上かどうかで判断します。0以上なら成功、負なら失敗です。それ以外の流れは同一です。
5.4 共有モードでの同期状態解放フロー
releaseSharedを呼び、tryReleaseSharedを実行。失敗すればfalseを返す。- 成功した場合、
doReleaseSharedを呼び、後続ノードを起床。 - 起床処理はスピンとCASを繰り返す。共有モードでは複数スレッドが同時に解放を行う可能性があるため、CASでスレッドセーフを確保する。