キュー
キューはデータを一時的に保存するためのデータ構造で、特に並行処理において重要な役割を果たします。Pythonではqueueモジュールが提供されています。
import queue
# FIFOキュー(先入れ先出し)
q = queue.Queue(2)
q.put('データ1')
print(q.get()) # データ1
q.task_done()
q.join()
# LIFOキュー(後入れ先出し)- スタック
lifo_q = queue.LifoQueue()
lifo_q.put('項目A')
lifo_q.put('項目B')
print(lifo_q.get()) # 項目B
# 優先度キュー
priority_q = queue.PriorityQueue()
priority_q.put((2, '普通のタスク'))
priority_q.put((1, '緊急のタスク'))
priority_q.put((3, '低優先度のタスク'))
print(priority_q.get()) # (1, '緊急のタスク')
スレッドタイマー
スレッドタイマーは指定した時間が経過した後に特定のタスクを実行するために使用されます。
import threading
import time
def scheduled_task():
print('タイマータスク開始...')
time.sleep(2)
print('タイマータスク完了...')
# 5秒後にタスクを実行
timer = threading.Timer(5, scheduled_task)
timer.start()
スレッドプールとプロセスプール
concurrent.futuresモジュールによるプール実装
プールの主な目的は、同時に実行されるスレッドやプロセスの数を制限することです。これは、システムリソースを過度に消費することなく、多数のタスクを効率的に処理するために重要です。
プロセスプールとスレッドプールの使い分け
基本的には、CPUバウンドなタスクにはプロセスプールを、I/Oバウンドなタスクにはスレッドプールを使用します。
import concurrent.futures
import threading
import time
import random
def process_data():
thread_id = threading.current_thread().ident
print(f'スレッド{thread_id}でタスク開始...')
print('-' * 40)
time.sleep(1)
print(f'スレッド{thread_id}でタスク完了...')
print('-' * 40)
return random.randint(1, 100)
if __name__ == '__main__':
# 最大5つのスレッドを持つスレッドプールを作成
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = []
# 20個のタスクをプールに提出
for i in range(20):
future = executor.submit(process_data)
futures.append(future)
# すべてのタスクが完了するのを待ち、結果を処理
for future in concurrent.futures.as_completed(futures):
result = future.result()
print(f'タスク結果: {result}')
コールバック関数の使用
import concurrent.futures
import threading
import time
import random
def compute_value():
thread_id = threading.current_thread().ident
print(f'スレッド{thread_id}で計算開始...')
print('-' * 40)
time.sleep(1)
print(f'スレッド{thread_id}で計算完了...')
print('-' * 40)
return (f'スレッド{thread_id}の結果', random.randint(1, 100))
def process_result(future):
result = future.result()
thread_id = threading.current_thread().ident
processed_value = result[1] ** 2
print(f'スレッド{thread_id}で処理: {result[0]}の二乗は{processed_value}')
if __name__ == '__main__':
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
for i in range(20):
future = executor.submit(compute_value)
# 各タスクにコールバック関数を追加
future.add_done_callback(process_result)
スレッドプールとセマフォの違い
スレッドプールでは、固定数のスレッドがすべてのタスクを処理します。一方、セマフォは、同時に実行できるスレッドの数を制限しますが、タスクの非クリティカルセクションでは制限が適用されません。
コルーチン
コルーチンとは
PythonのスレッドはOSのネイティブスレッドを使用しますが、コルーチンはプログラマが実装する協調的マルチタスクの形式です。単一のスレッド内でコンテキストスイッチングを実現し、I/O待ち時間を効率的に利用します。
意味のあるコルーチンの使用
コルーチンはI/O操作中にコンテキストを切り替える場合に特に有効です。計算密集型タスクでの頻繁な切り替えは、かえってパフォーマンスを低下させる可能性があります。
# 計算密集型タスクでのコルーチンの非効率性
import time
def calculation_task1():
counter = 0
while True:
counter += 1
yield # 状態を保存して制御を放棄
def calculation_task2():
generator = calculation_task1()
for i in range(25000000):
# 計算処理
result = 10 ** 2
next(generator) # タスク1に切り替え
if __name__ == '__main__':
start = time.time()
calculation_task2()
end = time.time()
print(f'コルーチン使用: {end - start:.2f}秒')
# シーケンシャル実行
def sequential_calculation():
for i in range(25000000):
result = 10 ** 2
for i in range(25000000):
result = 10 ** 2
start = time.time()
sequential_calculation()
end = time.time()
print(f'シーケンシャル実行: {end - start:.2f}秒')
コルーチンの利点と欠点
利点:
- OSのスケジューリングによる切り替えよりも高速なコンテキストスイッチが可能
欠点:
- すべてのI/O操作を手動で検出する必要がある
- マルチコアプロセッサの利点を活かせない
geventによるI/Oモニタリングとタスク切り替え
import gevent
from gevent import monkey
import time
import random
# 標準ライブラリのI/O操作をgeventが検出できるようにパッチ適用
monkey.patch_all()
def io_intensive_task1():
print('タスク1開始...')
# 計算処理
for i in range(25000000):
value = random.random() ** 2
# I/O操作(シミュレート)
time.sleep(2)
print('タスク1完了')
def io_intensive_task2():
print('タスク2開始...')
# 計算処理
for i in range(25000000):
value = random.random() ** 2
# I/O操作(シミュレート)
time.sleep(3)
print('タスク2完了')
start_time = time.time()
# グリーンスレッドを作成
greenlet1 = gevent.spawn(io_intensive_task1)
greenlet2 = gevent.spawn(io_intensive_task2)
# すべてのグリーンスレッドが完了するのを待つ
gevent.joinall([greenlet1, greenlet2])
end_time = time.time()
print(f'総実行時間: {end_time - start_time:.2f}秒')