Pythonの高度なテクニック:マルチプロセス、マルチスレッド、デコレータ

Pythonの高度なテクニック:マルチプロセス、マルチスレッド、デコレータ

本記事では、Pythonにおけるマルチプロセス、マルチスレッド、デコレータの概念と実装方法について解説します。これらのテクニックは、プログラムのパフォーマンス向上やコードの再利用性を高める上で重要です。

1. 基本概念の理解

マルチプロセス

マルチプロセスとは、単一のプログラム内で複数の独立したプロセスを同時に実行する技術です。各プロセスは独自のメモリ空間とリソースを持ち、互いに干渉しません。CPU密集型のタスクに適しています。

例えば、複数のブラウザウィンドウを開いている状況を考えてみましょう。各ウィンドウは独立したプロセスとして動作しており、一方のウィンドウがクラッシュしても他のウィンドウには影響しません。

マルチスレッド

マルチスレッドは、単一のプロセス内で複数のスレッドを同時に実行する技術です。複数のスレッドは同じメモリ空間を共有します。I/O密集型のタスク(主に入出力操作を伴うタスク)に適しており、スレッド間の切り替えはプロセスよりも軽量です。

例えば、ビデオを見ながらバックグラウンドでファイルをダウンロードするような状況です。これらの操作は異なるスレッドによって実行され、ビデオ再生とダウンロードは互いに干渉しません。

デコレータ

デコレータは、元の関数のコードを変更することなく、追加の機能を関数に付与できる特殊な関数です。

例えば、関数の実行前後に自動的に実行時間やログを記録する場合に使用されます。一般的に @decorator_name 構文が使用されます。

グローバルインタプリタロック(GIL)

GILは、任意の時点で1つのスレッドのみが実行されるようにする仕組みです。マルチコアプロセッサ上であっても、GILを使用するインタプリタは同時に1つのスレッドしか実行しません。GILを使用する一般的なインタプリタにはCPythonとRuby MRIがあります。

2. 実際のタスクでのマルチプロセス/マルチスレッドの使用

8000000個の数字1を足し合わせるタスクを考えてみましょう。事前にこのような配列を構築済みであり、コードの実行時間を記録する必要があるとします。通常の処理の考え方は以下の通りです:


start_time = time.time()
sum_num = 0
for i in num_list:
    sum_num += i
print(f"Used Time:{time.time()- start_time}")

実行時間:`Used Time:0.9650969505310059`

ここで、num_listを8つに分割するという考え方があります(数字の足し算は競合しないため、分割の知識を使用して解決します)。そして時間を計測すると `Used Time:0.07107281684875488` となります。しかし、このデータ計算は直列的です(最初のブロックを実行してから、次のブロックを計算します)。そこで、マルチスレッドで8つのブロックを直接計算し、最終結果を集約する方法を考えます。この場合の時間は:`Used Time:0.09244751930236816` となります。同時にマルチプロセスで計算した結果は:`Used Time:0.854262113571167` となります。

ここで興味深い現象が発生します。理論的には、マルチプロセスとマルチスレッドの速度は、通常の分割法より速くなるはずです。この現象の理由は以下の通りです:

  1. GIL(グローバルインタプリタロック)の影響
    • Pythonの `ThreadPoolExecutor` は GILの制限を受けます。複数のスレッドは実際に並列で実行されるのではなく、交互に実行されるため、シングルスレッドよりは速いですが、向上は限定的です。
    • `sum`計算はCPU密集型タスクであるため、スレッドプールはCPUのマルチコアの利点を十分に活かせず、パフォーマンスは直列の分割計算より劣ります
  2. プロセス間通信(IPC)のオーバーヘッド
    • `ProcessPoolExecutor` は各プロセスに独立したPythonインタプリタを作成し、データはメインプロセスとサブプロセス間で転送する必要があります。しかし、`num_list`が非常に大きいため、データ転送とプロセススケジューリングのコストが高く、パフォーマンスに影響します。
  3. タスク分割の追加オーバーヘッド
    • `sum`操作自体は非常にシンプルで計算時間が短いため、スレッドプールとプロセスプールの管理オーバーヘッド(スレッド/プロセスの作成、スケジューリング、リサイクル)が計算自体のコストを上回ることがあり、全体の実行時間が逆に長くなることがあります。

要するに、マルチプロセスを使用する場合は通信のコストを考慮し、マルチスレッドを使用する場合はGILの制限を考慮する必要があります。言い換えれば、得られる結論は以下の通りです

  • マルチスレッド(ThreadPoolExecutor)I/O密集型タスク(ファイルの読み書き、ネットワークリクエストなど)に適していますが、CPU計算タスクはGILの制限を受け、向上は限定的です。
  • マルチプロセス(ProcessPoolExecutor)CPU密集型タスクに適していますが、データ転送のオーバーヘッドが大きく、短時間の計算タスクには適さない場合があります。

3. コードでのマルチプロセス/マルチスレッド/デコレータの使用方法

1. マルチスレッドの使用

マルチスレッドの使用方法は比較的簡単です。以下に例を示します:


from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers= n) as executor:
    futures = [executor.submit(sum, chunk) for chunk in chunks]  # タスクの提出
    results = [future.result() for future in as_completed(futures)]  # 結果の取得

もう一つの実行方式もあります:


import threading
thread_1 = threading.Thread(target= sum)
thread_2 = threading.Thread(target= sum)

thread_1.start()
thread_2.start()

thread_1.join()
thread_2.join()

最初の方法は比較的簡単です(スレッドの自動管理)、一方で2番目の方法では複数のプロセスを作成し、異なるプロセスに対して `start()` および `join()` を実行する必要があります。実際の使用では、長期間実行されるタスクの場合は `threading.Thread` を使用できます(例えば、常にカメラを開いたままにする場合は `threading.Thread(target=video_capture_thread, daemon=True).start()` を直接使用できます)。並列タスクの場合は、手動で作成する必要のない `ThreadPoolExecutor` を選択できます。

一般的に、使用過程で注意すべき操作はいくつかあります:1. 作成したプロセスにタスクを提出すること(提出する内容:計算する関数、関数が必要とするパラメータ);2. 提出したタスクの結果を取得すること(マルチスレッドであるため、返される結果は異なるスレッドの結果です)。

注意すべき点は以下の通りです:1. `submit` でタスクを提出する;2. `as_completed` でタスクを実行する。

しかし注意すべき点として、マルチスレッドを使用する場合は thread-safe(スレッドセーフ)である必要があります。例えば、同時にファイルに書き込む場合、LLMのAPIを実行する際に長いテキストを分割し(モデルの最大許容入力を保証)、「一度に」(スレッド数が分割数とちょうど一致すると仮定)APIアクセスを行う場合、処理結果をファイルに書き込む際にはプロセスロックの問題を考慮する必要があります。なぜなら、すべてのタスクの結果が同じファイルに書き込まれるとプロセスの競合が発生する可能性があるからです。例えば:


from concurrent.futures import ThreadPoolExecutor
import time

def llm_api_result(num):
    time.sleep(2)
    return f"{num}"*100

def write_to_file(num):
    content = f"Thread-{num}: " + llm_api_result(num)
    with open("./output-without-lock.txt", "a", encoding= "utf-8") as f:
        f.write(content)
        f.write("\n")

def main():
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(write_to_file, num) for num in range(10)]
        for future in futures:
            future.result()

if __name__ == "__main__":
    start_time = time.time()
    main()
    print("Used Time:", time.time()- start_time)

このようにして得られた結果は以下の通りです:

しかし、上記のコードではプロセスにロック(`lock = threading.Lock()`)をかけていませんが、結果は正常に(`write` は原子操作)書き込まれます(時には内容が欠落することがあります)。しかし、書き込み順序は正しくありません。

原子操作(atomic operation)とは、中断されることのない操作を指します。それは完全に実行されるか、全く実行されないかのいずれかです。

コードを以下のように変更し、プロセスロックを使用して原子操作を保護します:


import threading
lock = threading.Lock()
def write_to_file(num):
    content = f"Thread-{num}: " + llm_api_result(num)
    with lock:
        with open("output-with-lock.txt", "a", encoding="utf-8") as f:
            f.write(content)
            f.write("\n")

このようにすると、結果を正常に書き込むことができます。

2. マルチプロセスの使用

Pythonのmultiprocessingモジュールはforkまたはspawnメカニズムに基づいており、複数の独立したプロセスを作成してタスクを並列実行させることができます。これにより、GIL(グローバルインタプリタロック)を回避し、CPU密集型タスクのパフォーマンスを向上させます(数学的演算、データ処理など)。使用方法も比較的簡単です。

具体的な例を以下に示します:


import time
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

# フィボナッチ数列を計算する関数
def fibonacci(n):
    if n <= 1:
        return n
    else:
        return fibonacci(n - 1) + fibonacci(n - 2)

def single_process():
    start_time = time.time()
    for _ in range(4):
        fibonacci(35)
    end_time = time.time()
    print(f"Single-process time: {end_time - start_time:.2f} seconds")

def multi_thread():
    start_time = time.time()
    with ThreadPoolExecutor(max_workers= 4) as executor:
        futures = [executor.submit(fibonacci, 35) for _ in range(4)]
        result = [future.result() for future in futures]
    end_time = time.time()
    print(f"Multi-thread time: {end_time - start_time:.2f} seconds")

def multi_process1():
    start_time = time.time()
    processes = []
    for _ in range(4):
        process = multiprocessing.Process(target=fibonacci, args=(35,))
        processes.append(process)
        process.start()
    for process in processes:
        process.join()
    end_time = time.time()
    print(f"Multi-process-1 time: {end_time - start_time:.2f} seconds")

def multi_process2():
    start_time = time.time()
    with ProcessPoolExecutor(max_workers= 4) as executor:
        futures = [executor.submit(fibonacci, 35) for _ in range(4)]
        result = [future.result() for future in futures]
    end_time = time.time()
    print(f"Multi-process-2 time: {end_time - start_time:.2f} seconds")

if __name__ == "__main__":
    single_process()
    multi_thread()
    multi_process1()
    multi_process2()

Single-process time: 8.93 seconds
Multi-thread time: 9.89 seconds
Multi-process-1 time: 3.81 seconds
Multi-process-2 time: 3.67 seconds

Pythonでマルチプロセスとマルチスレッドを使用するコードには大きな違いはありませんが、マルチプロセスを使用する際には上記のコードで(Windowsシステム上で)以下のコードを使用する必要があることに注意してください。しかし、Linuxシステムではこの問題はありません。これは2つの起動方法が異なるためです。


if __name__ == "__main__":

これは、子プロセスを作成する際にメインモジュールが再インポートされるためです。マルチプロセスのコードを `if __name__ == "__main__":` ブロック内に配置しない場合、再帰的に子プロセスが作成される可能性があり、さらにはプログラムがクラッシュする可能性があります。より根本的な理由については、Python公式ドキュメントを直接参照してください。Pythonのマルチプロセスをさらに深く理解するために。

まず、Pythonのマルチプロセスプログラミングにおいて、プロセス間は相互に独立しており、直接メモリを共有することはできません。異なるプロセス間でデータを渡すためには、通常データをバイトストリームにシリアライズし、対象プロセスで逆シリアライズする必要があります。pickle(より詳細な説明🔗)はこのタスクを完了するために使用されます。言い換えれば、異なるプロセス間でデータを渡すためにpickleを使用する必要があるということです。例えば:


import multiprocessing

def square(number):
    return number ** 2

if __name__ == "__main__":
    numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    # 4つのサブプロセスを使用してプロセスプールを作成
    with multiprocessing.Pool(processes=4) as pool:
        # mapメソッドを使用してsquare関数をnumbersリストの各要素に適用
        # mapメソッドはタスクをサブプロセスに割り当てて並列実行
        results = pool.map(square, numbers)
    print("Original list:", numbers)
    print("Square results:", results)

上記の例では、pickle(multiprocessingモジュールはpickleを自動的に使用してシリアライズ一般的に:基本的なデータ型、リスト、タプル、辞書などのコンテナ型、カスタムクラスのインスタンス、関数(ただし、関数が参照する外部オブジェクト、ファイルオブジェクト、データベース接続などを含まない))と逆シリアライズ))以下の内容:`square`関数(サブプロセスに渡される)。`numbers`リスト(サブプロセスに渡される)。`results`リスト(サブプロセスからメインプロセスに返される)。

3. デコレータ

デコレータ(Decorator)は、関数やクラスの動作を変更するための高度なPython構文です。本質的には高階関数であり、元の関数のコードを変更することなく、動的に機能を追加できます。主な役割は重複コードの削減などです。言い換えれば関数をパラメータとして関数に入力することです。使用方法は非常に簡単で、使用したい関数の上に直接 `@デコレータ` を追加するだけです。

例えば、関数の実行時間を計算する一般的な方法は以下の通りです:


import time

def test():
    print("Hello!")

def main():
    start_time = time.time()
    test()
    print(f"Used Time: {time.time()- start_time}")

しかし、時間計算関数を定義する場合、例えば:


def com_time(func):
    start_time = time.time()
    func()
    print(f"Used Time: {time.time()- start_time}")

def main():
    com_time(test)

ここでは関数 `test` をパラメータとして入力しています。より簡単な方法は直接デコレータを使用することです。例えば:


def com_time(func):
    def wrapper(*args, **kwargs):
        start_time = time.time()  # 開始時間を記録
        result = func(*args, **kwargs)  # 装飾された関数を実行
        print(f"Used Time: {time.time() - start_time:.4f} seconds")  # 実行時間を計算して出力
        return result
    return wrapper

@com_time # com_time(test)と同等
def test():
    time.sleep(0.5)
    print("Hello!")

def main():
    test()

注目すべき点は、以下のように直接定義するとエラーが発生することです:


def com_time(func):
    start_time = time.time()
    func()
    print(f"Used Time: {time.time()- start_time}")

これはデコレータ内のコードロジックによるものです。具体的には、デコレータ内で直接func()を呼び出していますが、デコレータの正しい使用法は***包装関数を返して、元の関数の実行を置き換える***ことです。

Pythonの特殊属性

1. オブジェクトとクラスの特殊属性(これらの属性は通常クラスオブジェクトに現れます)

属性 役割
`__dict__` オブジェクトの属性辞書を返します(`__dict__`を持つオブジェクトにのみ適用)。
`__class__` オブジェクトのクラスを取得します。
`__bases__` クラスのすべての基底クラスを取得します(クラスにのみ適用)。
`__name__` クラスまたはモジュールの名前を返します
`__module__` クラス定義があるモジュールを示します。
`__mro__` メソッド解決順序(Method Resolution Order)を返します。

2. モジュールとファイルの特殊属性(これらの属性は通常モジュールファイルレベルで使用されます)

属性 役割
`__file__` モジュールのファイルパスを返します(Pythonスクリプトにのみ適用)。
`__name__` モジュールの名前を返し、メインプログラム実行時には `"__main__"` を返します。
`__package__` モジュールのパッケージ名を返し、モジュールがパッケールの一部でない場合は `None` になります。
`__doc__` モジュール、クラス、または関数のドキュメント文字列(Docstring)を返します。
`__annotations__` 関数のパラメータと戻り値の注釈辞書を返します。

3. 実行時関連の特殊属性(これらの属性はPython実行時に関連します)

属性 役割
`__import__` モジュールのインポートを担当します(通常は `import` 文を使用し、直接呼び出しはしません)。
`__builtins__` Pythonの組み込み関数と例外を含むモジュール。
`__debug__` Python実行時の `debug` モードで、デフォルト値は `True` です。
`__loader__` モジュールを読み込むローダーオブジェクト。

4. メソッド関連の特殊属性(これらの属性は主にメソッドに関連します)

属性 役割
`__call__` オブジェクトを呼び出し可能にします(`__call__` メソッドをオーバーロードできます)。
`__getitem__` オブジェクトが `obj[key]` でアクセスできるようにします(`__getitem__` をオーバーロードできます)。
`__setitem__` オブジェクトが `obj[key] = value` で代入できるようにします(`__setitem__` をオーバーロードできます)。
`__delitem__` オブジェクトが `del obj[key]` で要素を削除できるようにします(`__delitem__` をオーバーロードできます)。
`__len__` オブジェクトが `len(obj)` で長さを取得できるようにします(`__len__` をオーバーロードできます)。
`__repr__` オブジェクトの公式文字列表現を返し、`repr(obj)` で使用されます。
`__str__` オブジェクトの読みやすい文字列表現を返し、`str(obj)` や `print(obj)` で使用されます。

class Test():
    def __init__(self, age):
        self.age = age
    
    def add(self):
        '''加算'''
        return self.age+ 1

test = Test(13)
test.__dict__['name'] = 'https://www.example.com/'
print(test.name)
print(test.add.__name__)
print(test.add.__doc__)

https://www.example.com/
add
加算

Pythonの組み込みデコレータ:

  1. `@staticmethod`: 静的メソッドを定義し、インスタンス化せずに呼び出し可能
  2. `@classmethod`: クラスメソッドを定義し、クラス変数にアクセス可能
  3. `@property`: メソッドをプロパティに変換

例えば:


class Person:
    place= 'bj' # クラス変数(すべてのインスタンスで共有)

    def __init__(self, name):
        self.name = name
    
    @staticmethod
    def age1(age):
        print(f"{age}")
    
    @classmethod
    def new_place(cls, new):
        cls.place = new

    def age2(self, age):
        print(f"{self.name}:{age} from {self.place}")
    
    def age3(self, age):
        if age>= 20:
            Person.new_place('sh')
        print(f"{self.name}:{age} from {self.__class__.place}")

Person.age1(13)
Person("Tom").age2(13)
Person("Tom").age3(23)

13
Tom:13 from bj
Tom:23 from sh

しかし、順序を変更すると:


Person.age1(13)
Person("Tom").age3(23)
Person("Tom").age2(13)

13
Tom:23 from sh
Tom:13 from sh

これは `age3` を使用した際にクラス変数が変更されたためです。他のデコレータとして、PyTorch内には勾配更新を行わないものがあり、`with torch.no_grad()` の代わりに `@torch.no_grad()` を使用できます(これは一般的にクラス内に追加します(例えば、コンテンツ生成時には勾配の更新は通常不要です))。

結論

Pythonにおけるマルチプロセス/マルチスレッド/デコレータについて説明しました。より詳細かつ正確な情報については、公式ドキュメントを直接参照してください!リンク:https://docs.python.org/ja/3.12

参考

  1. https://docs.python.org/ja/3.13/library/concurrent.futures.html
  2. https://docs.python.org/ja/3.13/library/threading.html
  3. https://ja.wikipedia.org/wiki/%E3%82%B0%E3%83%AD%E3%83%BC%E3%83%90%E3%83%AB%E3%82%A4%E3%83%B3%E3%82%BF%E3%83%97%E3%83%AA%E3%82%BF%E3%83%AD%E3%83%83%E3%82%AF
  4. https://ja.wikipedia.org/wiki/CPU%E5%AF%86%E9%9B%86%E5%9E%8B
  5. https://docs.python.org/ja/3.13/library/multiprocessing.html
  6. https://docs.python.org/ja/3.12/library/pickle.html
  7. https://docs.python.org/ja/3.12/library/stdtypes.html#definition.__name__

タグ: Python マルチプロセス マルチスレッド デコレータ GIL

6月8日 20:59 投稿