PythonのGIL(グローバルインタープリタロック)の影響で、CPU負荷の高いタスクにはマルチスレッドを使用することができません。マルチコアCPUが普及する中、PythonはCPU負荷の高いタスクを実行するためのマルチプロセスソリューションを提供しています。しかし、直接マルチプロセス関連のAPIを使用するにはいくつかの問題があります。
まず、デモンストレーション用のサンプルコードを見てみましょう:
import time
from multiprocessing import Process
def calculate_sum(limit: int) -> int:
start_time = time.monotonic()
total = 0
for i in range(limit + 1):
total += i
print(f"{limit}までの計算が{time.monotonic() - start_time:.2f}秒で完了しました。")
return total
この関数はパラメータを受け取り、0からその値までの合計を計算します。実行時間を表示し、結果を返します。
マルチプロセスの問題点
def main():
# 2つのプロセスを初期化し、大きな数から小さな数の順に実行
proc1 = Process(target=calculate_sum, args=(200_000_000,))
proc2 = Process(target=calculate_sum, args=(50_000_000,))
# プロセスの実行を開始
proc1.start()
proc2.start()
# joinメソッドはブロックし、結果を順次取得します
start_a = time.monotonic()
proc1.join()
print(f"proc1が{time.monotonic() - start_a:.2f}秒で完了しました")
# proc1のjoin待ち中にproc2はすでに完了しているため、時間は0秒になります
start_b = time.monotonic()
proc2.join()
print(f"proc2が{time.monotonic() - start_b:.2f}秒で完了しました")
上記のコードのように、直接複数のプロセスを作成してstartとjoinメソッドを呼び出すと、いくつかの問題があります:
- joinメソッドはタスクの実行結果を返しません
- joinメソッドはメインプロセスをブロックし、順次実行します
後続のタスクが前のタスクより速く実行されても、結果は同じ順序で取得されます。
プールを使用する問題
multiprocessing.Poolを使用しても問題は解決しません:
def main():
with Pool() as pool:
result1 = pool.apply(calculate_sum, args=(200_000_000,))
result2 = pool.apply(calculate_sum, args=(50_000_000,))
print(f"200_000_000までの計算結果: {result1}")
print(f"50_000_000までの計算結果: {result2}")
Poolのapplyメソッドは同期処理であり、前のapplyタスクが完了するまで次のapplyタスクを実行できません。
もちろん、apply_asyncメソッドを使用して非同期タスクを作成することもできます。しかし、結果を取得するためにgetメソッドを使用する必要があり、これはjoinメソッドの問題に戻ってしまいます:
def main():
with Pool() as pool:
result1 = pool.apply_async(calculate_sum, args=(200_000_000,))
result2 = pool.apply_async(calculate_sum, args=(50_000_000,))
print(f"200_000_000までの計算結果: {result1.get()}")
print(f"50_000_000までの計算結果: {result2.get()}")
ProcessPoolExecutorの問題
concurrent.futures.ProcessPoolExecutorを使用する場合:
def main():
with ProcessPoolExecutor() as executor:
numbers = [200_000_000, 50_000_000]
for result in executor.map(calculate_sum, numbers):
print(f"計算結果: {result}")
見た目は良さそうですが、結果は起動順序で取得されます。これはasyncio.as_completedとは異なり、実行順序で結果を取得するものではありません。
asyncioのrun_in_executorで解決
幸いなことに、asyncioのrun_in_executorメソッドを使用することで、IOバウンドタスクと同様にマルチプロセスタスクを呼び出すことができます。これにより、並行処理と並列処理のAPIが統一され、上記の問題が解決されます:
async def main():
loop = asyncio.get_running_loop()
tasks = []
with ProcessPoolExecutor() as executor:
for num in [200_000_000, 50_000_000]:
tasks.append(loop.run_in_executor(executor, calculate_sum, num))
# またはasyncio.gather(*tasks)を使用
for completed in asyncio.as_completed(tasks):
result = await completed
print(f"計算結果: {result}")
このアプローチにより、IOバウンドタスクとCPUバウンドタスクを同時に処理し、効率的なコードを実現できます。実際のアプリケーションでは、このテクニックを活用することでパフォーマンスを大幅に向上させることができます。