ソケットを使用した単一シリアルポートの共有読み書き実装

物理シリアルポートはPCに接続され、シリアルポート名によって一意に識別されます。

この場合、物理シリアルポートはそのポート名によって単一のスレッドまたはプロセスインスタンスのみが占有でき、他のスレッドやプロセスは同じポート名で物理シリアルポートと通信できません。これをシリアルポートの排他性と呼びます。

解決策: 核心思想:コンピュータソフトウェアにおけるソケットプログラミングを利用し、1つのソケットサーバーが複数のソケットクライアントを接続でき、ソケットサーバーが複数のソケットクライアントと物理シリアルポート間の通信を処理します。

実装プロセス: 1、物理シリアルポートのシリアルポート名に基づいてシリアルポート操作クラスをインスタンス化し、このクラスが物理シリアルポートとの通信を担当します。シリアルポート書き込みスレッドとシリアルポート読み取りスレッドを確立します。その中で、シリアルポート読み取りスレッドは継続的に物理シリアルポートの出力を収集し、読み取りバッファに保存します。シリアルポート書き込みスレッドは継続的に書き込みバッファからコマンドを取得し、物理シリアルポートに送信します。 2、信頼性の高いSocket Serverを構築し、Socket Clientが接続した際に、読み取りバッファ内のデータをSocket Clientに送信し、継続的にSocket Clientから送信されたコマンドを収集して書き込みバッファに保存します。 3、プログラミング言語のスレッド/プロセスはSocket Clientを構築してSocket Serverに接続することで、複数のスレッド/プロセスが物理シリアルポートとの通信を実現できます

以下ではバッファリング機構を実装せず、読み取ったシリアルデータをキューに保存する方式を示します:

ソケットサーバー

#!/usr/bin/python
# -*- coding: utf-8 -*-

import socket
import psutil
import traceback
import threading
import SocketServer 
import json
import sys
import Queue
import time

from serial import Serial
from SocketServer import StreamRequestHandler as SRH
from CustomStringIO import  CustomStringIO

SERIAL_PORT_REGISTRY = {}

class MainHandler(SRH):
    
    def handle(self):
        try:
            print 'クライアント [%s] がサーバーに接続しました (ポート [%s])' % (self.client_address[0], self.client_address[1])
            self.keep_alive = True
            while self.keep_alive:
                data = self.request.recv(4096 * 3)
                if not data:
                    break
                data_json = json.loads(data)
                if "RequestType" in data_json:
                    if data_json["RequestType"] == "SerialDeviceAccess":
                        if "Port" in data_json["Args"]:
                            self.handle_serial_device(data_json, close_timeout=60)
                            break
                else:
                    break
        except Exception as e:
            traceback.print_exc()
        finally:
          print '<------ シリアルソケットサーバー処理完了 ------>'


    def handle_serial_device(self, data_json, close_timeout=60):
        self.read_queue = Queue.Queue()
        read_id = str(time.time())
        if data_json["Args"]['Port'] in SERIAL_PORT_REGISTRY:
            self.serial_device = SERIAL_PORT_REGISTRY[data_json["Args"]["Port"]]['serial']
            self.serial_device.client_buffers.update({read_id:self.read_queue})
            SERIAL_PORT_REGISTRY[data_json["Args"]["Port"]]['connection_count'] += 1
        else:
            self.serial_device = SerialManager(data_json["Args"]['Port'])
            self.serial_device.client_buffers.update({read_id:self.read_queue})
            SERIAL_PORT_REGISTRY.update({data_json["Args"]['Port']:{'serial':self.serial_device,'connection_count':1}})

        print(str(SERIAL_PORT_REGISTRY))
        th_serial_reader = threading.Thread(target=self.read_from_serial)
        th_serial_reader.start()
        
        no_data_received = False
        while self.keep_alive:
            try:
                data = self.request.recv(4096 * 3)
                print '入力データ: %s' % str(data)
            except socket.error:
                self.keep_alive = False
                print "シリアル接続を閉じます"
                break
            else: 
                if data:
                    self.serial_device.write_data(data)
                    end_time = time.time() + close_timeout
                # ソケットクライアントが閉じられた後、self.request.recvは常に空文字列を受け取るため、一定時間待機してから接続を閉じる
                else:
                    if not no_data_received:
                        no_data_received = True
                        end_time = time.time() + close_timeout
                    if time.time() > end_time:
                        print 'クライアント接続タイムアウト'
                        print "シリアル接続を閉じます"
                        self.keep_alive = False
                        break

        if SERIAL_PORT_REGISTRY[data_json["Args"]["Port"]]['connection_count'] > 0:
            SERIAL_PORT_REGISTRY[data_json["Args"]["Port"]]['connection_count'] -= 1
            SERIAL_PORT_REGISTRY[data_json["Args"]["Port"]]['serial'].client_buffers.pop(read_id)
        print(str(SERIAL_PORT_REGISTRY))

        if SERIAL_PORT_REGISTRY[data_json["Args"]["Port"]]['connection_count'] <= 0:
            print('シリアルポートを解放します')
            SERIAL_PORT_REGISTRY[data_json["Args"]["Port"]]['serial'].close_connection()
            if data_json["Args"]['Port'] in SERIAL_PORT_REGISTRY:
                SERIAL_PORT_REGISTRY.pop(data_json["Args"]['Port'])
        


    def read_from_serial(self):
        try:
            while self.keep_alive:
                serial_data = self.read_queue.get()
                self.request.send(serial_data)
        except socket.error:
            pass



class ThreadingServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):

    def _server_threading(self):
        pass


class SerialSocketServer(object):

    def __init__(self, port=33233):
        self.server = None
        self.port = 33233

    def start(self):
        network_interface_ips = self.get_network_interfaces()
        for interface_ip in network_interface_ips:
            host = interface_ip[1]

            try:
                port = self.port
                address = (host, port)
                self.server = ThreadingServer(address, MainHandler)
                self.server.allow_reuse_address = True
                server_thread = threading.Thread(target=self.server.serve_forever)
                server_thread.daemon = True
                server_thread.start()
                print "シリアルソケットサーバーを起動しました!"
                while True:
                    try:
                        user_input = raw_input()
                    except KeyboardInterrupt:
                        sys.exit(0)
                        break
                    except EOFError:
                        print '不明なファイル終端!'
                        continue

            except Exception as e:
                print "シリアルソケットサーバーの起動に失敗: %s" % e

    def stop(self):
        print "シリアルソケットサーバーを停止します!"
        if self.server is not None:
            self.server.shutdown()
            self.server.server_close()


    def get_network_interfaces(self):
        """ネットワークインターフェース情報とIPアドレスを取得

        """
        interface_info = []
        info = psutil.net_if_addrs()
        for interface, addresses in info.items():
            for address in addresses:
                if address.family == 2 and not (address.address == '127.0.0.1' or address.address == '192.168.2.201'):
                    interface_info.append((interface, address.address))
        return interface_info

class SerialManager():

    def __init__(self, port=None, baudrate=115200, timeout=30, *args, **kargs):
        self.serial = Serial(port=port, baudrate=baudrate, timeout=timeout, *args, **kargs)
        self.is_active = True
        self.read_buffer = ""
        self.write_queue = Queue.Queue()
        self.read_buffer = CustomStringIO(4096)
        writer_thread = threading.Thread(target=self.__write_to_serial)
        writer_thread.start()
        reader_thread = threading.Thread(target=self.__read_from_serial)
        reader_thread.start()
        self.client_buffers = {}

    def read_data(self, read_id):
        return self.read_buffer.getvalue()

    def __read_from_serial(self):
        while self.is_active:
            serial_data = self.serial.readline()
            for client_id, buffer_queue in self.client_buffers.items():
                self.client_buffers[client_id].put(serial_data)
       
    def write_data(self, data_to_write):
        self.write_queue.put(data_to_write)

    def __write_to_serial(self):
        while self.is_active:
            data_to_send = self.write_queue.get()
            self.serial.write(data_to_send)

    def close_connection(self):
        self.is_active = False
        self.serial.close()
        print 'シリアル接続を閉じました'

if __name__ == '__main__':
    SerialSocketServer().start()

ソケットクライアント:

import threading
import socket
import traceback
import json
import sys
import re
import Queue

class SerialDeviceClient(threading.Thread):

    def __init__(self, server_ip, server_port=33233, serial_port="COM19"):
        threading.Thread.__init__(self)
        self.serial_server_address = server_ip
        self.server_port = server_port
        self.serial_port = serial_port
        self.buffer_size = 4096 * 4
        self.setDaemon(True)
        self._is_running = True
        self._is_connected = False
        self.client_socket = None

    def connect_to_server(self):
        try:
            address = (self.serial_server_address, self.server_port)
            self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.client_socket.connect(address)
            self._is_connected = True
        except Exception as e:
            self._is_connected = False
            print "ソケット接続に失敗: %s" % e

    def run(self):
        self.connect_to_server()
        if self.is_connected:
            request_message = json.dumps({"RequestType":"SerialDeviceAccess","Args":{"Port":self.serial_port}})
            self.client_socket.send(request_message)
          
            while self._is_running:
                try:
                    response = self.client_socket.recv(self.buffer_size)
                    if not response:
                        continue
                    processed_response = re.compile('[\\x00-\\x08\\x0b-\\x0c\\x0e-\\x1f|\\xff]').sub(' ', response.decode('unicode-escape'))
                    print '%s' % str(processed_response)
                except socket.error:
                    print 'ソケットエラー'
                    self.connect_to_server()
                except:
                    traceback.print_exc()
            print "------シリアル通信を停止------"
            self.close_connection()  
       
    def close_connection(self):
        try:
            if self.client_socket:
                self.client_socket.shutdown(socket.SHUT_RDWR)
                self.client_socket.close()
        except Exception as e:
            print "ソケットクライアントのクローズに失敗: [%s]" % str(e)

    @property
    def is_connected(self):
        return self._is_connected

    def stop(self):
        self._is_running = False

if __name__ == '__main__':
    import getopt
    options, arguments = getopt.getopt(sys.argv[1:], "h:s:")
    server_ip = "localhost"
    server_port = 33233
    serial_port = None
    for option, value in options:
        if option == "-h":
            server_ip = value
        if option == '-s':
            serial_port = value

    if not serial_port:
        print 'シリアルポート引数を指定してください: 例 -p COM19'

    serial_client = SerialDeviceClient(server_ip=server_ip, server_port=server_port, serial_port=serial_port)
    serial_client.start()
    while True:
        try:
            user_input = raw_input()
            serial_client.client_socket.send(user_input+'\n')
        except KeyboardInterrupt:
            sys.exit(0)
            break
        except EOFError:
            print '不明なファイル終端!'
            continue

サーバーの起動:python serial_socket_server.py

クライアントの起動:python serial_socket_client.py -h 対向IP -s シリアルポート名

コマンドラインから接続したいシリアルポートにコマンドを送信できます。

複数のクライアントで同じシリアルポートを読み書きでき、すべてのクライアントがシリアルポートにデータを送信できます;1つのクライアントがシリアルポートに入力データを送信した後、他のクライアントはすべてシリアルポートの出力を受信できます

タグ: Python ソケットプログラミング シリアル通信 マルチスレッド シリアルポート

6月11日 23:32 投稿