JavaにおけるConcurrentHashMapの内部構造とスレッドセーフ実装

日常的な開発では、HashMapが広く使われていますが、そのデータ構造は配列+単方向連結リスト(JDK 1.8以降では、チェーン長が8を超えると赤黒木に変換)です。しかし、マルチスレッド環境下ではHashMapはスレッドセーフではなく、明示的な同期処理が必要です。

代わりにjava.util.Hashtableは同期化されたメソッドによりスレッドセーフを保証しますが、全体ロックによる性能劣化が課題でした。これを解決するために登場したのがjava.util.concurrent.ConcurrentHashMapです。

Hashtableの特徴と限界

  • 初期容量は11(HashMapは16)
  • インスタンス生成時に即座にテーブルを確保(HashMapは初回put時)
  • データ構造は配列+単方向リストのみ(赤黒木なし)
  • ノード挿入は先頭挿入方式
  • すべての操作メソッドにsynchronizedを付与
  • キー・値ともにnullを許容しない

この設計により、Hashtableはオブジェクト単位で排他制御されるため、高並行環境ではスレッド競合が頻発し、スループットが低下します。

JDK 1.7時代のConcurrentHashMap:セグメント分割ロック

JDK 1.7までは、ハッシュテーブル全体を複数の「セグメント」に分割し、各セグメントごとに独立したロックをかける「分段鎖」方式を採用していました。

public class SegmentedMap<K, V> {
    private static final int DEFAULT_SEGMENTS = 16;
    private final Segment[] segments;

    static class Segment extends ReentrantLock {
        private volatile HashEntry[] table;
        private int threshold;

        V insert(K key, int hash, V value) {
            lock();
            try {
                // ロック内でのみ操作
                HashEntry[] tab = table;
                int idx = hash & (tab.length - 1);
                HashEntry head = tab[idx];
                
                for (HashEntry e = head; e != null; e = e.next) {
                    if (e.key.equals(key)) {
                        V old = e.value;
                        e.value = value;
                        return old;
                    }
                }

                HashEntry newEntry = new HashEntry(hash, key, value, head);
                tab[idx] = newEntry;
                if (++size > threshold) resize();
                return null;
            } finally {
                unlock();
            }
        }
    }
}

この方式では、最大16のセグメントが存在するため、理論上は16スレッドまで同時に書き込み可能でした。ただし、セグメント数は初期化後に変更不可であり、柔軟性に欠けます。

JDK 1.8以降の進化:CASと部分的synchronized

JDK 1.8からは、HashMapと同様の「配列+連結リスト+赤黒木」構造を採用しつつ、ロック粒度を大幅に細分化しました。具体的には:

  • テーブル初期化やノード追加にはCAS(Compare-And-Swap)を使用
  • 既存ノードの更新や木構造操作にはsynchronizedで個別ノードをロック
  • リサイズ時は複数スレッドが協調してデータ移行
final class ModernConcurrentMap<K, V> {
    private transient volatile Node[] table;
    private volatile int sizeCtl;

    V putValue(K key, V val) {
        int hash = spread(key.hashCode());
        for (Node[] tab = table;;) {
            Node f; int n, i;
            
            if (tab == null || (n = tab.length) == 0) {
                tab = initializeTable();
            } else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null, new Node(hash, key, val)))
                    break;
            } else {
                synchronized (f) {  // ノード単位のロック
                    if (tabAt(tab, i) == f) {
                        if (f.hash >= 0) {
                            // リスト処理
                        } else if (f instanceof TreeBin) {
                            // 木構造処理
                        }
                    }
                }
            }
        }
        return null;
    }

    private final Node[] initializeTable() {
        while (sizeCtl >= 0) {
            if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if (table == null) {
                        Node[] nt = new Node[DEFAULT_CAPACITY];
                        table = nt;
                        sizeCtl = (int)(DEFAULT_CAPACITY * 0.75);
                    }
                } finally {
                    sizeCtl = calculatedThreshold;
                }
                break;
            }
        }
        return table;
    }
}

拡張処理(transfer)の協調動作

テーブル拡張時には、複数スレッドが同時にデータ移行を担当します。移行範囲はtransferIndexによって動的に割り当てられ、各スレッドは指定された範囲のバケットを処理します。

private void relocateEntries(Node[] oldTab, Node[] newTab) {
    int stride = (NCPU > 1) ? (oldTab.length >>> 3) / NCPU : oldTab.length;
    ForwardingNode marker = new ForwardingNode(newTab);

    for (int i = transferIndex; i > 0; ) {
        int bound = Math.max(i - stride, 0);
        for (int j = i - 1; j >= bound; j--) {
            Node node = oldTab[j];
            if (node == null) {
                setTabAt(oldTab, j, marker);
            } else if (node.hash >= 0) {
                // リストを二分割して再配置
            } else if (node instanceof TreeBin) {
                // 木構造の再配置と必要に応じてリスト化
            }
        }
        transferIndex = bound;
    }
}

読み取り操作の最適化

get()操作はロックを一切使用せず、volatile参照とCASだけで実現されています。これは、最新の値が必ず見えるという保証のもと、最大限のパフォーマンスを追求した設計です。

V getValue(Object key) {
    int h = spread(key.hashCode());
    Node[] tab = table;
    if (tab != null) {
        Node e = tabAt(tab, (tab.length - 1) & h);
        if (e != null) {
            if (e.hash == h && (e.key.equals(key))) 
                return e.val;
            else if (e.hash < 0)  // 拡張中または木構造
                return e.find(h, key).val;
            else {
                for (e = e.next; e != null; e = e.next)
                    if (e.hash == h && e.key.equals(key))
                        return e.val;
            }
        }
    }
    return null;
}

主要な技術要素の比較

実装同期手法ロック粒度データ構造
Hashtablesynchronizedメソッドオブジェクト全体配列+リスト
ConcurrentHashMap (1.7)ReentrantLockセグメント単位配列+リスト
ConcurrentHashMap (1.8+)CAS + synchronizedノード単位配列+リスト+赤黒木

タグ: Java ConcurrentHashMap マルチスレッド

7月4日 21:58 投稿