並行処理におけるデータ整合性の確保
マルチスレッドプログラミングにおいて、複数のスレッドが共有リソースにアクセスする際、実行順序によって結果が不定になる現象を競合状態(Race Condition)と呼びます。この問題を解決し、複数のスレッドが同時にオブジェクトの状態を変更してもデータが破損しないように保証する仕組みをスレッド同期といいます。このような保証を提供できるクラスはスレッドセーフであると言われます。.NET Framework Class Library (FCL) において、静的メンバーは基本的にスレッドセーフに設計されていますが、インスタンスメンバーについては開発者が同期機構を実装する必要があります。
.NET の同期機構は、大きく分けて「プリミティブ同期構築体」と「ハイブリッド同期構築体」に分類されます。プリミティブ構築体はさらに、ユーザーモード型とカーネルモード型に分けられます。ハイブリッド構築体は、これらを組み合わせて特定のパフォーマンス特性を持たせたものです。
- ユーザーモード型: Volatile, Interlocked, SpinLock
- カーネルモード型: AutoResetEvent, ManualResetEvent, Semaphore, Mutex
- ハイブリッド型: ManualResetEventSlim, SemaphoreSlim, Monitor, ReaderWriterLockSlim
以下では、基礎となるプリミティブ構築体の仕組みと実装例について解説します。
競合状態の具体例
複数のスレッドが共有変数をインクリメントする単純な例を通じて、同期がない場合の問題を確認します。
var counter = new SharedCounter();
var tasks = new Task[Environment.ProcessorCount];
for (int i = 0; i < Environment.ProcessorCount; i++)
{
tasks[i] = Task.Run(() => counter.Increment());
}
Task.WaitAll(tasks);
Console.WriteLine($"論理プロセッサ数:{Environment.ProcessorCount}");
Console.WriteLine($"ループ回数:{SharedCounter.Iterations}");
Console.WriteLine($"期待値:{Environment.ProcessorCount * SharedCounter.Iterations}");
counter.ShowResult();
public class SharedCounter
{
public const int Iterations = 10000;
private int _total;
public void Increment()
{
for (int i = 0; i < Iterations; i++)
{
_total++;
}
}
public void ShowResult()
{
Console.WriteLine($"実際の結果:{_total}");
}
}
実行結果は期待値(プロセッサ数×ループ回数)よりも少ない値になります。これは、_total++ 操作が原子操作ではないためです。この操作は「値の読み込み」「インクリメント」「書き込み」の 3 段階に分かれており、スレッド切り替えが発生すると、あるスレッドが読み込んだ値を別のスレッドが上書きしてしまうことが起こります。
ユーザーモード同期構築体
ユーザーモードの構築体は、特別な CPU 命令を用いて同期を実現します。オペレーティングシステム(OS)のカーネルを介さないため、コンテキストスイッチのコストが発生せず、高速に動作します。ただし、長時間のブロックには適しません。
1. Volatile 構築体
コンパイラや CPU による最適化(命令の並べ替えなど)を防止し、メモリの可視性を保証します。C# の volatile キーワードや Volatile.Read/Write メソッドを使用することで、読み書きの順序性を確保できます。
2. Interlocked 構築体
原子操作を提供するクラスです。インクリメント、デクリメント、交換、比較交換などの操作をスレッドセーフに実行できます。
前述の競合状態の例を Interlocked を使用して修正すると以下のようになります。
public void Increment()
{
for (int i = 0; i < Iterations; i++)
{
Interlocked.Increment(ref _total);
}
}
これにより、複数スレッドからの同時アクセスでも正確なカウントが可能になります。また、特定の初期化コードを一度だけ実行するためのフラグ管理などにも利用できます。
if (Interlocked.Exchange(ref _initialized, 1) == 0)
{
// 初期化処理(一度のみ実行)
}
3. SpinLock
ロックを取得できるまで、スレッドが CPU 時間を消費して待ち続ける(ビジーウェイト)ロック機構です。ロック保持時間が極めて短い場合に有効ですが、長時間待機すると CPU リソースを浪費します。
カーネルモード同期構築体
カーネルモードの構築体は、OS カーネルの機能を利用してスレッドをブロックします。ユーザーモードからカーネルモードへの遷移に伴うオーバーヘッドがありますが、待機時間が長い場合に CPU を他のスレッドに譲渡できるため、システム全体のリソース効率を向上させます。また、プロセス間同期にも利用可能です。
1. イベント(EventWaitHandle)
スレッド間のシグナリングに使用されるフラグです。状態が true(シグナルあり)の場合、待機しているスレッドが解放されます。
AutoResetEvent
待機スレッドを 1 つだけ解放し、自動的に状態を false に戻します。
var coordinator = new WorkCoordinator();
coordinator.StartWorkers();
public class WorkCoordinator
{
private int _completedCount = 0;
private readonly AutoResetEvent _signal = new AutoResetEvent(true);
public void StartWorkers()
{
for (int i = 0; i < Environment.ProcessorCount; i++)
{
new Thread(ExecuteWork).Start();
}
}
private void ExecuteWork()
{
while (true)
{
_signal.WaitOne();
// 模擬処理
Thread.Sleep(500);
var count = Interlocked.Increment(ref _completedCount);
Console.WriteLine($"完了タスク数:{count}");
_signal.Set();
}
}
}
初期状態を true に設定しておくことで、最初のスレッドが即座に処理を開始できます。処理完了後に Set() を呼び出すことで、次のスレッドに権限が渡されます。
ManualResetEvent
待機しているすべてのスレッドを解放します。状態を false に戻すには手動で Reset() を呼び出す必要があります。
private readonly ManualResetEvent _signal = new ManualResetEvent(true);
// 使用中
_signal.WaitOne();
_signal.Reset(); // 手動でブロック状態に戻す
// 処理...
_signal.Set(); // 全スレッドを解放
バッチ処理の開始合図など、複数のスレッドを同時に動かしたい場合に適しています。
2. セマフォ(Semaphore)
同時にリソースにアクセスできるスレッドの数を制限します。内部カウンターが 0 の場合、スレッドはブロックされます。カウンターが正の場合、アクセスが許可され、カウンターは減算されます。
var manager = new ResourcePoolManager();
manager.Run();
// 別スレッドなどで Release を呼び出し可能
// manager.ReleaseSlots(5);
public class ResourcePoolManager
{
// 最大 12、初期 5
private readonly Semaphore _pool = new Semaphore(5, 12);
private int _processed = 0;
public void Run()
{
for (int i = 0; i < Environment.ProcessorCount; i++)
{
new Thread(WorkerThread).Start();
}
}
private void WorkerThread()
{
while (true)
{
_pool.WaitOne();
// 処理模擬
Thread.Sleep(500);
var count = Interlocked.Increment(ref _processed);
Console.WriteLine($"処理済み:{count}");
}
}
public void ReleaseSlots(int count)
{
_pool.Release(count);
}
}
コンストラクタの引数で同時アクセス可能数を制御できます。Release メソッドを外部から呼び出すことで、動的にスロットを開放することが可能です。
3. ミューテックス(Mutex)
相互排他ロックを実現します。動作原理は AutoResetEvent に似ていますが、プロセス間での同期が可能であり、所有権の概念を持ちます。リソースを取得したスレッドは、必ず解放処理を行う必要があります。