概要
深層学習モデルの学習時間を短縮するため、Python の concurrent.futures と TensorFlow/Keras を組み合わせた並列パイプラインを構築する手順を解説します。
全体像
| フェーズ | 説明 |
|---|---|
| 1 | 依存ライブラリの読み込み |
| 2 | データ前処理の並列実行 |
| 3 | 訓練・検証データの分割 |
| 4 | ネットワークアーキテクチャ定義 |
| 5 | 分散学習設定 |
| 6 | 性能評価 |
実装
1. 必要モジュール
from concurrent.futures import ProcessPoolExecutor
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
2. 並列前処理
def normalize_chunk(chunk):
"""チャンク単位で標準化"""
mean = chunk.mean(axis=0)
std = chunk.std(axis=0)
return (chunk - mean) / (std + 1e-7)
def parallel_normalize(features, workers=4):
"""データをチャンクに分割し並列で標準化"""
chunks = np.array_split(features, workers)
with ProcessPoolExecutor(max_workers=workers) as exe:
results = list(exe.map(normalize_chunk, chunks))
return np.vstack(results)
3. データ読み込み・分割
raw = np.load('dataset.npy')
X, y = raw[:, :-1], raw[:, -1]
X = parallel_normalize(X)
X_train, X_val, y_train, y_val = train_test_split(
X, y, test_size=0.2, random_state=42
)
4. モデル定義
strategy = tf.distribute.MirroredStrategy()
with strategy.scope():
net = tf.keras.Sequential([
tf.keras.layers.Dense(128, activation='relu',
input_shape=(X_train.shape[1],)),
tf.keras.layers.BatchNormalization(),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dense(1)
])
5. 学習
net.compile(
optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
loss='mse',
metrics=['mae']
)
history = net.fit(
X_train, y_train,
validation_data=(X_val, y_val),
epochs=30,
batch_size=256,
verbose=2
)
6. 評価
val_loss, val_mae = net.evaluate(X_val, y_val, verbose=0)
print(f'Validation MAE: {val_mae:.4f}')