Pythonのmultiprocessingモジュールを用いて並列処理を構築する際、親プロセスと子プロセス間で状態を共有しようとすると、意図しない動作に直面することがあります。以下のスクリプトは、セマフォを用いて同時実行数を制御しつつ、複数のタスク結果を辞書へ集約する実装例です。
import time
from multiprocessing import Process, Semaphore
def execute_task(semaphore, task_idx, result_store):
# 擬似的な処理時間
time.sleep(0.3)
fetched_data = {f"item_{task_idx}": f"url_{task_idx}.example"}
semaphore.acquire()
for k, v in fetched_data.items():
result_store[k] = v
semaphore.release()
if __name__ == "__main__":
collected_data = {}
limit = Semaphore(4)
for idx in range(6):
worker = Process(target=execute_task, args=(limit, idx, collected_data))
worker.start()
worker.join()
print(collected_data)
このコードを実行すると、最終的な出力結果は空の辞書{}となります。子プロセス内で取得したデータは、親プロセスのメモリ領域に反映されません。
原因を特定するため、各プロセス内での辞書オブジェクトのメモリアドレスを検証します。
def execute_task(semaphore, task_idx, result_store):
time.sleep(0.1)
result_store[f"temp_{task_idx}"] = True
print(f"Process {task_idx} storage ID: {id(result_store)}")
semaphore.release()
実行ログを確認すると、プロセスごとに異なる整数値が出力されます。これは、Processを生成する際に引数として渡された辞書が、OSレベルで新しいメモリ空間に複製されていることを示しています。子プロセスが参照しているのは親プロセスのオブジェクトのコピーであり、その変更はローカルなコピーにのみ適用されます。
この挙動を受け、「変数をグローバルスコープで定義すれば共有できるのではないか」と考え、関数内でglobalキーワードを使用しようとするとSyntaxError: name 'x' is parameter and globalが発生します。Pythonの仕様上、引数として受け取った名前は自動的にローカル変数として扱われるため、global宣言と競合し構文エラーとなるためです。
プロセス間のデータ受け渡しを実現するには、OSが提供するIPC(Inter-Process Communication)の仕組み、またはランタイムが提供する抽象化レイヤーを利用する必要があります。主要なアプローチは以下の通りです。
- マネージャーオブジェクト: プロキシサーバーを介して共有状態を管理
- キュー (Queue): プロセスセーフなメッセージパッシング
- パイプ (Pipe): 双方向のストリーム通信
辞書やリストなどのミュータブルなオブジェクトを安全に共有する場合、Managerを利用するのが標準的な手法です。以下の実装では、共有メモリ上に辞書を配置し、複数のワーカープロセスから同時に書き込みを行っています。
import time
from multiprocessing import Process, Semaphore, Manager
def fetch_and_store(sync_obj, job_id, shared_dict):
# I/O待機のシミュレーション
time.sleep(0.2)
sync_obj.acquire()
try:
shared_dict[job_id] = f"processed_data_{job_id}"
finally:
sync_obj.release()
def main():
with Manager() as mgr:
# 共有メモリ上に辞書を作成
shared_results = mgr.dict()
concurrency_ctrl = Semaphore(5)
active_workers = []
for task_id in range(10):
p = Process(target=fetch_and_store,
args=(concurrency_ctrl, task_id, shared_results))
p.start()
active_workers.append(p)
for w in active_workers:
w.join()
# プロキシを標準辞書に変換して利用
final_output = dict(shared_results)
print(final_output)
if __name__ == "__main__":
main()
Manager().dict()は、内部で別プロセスを起動し、オブジェクトへのアクセスをIPCチャネルを経由して同期します。これにより、メモリ隔離の壁を越えた安全なデータ集約が可能になります。並列アーキテクチャを設計する際は、スレッドとプロセスのメモリモデルの違いを明確に区別し、要件に応じて適切な同期プリミティブを選択することが不可欠です。