Pythonソケットにおけるリバース接続モデルと複数回コマンド実行の実装

従来のクライアント/サーバー通信では、制御側がリスンポートを開き、実行側が接続を試行する構成が標準的です。しかし、ファイアウォール制限や特定の監視環境では、役割を反転させ、コマンド実行側が接続待ち状態となり、制御側から複数回の接続要求を受け付けるリバース接続パターンが採用されます。本実装では、同一LAN内で待機側が制御側の接続を複数回受け付け、受信したシェルコマンドをローカルで実行して標準出力を返送し、接続断発生時にも双方が自動的に再接続サイクルへ復帰する仕組みを構築します。

待機側(コマンド実行ノード)の実装

待機側は特定のポートでバインドを行い、制御側の接続を監視します。接続確立後はコマンドペイロードを受信し、subprocessモジュール経由で同期実行します。実行完了後に結果をストリーム送信し、セッションを正常に閉じます。例外発生時にはソケットを解放し、無限ループ構造により次の接続要求を継続して待ち受けます。

import socket
import subprocess
import time
import sys

LISTEN_HOST = '0.0.0.0'
LISTEN_PORT = 9090
BUFFER_SIZE = 4096
RECONNECT_DELAY = 3

def run_command_listener():
    print(f"制御側からの接続を待機中 (ポート: {LISTEN_PORT})...")
    while True:
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener:
                listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                listener.bind((LISTEN_HOST, LISTEN_PORT))
                listener.listen(1)
                
                # 接続受け付け
                conn, addr = listener.accept()
                with conn:
                    print(f"接続確立: {addr[0]}:{addr[1]}")
                    
                    # コマンド受信
                    payload = conn.recv(BUFFER_SIZE)
                    if not payload:
                        continue
                    cmd = payload.decode('utf-8').strip()
                    print(f"受信コマンド: {cmd}")
                    
                    # 安全なコマンド実行
                    try:
                        result = subprocess.run(
                            cmd,
                            shell=True,
                            capture_output=True,
                            text=True,
                            timeout=20
                        )
                        output = result.stdout if result.returncode == 0 else result.stderr
                        print(f"実行完了 (戻り値: {result.returncode})")
                    except subprocess.TimeoutExpired:
                        output = "Error: Command timed out\n"
                    except Exception as e:
                        output = f"Error: {e}\n"
                        
                    # 結果送信
                    conn.sendall(output.encode('utf-8'))
                    print("実行結果の送信を完了しました。")
                    
        except (ConnectionResetError, OSError) as e:
            print(f"接続断またはネットワークエラー: {e}")
        except KeyboardInterrupt:
            print("\n待機プロセスを終了します。")
            sys.exit(0)
        finally:
            print(f"{RECONNECT_DELAY}秒後に再接続待機へ移行...")
            time.sleep(RECONNECT_DELAY)

if __name__ == '__main__':
    run_command_listener()

接続側(コマンド制御ノード)の実装

接続側は待機側へ向けて一時的なコネクションを確立します。コマンド送信後、ストリームから実行結果をチャンク単位で受信し、バッファに蓄積します。通信が正常に終了するとソケットを閉じ、次のコマンド実行のために新しいセッションを確立するフローを繰り返します。

import socket
import time

CONTROLLER_HOST = '127.0.0.1' # 待機ノードのIPを指定
CONTROLLER_PORT = 9090
BUFFER_SIZE = 4096
SESSION_TIMEOUT = 60

def dispatch_commands(targets):
    for cmd in targets:
        print(f"\n--- コマンド送信: '{cmd}' ---")
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as ctrl_sock:
                ctrl_sock.settimeout(SESSION_TIMEOUT)
                ctrl_sock.connect((CONTROLLER_HOST, CONTROLLER_PORT))
                print("待機ノードへ接続成功")
                
                # コマンド送信
                ctrl_sock.sendall(f"{cmd}\n".encode('utf-8'))
                
                # 結果受信(チャンキング対応)
                response_data = []
                while True:
                    chunk = ctrl_sock.recv(BUFFER_SIZE)
                    if not chunk:
                        break
                    response_data.append(chunk.decode('utf-8'))
                    
                full_output = ''.join(response_data)
                print(f"受信データサイズ: {len(full_output)} バイト")
                print(full_output)
                
        except socket.timeout:
            print("警告: 結果受信がタイムアウトしました。")
        except ConnectionRefusedError:
            print("エラー: 接続が拒否されました。待機ノードが起動しているか確認してください。")
        except Exception as e:
            print(f"通信エラー: {e}")
        
        # 連続実行間のインターバル
        time.sleep(1)

if __name__ == '__main__':
    command_list = ['hostname', 'ipconfig', 'dir']
    dispatch_commands(command_list)

アーキテクチャの考慮点

  • セッションの独立性: 各コマンド実行ごとに独立的なTCPコネクションを確立・破棄することで、中途半端な状態のソケットが残留するリスクを排除しています。
  • リソース管理: コンテキストマネージャ(with 構文)を採用し、正常終了・異常終了問わずファイルディスクリプタとポートが確実にOSへ返却されるようにしています。
  • 実行環境の隔離: subprocess.runcapture_output=True を利用することで、標準出力と標準エラー出力を分離し、デッドロックや出力混在を防ぎます。タイムアウト設定によりハングアッププロセスの発生を抑制しています。
  • 再接続ロジック: 待機側は while True ループ内で例外を捕捉し、ネットワーク切断や制御側の再起動後も自動的にリスン状態へ復帰するフォールトトレラントな構成としています。

タグ: python-socket reverse-connection subprocess-execution tcp-networking fault-tolerant-sockets

5月25日 05:06 投稿