本稿では、LSTM(Long Short-Term Memory)モデルを使用して銅先物価格を予測するコードについて解説します。コードは、モデルの訓練部分と予測部分の2つに分かれます。
モデル訓練部分 以下のPythonコードは、LSTMモデルを訓練するための関数です。TensorFlowを使用してモデルを構築し、Adamオプティマイザで学習を進めます。
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
# 仮の入力サイズ、出力サイズ、学習率を定義
input_size = 1 # 実際のデータに合わせて調整
output_size = 1 # 実際のデータに合わせて調整
lr = 0.001 # 実際のデータに合わせて調整
# 仮のデータ取得関数(実際にはCSVファイルなどから読み込む)
def get_train_data(batch_size, time_step, train_begin, train_end):
# ダミーデータの生成
data_size = 200
data = np.sin(np.arange(data_size).reshape(-1, 1) * 0.1) + np.random.randn(data_size, 1) * 0.1
mean = np.mean(data[train_begin:train_end])
std = np.std(data[train_begin:train_end])
data = (data - mean) / std
batch_index = []
for i in range(train_begin, train_end):
if i % batch_size == 0:
batch_index.append(i)
train_x = []
train_y = []
for i in range(time_step, len(data)):
x = data[i - time_step:i]
y = data[i]
train_x.append(x.tolist())
train_y.append(y.tolist())
return batch_index, np.array(train_x), np.array(train_y)
# 仮のLSTMモデル定義(実際にはより詳細なネットワーク構造を定義)
def lstm(X):
# ここに実際のLSTMセルとレイヤーを定義する
# 例:
# lstm_cell = tf.nn.rnn_cell.BasicLSTMCell(num_units=128, state_is_tuple=True)
# rnn_output, _ = tf.nn.dynamic_rnn(lstm_cell, X, dtype=tf.float32)
# outputs = tf.contrib.layers.fully_connected(rnn_output[:, -1], output_size) # 最後のタイムステップの出力のみを使用
# ダミーの出力
outputs = tf.random_normal(shape=[tf.shape(X)[0], time_step, output_size])
return outputs, None # return pred, _
def train_lstm(batch_size=5, time_step=2, train_begin=0, train_end=150):
X = tf.placeholder(tf.float32, shape=[None, time_step, input_size])
Y = tf.placeholder(tf.float32, shape=[None, time_step, output_size])
batch_indices, train_features, train_targets = get_train_data(batch_size, time_step, train_begin, train_end)
predicted_values, _ = lstm(X)
# 損失関数:予測値と実測値の二乗誤差の平均
loss = tf.reduce_mean(tf.square(tf.reshape(predicted_values, [-1]) - tf.reshape(Y, [-1])))
train_optimizer = tf.train.AdamOptimizer(lr).minimize(loss)
saver = tf.train.Saver(tf.global_variables(), max_to_keep=15)
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
# モデルの保存と復元(モジュールのパスは実行環境に合わせて調整)
# module_file = tf.train.latest_checkpoint('./')
# if module_file:
# saver.restore(sess, module_file)
num_epochs = 150
for epoch in range(num_epochs):
for step in range(len(batch_indices) - 1):
batch_features = train_features[batch_indices[step]:batch_indices[step+1]]
batch_targets = train_targets[batch_indices[step]:batch_indices[step+1]]
_, current_loss = sess.run([train_optimizer, loss], feed_dict={X: batch_features, Y: batch_targets})
print(f"Epoch {epoch}: Loss = {current_loss:.4f}")
# 15エポックごとにモデルを保存
if epoch % 15 == 0:
save_path = saver.save(sess, './future.model', global_step=epoch)
print(f"Model saved: {save_path}")
# 訓練の実行例
# train_lstm(batch_size=10, time_step=3, train_begin=0, train_end=180)
予測部分 以下のコードは、訓練済みのLSTMモデルを使用して将来の価格を予測する関数です。
# 仮のテストデータ取得関数
def get_test_data(time_step):
# ダミーデータの生成
data_size = 50
data = np.sin(np.arange(data_size).reshape(-1, 1) * 0.1) + np.random.randn(data_size, 1) * 0.1
mean = np.mean(data)
std = np.std(data)
data = (data - mean) / std
test_x = []
test_y = []
for i in range(time_step, len(data)):
x = data[i - time_step:i]
y = data[i]
test_x.append(x.tolist())
test_y.append(y.tolist())
return mean, std, np.array(test_x), np.array(test_y)
def prediction(time_step=2):
X = tf.placeholder(tf.float32, shape=[None, time_step, input_size])
mean_val, std_val, test_features, test_targets = get_test_data(time_step)
predicted_values, _ = lstm(X)
saver = tf.train.Saver(tf.global_variables())
with tf.Session() as sess:
# 保存されたモデルを復元
module_file = tf.train.latest_checkpoint('./')
if module_file:
saver.restore(sess, module_file)
else:
print("Error: No checkpoint found.")
return
predicted_results = []
for i in range(len(test_features)):
# 1ステップずつ予測
prediction_output = sess.run(predicted_values, feed_dict={X: [test_features[i]]})
predicted_flat = prediction_output.reshape((-1))
predicted_results.extend(predicted_flat)
# 予測結果を元のスケールに戻す (この部分のインデックスは実際のデータ構造に依存)
# 仮にstd_val[7]とmean[7]を使用していますが、実際のデータと特徴量に合わせて調整が必要です。
# 例: if len(std_val) > 7 and len(mean_val) > 7:
# scale_factor_std = std_val[7]
# scale_factor_mean = mean_val[7]
# else:
# scale_factor_std = std_val.mean() # フォールバック
# scale_factor_mean = mean_val.mean() # フォールバック
# ここでは単純化のため、平均と標準偏差全体を使用
actual_targets_rescaled = np.array(test_targets) * std_val + mean_val
predicted_rescaled = np.array(predicted_results) * std_val + mean_val
# 予測値と実測値の長さを合わせる
min_len = min(len(predicted_rescaled), len(actual_targets_rescaled))
predicted_rescaled = predicted_rescaled[:min_len]
actual_targets_rescaled = actual_targets_rescaled[:min_len]
# 精度計算(平均絶対誤差の割合)
accuracy = np.mean(np.abs(predicted_rescaled - actual_targets_rescaled) / actual_targets_rescaled)
print(f"Prediction Accuracy (MAE percentage): {accuracy:.4f}")
# 結果を折れ線グラフで表示
plt.figure(figsize=(12, 6))
plt.plot(list(range(len(predicted_rescaled))), predicted_rescaled, color='blue', label='Predicted Price')
plt.plot(list(range(len(actual_targets_rescaled))), actual_targets_rescaled, color='red', label='Actual Price')
plt.title('Copper Futures Price Prediction')
plt.xlabel('Time Steps')
plt.ylabel('Price')
plt.legend()
plt.grid(True)
plt.show()
# 予測の実行例
# prediction(time_step=3)
上記のコードは、TensorFlowとLSTMを使用して銅先物価格の時系列データを予測する基本的な実装例です。実際の運用では、データの正規化方法、モデルのアーキテクチャ、ハイパーパラメータのチューニングなどが重要になります。