概要
電子機器の開発・検証プロセスにおいて、シリアル通信(UART)によるコマンド送受信やステータス取得は頻繁に行われます。Windows および Linux などの OS はシリアルポートをデバイスファイルとして扱いますが、アプリケーションからは API 経由でのみアクセス可能です。Python の pySerial ライブラリを使用することで、外部デバイスの制御や通信データの自動抽出が可能となり、手動操作に比べて効率性と再現性が大幅に向上します。
本ガイドでは、以下の機能を持つ自動化テストスクリプトの構築手順について説明します。
- システム上の利用可能なシリアルポートの自動スキャン
- 指定パラメータによる通信接続とエラーハンドリング
- AT コマンドを送信し、特定のレスポンス文字列で正常性を判定するロジック
- 通信履歴の自動記録と、タイムスタンプ付きのバックアップ管理
必要なライブラリ
まず、以下の標準ライブラリおよびサードパーティライブラリを準備する必要があります。
import os
import sys
import time
import datetime
import shutil
import logging
import csv
from pathlib import Path
# pySerial ライブラリのインストールが必要: pip install pyserial
from serial import Serial
from serial.tools import list_ports
実装の詳細
コード構造を整理するため、クラスベースで構成しています。これにより、シリアルインスタンスのライフサイクル管理やロギングの設定が一元的に行えます。
1. ポート検索関数
システムに登録されているすべてのシリアルインターフェースを取得します。
def scan_available_ports() -> list:
"""
システム内のシリアルポートを検索し、デバイス ID のリストを返す
"""
com_ports = []
try:
devices = list_ports.comports()
com_ports = [device.device for device in devices]
except Exception as e:
print(f"ポート検索エラー: {str(e)}")
return com_ports
2. 接続管理クラス
シリアル接続のセットアップ、通信、切断をカプセル化します。
class SerialConnection:
def __init__(self, port_name: str, baud_rate: int = 115200):
self.port = port_name
self.baud_rate = baud_rate
self.serial_instance = None
def connect(self) -> bool:
"""
指定されたポートへ接続を試みる
"""
try:
self.serial_instance = Serial(
port=self.port,
baudrate=self.baud_rate,
bytesize=8,
parity='N',
stopbits=1,
timeout=5.0
)
print(f"[OK] 接続成功:{self.port} ({self.baud_rate}bps)")
return True
except Exception as e:
print(f"[NG] 接続失敗:{str(e)}")
return False
def disconnect(self):
"""
接続を閉じる処理
"""
if self.serial_instance and self.serial_instance.is_open:
self.serial_instance.close()
print("[INFO] 接続終了")
def send_command(self, command: bytes):
"""
デバイスへのデータ送信
"""
if self.serial_instance and self.serial_instance.is_open:
self.serial_instance.write(command)
def read_response(self, buffer_size: int = 1024) -> str:
"""
受信バッファからデータを取得
"""
if self.serial_instance and self.serial_instance.in_waiting > 0:
raw_data = self.serial_instance.read(buffer_size)
return raw_data.decode('utf-8', errors='ignore').strip()
return ""
3. 通信ロジックとレスポンスチェック
デバイスの起動確認(Boot OK など)や、コマンド実行結果のパースを行います。非同期処理を考慮したタイマー制御を追加しています。
def verify_device_status(conn: SerialConnection, expected_keyword: str, timeout_sec: float = 60.0) -> bool:
"""
デバイスが期待する応答(例:"Param OK")を示すまで待機
"""
start_time = time.time()
conn.serial_instance.reset_input_buffer() # バッファクリア
while (time.time() - start_time) < timeout_sec:
response = conn.read_response()
if response:
print(f"[LOG] 受信データ:{response}")
if expected_keyword in response:
return True
time.sleep(0.5)
return False
def execute_test_sequence(conn: SerialConnection):
"""
テストに必要な一連のコマンド送信と監視
"""
# デバッグモード無効化など初期化処理
init_cmd = b'AT+NV=SET,CLOSEDEBUG,0\r\n'
conn.send_command(init_cmd)
time.sleep(2)
# ステータス確認
if not verify_device_status(conn, "Param OK"):
print("警告: デバイス初期化応答が確認できません")
return
# メインテストコマンド
main_cmd = b'AT+F4\r\n'
conn.send_command(main_cmd)
# 完了待ち(累積稼働時間など特定キーワード)
target_result = "Accumulated Working Time"
if not verify_device_status(conn, target_result, timeout_sec=30.0):
print("エラー:主要レスポンス未受信")
else:
print("テストコマンド実行完了を確認")
4. ロギングとファイル管理
運用中に発生する出力はすべてファイルに保存し、日付ベースでアーカイブされます。
class LogManager:
def __init__(self, log_dir: str, backup_dir: str):
self.log_dir = Path(log_dir)
self.backup_dir = Path(backup_dir)
self.logger = None
def setup_logging(self):
"""
ログディレクトリ作成と Logger 設定
"""
self.log_dir.mkdir(parents=True, exist_ok=True)
timestamp_file = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
log_filename = f"log_{timestamp_file}.txt"
log_path = self.log_dir / log_filename
self.logger = logging.getLogger("SerialAutoTest")
self.logger.setLevel(logging.INFO)
fh = logging.FileHandler(log_path, encoding='utf-8')
sh = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
sh.setFormatter(formatter)
self.logger.addHandler(fh)
self.logger.addHandler(sh)
def clear_logs(self):
"""
旧ログの清理(必要に応じて実行)
"""
for item in self.log_dir.iterdir():
if item.is_file():
item.unlink()
print("旧ログファイルをクリアしました")
def backup_logs(self):
"""
現在のログを日時フォルダに移動
"""
current_time_str = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
backup_subdir = self.backup_dir / current_time_str
backup_subdir.mkdir(parents=True, exist_ok=True)
try:
shutil.copytree(str(self.log_dir), str(backup_subdir), dirs_exist_ok=True)
self.logger.info(f"ログをバックアップしました:{backup_subdir}")
except Exception as e:
self.logger.error(f"バックアップエラー:{str(e)}")
5. メインフロー
全モジュールを組み合わせて実行フローを定義します。
def run_automation_test():
"""
メインエントリーポイント
"""
config = {
'log_base': r"./logs/output",
'backup_base': r"./logs/archive",
'default_port': None,
'baudrate': 115200
}
manager = LogManager(config['log_base'], config['backup_base'])
try:
manager.clear_logs()
manager.setup_logging()
available_ports = scan_available_ports()
if not available_ports:
manager.logger.warning("利用可能なシリアルポートが見つかりません")
return
print(f"利用可能なポート:{available_ports}")
selected_port = input("対象ポートを入力してください(空ならデフォルト):") or available_ports[0]
# 接続確立
connection = SerialConnection(selected_port, config['baudrate'])
if connection.connect():
manager.logger.info(f"デバイス接続開始:{selected_port}")
# 通信動作テスト
execute_test_sequence(connection)
connection.disconnect()
else:
manager.logger.error("デバイス接続に失敗しました")
finally:
manager.backup_logs()
print("テストセッション完了")
if __name__ == "__main__":
run_automation_test()
補足事項
上記スクリプトを実行する際は、実行ユーザーに COM ポートへの書き込み権限があることを確認してください。また、実際のバッチ処理や継続的な監視を想定する場合は、スレッドやマルチプロセスによる並列処理の導入を検討するとより効率的です。