並列処理を活用した深層学習パイプラインの高速化

概要

深層学習モデルの学習時間を短縮するため、Python の concurrent.futures と TensorFlow/Keras を組み合わせた並列パイプラインを構築する手順を解説します。

全体像

フェーズ説明
1依存ライブラリの読み込み
2データ前処理の並列実行
3訓練・検証データの分割
4ネットワークアーキテクチャ定義
5分散学習設定
6性能評価

実装

1. 必要モジュール

from concurrent.futures import ProcessPoolExecutor
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split

2. 並列前処理

def normalize_chunk(chunk):
    """チャンク単位で標準化"""
    mean = chunk.mean(axis=0)
    std = chunk.std(axis=0)
    return (chunk - mean) / (std + 1e-7)

def parallel_normalize(features, workers=4):
    """データをチャンクに分割し並列で標準化"""
    chunks = np.array_split(features, workers)
    with ProcessPoolExecutor(max_workers=workers) as exe:
        results = list(exe.map(normalize_chunk, chunks))
    return np.vstack(results)

3. データ読み込み・分割

raw = np.load('dataset.npy')
X, y = raw[:, :-1], raw[:, -1]
X = parallel_normalize(X)
X_train, X_val, y_train, y_val = train_test_split(
    X, y, test_size=0.2, random_state=42
)

4. モデル定義

strategy = tf.distribute.MirroredStrategy()

with strategy.scope():
    net = tf.keras.Sequential([
        tf.keras.layers.Dense(128, activation='relu',
                              input_shape=(X_train.shape[1],)),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(1)
    ])

5. 学習

net.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
    loss='mse',
    metrics=['mae']
)

history = net.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    epochs=30,
    batch_size=256,
    verbose=2
)

6. 評価

val_loss, val_mae = net.evaluate(X_val, y_val, verbose=0)
print(f'Validation MAE: {val_mae:.4f}')

タグ: TensorFlow Keras python-multiprocessing concurrent-futures deep-learning-pipeline

6月26日 21:23 投稿