物理シリアルポートはPCに接続され、シリアルポート名によって一意に識別されます。
この場合、物理シリアルポートはそのポート名によって単一のスレッドまたはプロセスインスタンスのみが占有でき、他のスレッドやプロセスは同じポート名で物理シリアルポートと通信できません。これをシリアルポートの排他性と呼びます。
解決策: 核心思想:コンピュータソフトウェアにおけるソケットプログラミングを利用し、1つのソケットサーバーが複数のソケットクライアントを接続でき、ソケットサーバーが複数のソケットクライアントと物理シリアルポート間の通信を処理します。
実装プロセス: 1、物理シリアルポートのシリアルポート名に基づいてシリアルポート操作クラスをインスタンス化し、このクラスが物理シリアルポートとの通信を担当します。シリアルポート書き込みスレッドとシリアルポート読み取りスレッドを確立します。その中で、シリアルポート読み取りスレッドは継続的に物理シリアルポートの出力を収集し、読み取りバッファに保存します。シリアルポート書き込みスレッドは継続的に書き込みバッファからコマンドを取得し、物理シリアルポートに送信します。 2、信頼性の高いSocket Serverを構築し、Socket Clientが接続した際に、読み取りバッファ内のデータをSocket Clientに送信し、継続的にSocket Clientから送信されたコマンドを収集して書き込みバッファに保存します。 3、プログラミング言語のスレッド/プロセスはSocket Clientを構築してSocket Serverに接続することで、複数のスレッド/プロセスが物理シリアルポートとの通信を実現できます
以下ではバッファリング機構を実装せず、読み取ったシリアルデータをキューに保存する方式を示します:
ソケットサーバー
#!/usr/bin/python
# -*- coding: utf-8 -*-
import socket
import psutil
import traceback
import threading
import SocketServer
import json
import sys
import Queue
import time
from serial import Serial
from SocketServer import StreamRequestHandler as SRH
from CustomStringIO import CustomStringIO
SERIAL_PORT_REGISTRY = {}
class MainHandler(SRH):
def handle(self):
try:
print 'クライアント [%s] がサーバーに接続しました (ポート [%s])' % (self.client_address[0], self.client_address[1])
self.keep_alive = True
while self.keep_alive:
data = self.request.recv(4096 * 3)
if not data:
break
data_json = json.loads(data)
if "RequestType" in data_json:
if data_json["RequestType"] == "SerialDeviceAccess":
if "Port" in data_json["Args"]:
self.handle_serial_device(data_json, close_timeout=60)
break
else:
break
except Exception as e:
traceback.print_exc()
finally:
print '<------ シリアルソケットサーバー処理完了 ------>'
def handle_serial_device(self, data_json, close_timeout=60):
self.read_queue = Queue.Queue()
read_id = str(time.time())
if data_json["Args"]['Port'] in SERIAL_PORT_REGISTRY:
self.serial_device = SERIAL_PORT_REGISTRY[data_json["Args"]["Port"]]['serial']
self.serial_device.client_buffers.update({read_id:self.read_queue})
SERIAL_PORT_REGISTRY[data_json["Args"]["Port"]]['connection_count'] += 1
else:
self.serial_device = SerialManager(data_json["Args"]['Port'])
self.serial_device.client_buffers.update({read_id:self.read_queue})
SERIAL_PORT_REGISTRY.update({data_json["Args"]['Port']:{'serial':self.serial_device,'connection_count':1}})
print(str(SERIAL_PORT_REGISTRY))
th_serial_reader = threading.Thread(target=self.read_from_serial)
th_serial_reader.start()
no_data_received = False
while self.keep_alive:
try:
data = self.request.recv(4096 * 3)
print '入力データ: %s' % str(data)
except socket.error:
self.keep_alive = False
print "シリアル接続を閉じます"
break
else:
if data:
self.serial_device.write_data(data)
end_time = time.time() + close_timeout
# ソケットクライアントが閉じられた後、self.request.recvは常に空文字列を受け取るため、一定時間待機してから接続を閉じる
else:
if not no_data_received:
no_data_received = True
end_time = time.time() + close_timeout
if time.time() > end_time:
print 'クライアント接続タイムアウト'
print "シリアル接続を閉じます"
self.keep_alive = False
break
if SERIAL_PORT_REGISTRY[data_json["Args"]["Port"]]['connection_count'] > 0:
SERIAL_PORT_REGISTRY[data_json["Args"]["Port"]]['connection_count'] -= 1
SERIAL_PORT_REGISTRY[data_json["Args"]["Port"]]['serial'].client_buffers.pop(read_id)
print(str(SERIAL_PORT_REGISTRY))
if SERIAL_PORT_REGISTRY[data_json["Args"]["Port"]]['connection_count'] <= 0:
print('シリアルポートを解放します')
SERIAL_PORT_REGISTRY[data_json["Args"]["Port"]]['serial'].close_connection()
if data_json["Args"]['Port'] in SERIAL_PORT_REGISTRY:
SERIAL_PORT_REGISTRY.pop(data_json["Args"]['Port'])
def read_from_serial(self):
try:
while self.keep_alive:
serial_data = self.read_queue.get()
self.request.send(serial_data)
except socket.error:
pass
class ThreadingServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
def _server_threading(self):
pass
class SerialSocketServer(object):
def __init__(self, port=33233):
self.server = None
self.port = 33233
def start(self):
network_interface_ips = self.get_network_interfaces()
for interface_ip in network_interface_ips:
host = interface_ip[1]
try:
port = self.port
address = (host, port)
self.server = ThreadingServer(address, MainHandler)
self.server.allow_reuse_address = True
server_thread = threading.Thread(target=self.server.serve_forever)
server_thread.daemon = True
server_thread.start()
print "シリアルソケットサーバーを起動しました!"
while True:
try:
user_input = raw_input()
except KeyboardInterrupt:
sys.exit(0)
break
except EOFError:
print '不明なファイル終端!'
continue
except Exception as e:
print "シリアルソケットサーバーの起動に失敗: %s" % e
def stop(self):
print "シリアルソケットサーバーを停止します!"
if self.server is not None:
self.server.shutdown()
self.server.server_close()
def get_network_interfaces(self):
"""ネットワークインターフェース情報とIPアドレスを取得
"""
interface_info = []
info = psutil.net_if_addrs()
for interface, addresses in info.items():
for address in addresses:
if address.family == 2 and not (address.address == '127.0.0.1' or address.address == '192.168.2.201'):
interface_info.append((interface, address.address))
return interface_info
class SerialManager():
def __init__(self, port=None, baudrate=115200, timeout=30, *args, **kargs):
self.serial = Serial(port=port, baudrate=baudrate, timeout=timeout, *args, **kargs)
self.is_active = True
self.read_buffer = ""
self.write_queue = Queue.Queue()
self.read_buffer = CustomStringIO(4096)
writer_thread = threading.Thread(target=self.__write_to_serial)
writer_thread.start()
reader_thread = threading.Thread(target=self.__read_from_serial)
reader_thread.start()
self.client_buffers = {}
def read_data(self, read_id):
return self.read_buffer.getvalue()
def __read_from_serial(self):
while self.is_active:
serial_data = self.serial.readline()
for client_id, buffer_queue in self.client_buffers.items():
self.client_buffers[client_id].put(serial_data)
def write_data(self, data_to_write):
self.write_queue.put(data_to_write)
def __write_to_serial(self):
while self.is_active:
data_to_send = self.write_queue.get()
self.serial.write(data_to_send)
def close_connection(self):
self.is_active = False
self.serial.close()
print 'シリアル接続を閉じました'
if __name__ == '__main__':
SerialSocketServer().start()
ソケットクライアント:
import threading
import socket
import traceback
import json
import sys
import re
import Queue
class SerialDeviceClient(threading.Thread):
def __init__(self, server_ip, server_port=33233, serial_port="COM19"):
threading.Thread.__init__(self)
self.serial_server_address = server_ip
self.server_port = server_port
self.serial_port = serial_port
self.buffer_size = 4096 * 4
self.setDaemon(True)
self._is_running = True
self._is_connected = False
self.client_socket = None
def connect_to_server(self):
try:
address = (self.serial_server_address, self.server_port)
self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.client_socket.connect(address)
self._is_connected = True
except Exception as e:
self._is_connected = False
print "ソケット接続に失敗: %s" % e
def run(self):
self.connect_to_server()
if self.is_connected:
request_message = json.dumps({"RequestType":"SerialDeviceAccess","Args":{"Port":self.serial_port}})
self.client_socket.send(request_message)
while self._is_running:
try:
response = self.client_socket.recv(self.buffer_size)
if not response:
continue
processed_response = re.compile('[\\x00-\\x08\\x0b-\\x0c\\x0e-\\x1f|\\xff]').sub(' ', response.decode('unicode-escape'))
print '%s' % str(processed_response)
except socket.error:
print 'ソケットエラー'
self.connect_to_server()
except:
traceback.print_exc()
print "------シリアル通信を停止------"
self.close_connection()
def close_connection(self):
try:
if self.client_socket:
self.client_socket.shutdown(socket.SHUT_RDWR)
self.client_socket.close()
except Exception as e:
print "ソケットクライアントのクローズに失敗: [%s]" % str(e)
@property
def is_connected(self):
return self._is_connected
def stop(self):
self._is_running = False
if __name__ == '__main__':
import getopt
options, arguments = getopt.getopt(sys.argv[1:], "h:s:")
server_ip = "localhost"
server_port = 33233
serial_port = None
for option, value in options:
if option == "-h":
server_ip = value
if option == '-s':
serial_port = value
if not serial_port:
print 'シリアルポート引数を指定してください: 例 -p COM19'
serial_client = SerialDeviceClient(server_ip=server_ip, server_port=server_port, serial_port=serial_port)
serial_client.start()
while True:
try:
user_input = raw_input()
serial_client.client_socket.send(user_input+'\n')
except KeyboardInterrupt:
sys.exit(0)
break
except EOFError:
print '不明なファイル終端!'
continue
サーバーの起動:python serial_socket_server.py
クライアントの起動:python serial_socket_client.py -h 対向IP -s シリアルポート名
コマンドラインから接続したいシリアルポートにコマンドを送信できます。
複数のクライアントで同じシリアルポートを読み書きでき、すべてのクライアントがシリアルポートにデータを送信できます;1つのクライアントがシリアルポートに入力データを送信した後、他のクライアントはすべてシリアルポートの出力を受信できます