安定した IoT 通信基盤の必要性
既存の HTTP や WebSocket を経由した通信方式では、サーバーのアプリケーションプール回収などにより接続が不安定になる事例が見受けられます。特に GPS 端末のような常時接続が求められる IoT デバイスとの通信においては、TCP ソケットを直接使用し、Apache Mina フレームワークを活用することで、より信頼性の高い非同期通信サーバーを構築することが可能です。
プロトコルの解析と仕様確認
既存の GPS 端末(GT02D 等)との通信を実現するためには、まず通信プロトコルの理解が不可欠です。提供されたバイナリファイルを逆コンパイルし、通信仕様を分析した結果、主なやり取りは以下の 3 種類に分類されました。
- ログイン処理: 接続確立後、端末識別子(IMEI 等)を含むパケットを送信し、サーバーが承認応答を返します。
- 心跳パケット(ハートビート): 接続維持のため、一定間隔で送信される生存確認パケットです。
- 位置情報パケット: 認証完了後、緯度、経度、速度、方位などのデータを送信します。
当初 UDP 通信が想定されていましたが、実際の端末仕様では TCP 通信が必須であることが判明しました。そのため、Mina の NioSocketAcceptor を使用して TCP リスナーを設定します。
Apache Mina によるサーバー実装
Mina は Java NIO を抽象化したイベント駆動型のフレームワークです。フィルターチェーンにログ出力とプロトコルコーデックを設定し、メインのハンドラーでメッセージ処理を行います。非同期処理の特性上、messageReceived メソッドでは受信データのキューイングのみを行い、実際のビジネスロジックは別スレッドで処理する構成が推奨されます。
サーバー起動設定
サーバーの初期化クラスでは、アクセプターの設定とフィルターチェーンの構築を行います。ログ出力フィルターと、カスタムコーデックファクトリーを登録します。
public class TcpServerBootstrap {
private static final Logger logger = Logger.getLogger(TcpServerBootstrap.class);
private MessageDispatchThread processor;
public void start() throws IOException {
// 設定の初期化
ConfigManager.loadSettings();
// NIO アクセプターの生成
IoAcceptor acceptor = new NioSocketAcceptor();
// フィルターチェーンの設定
acceptor.getFilterChain().addLast("logger", new LoggingFilter());
acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new GpsCodecFactory()));
// ハンドラーの設定
acceptor.setHandler(new ServerIoHandler(this));
// セッション設定
acceptor.getSessionConfig().setReadBufferSize(2048);
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
// ポートバインド
acceptor.bind(new InetSocketAddress(ConfigManager.TCP_PORT));
// ログ設定
PropertyConfigurator.configure("config/log4j.properties");
// 非同期処理スレッドの起動
this.processor = new MessageDispatchThread();
this.processor.start();
logger.info("GPS Server started on port: " + ConfigManager.TCP_PORT);
}
public void queueMessage(MessagePacket packet) {
if (processor != null) {
processor.enqueue(packet);
}
}
}
プロトコルデコーダーの実装
受信したバイト列を解析し、プロトコル種別ごとにオブジェクトへ変換します。ヘッダーの検証、長さの読み取り、CRC チェックサムの計算などを行い、不正なパケットは破棄します。
public class GpsMessageDecoder extends CumulativeProtocolDecoder {
private static final Logger logger = Logger.getLogger(GpsMessageDecoder.class);
private static final byte HEADER_START = (byte) 0x78;
@Override
public boolean doDecode(IoSession session, IoBuffer input, ProtocolDecoderOutput out) {
try {
if (input.remaining() < 4) {
return false;
}
input.mark();
// ヘッダー検証
byte[] headerMark = new byte[2];
input.get(headerMark);
if (headerMark[0] != HEADER_START) {
input.reset();
return false;
}
// パケット長とプロトコル ID の読み取り
byte[] lengthBytes = new byte[2];
input.get(lengthBytes);
int protocolId = lengthBytes[1] & 0xFF;
// ペイロードの読み取り準備
MessagePacket packet = new MessagePacket();
packet.setProtocolId(protocolId);
packet.setSession(session);
switch (protocolId) {
case 0x01: // ログイン
handleLoginPacket(input, packet);
break;
case 0x02: // 心跳
handleHeartbeatPacket(input, packet);
break;
case 0x12: // 位置情報
handleLocationPacket(input, packet);
break;
default:
logger.warn("Unknown protocol ID: " + protocolId);
return false;
}
out.write(packet);
return true;
} catch (Exception e) {
logger.error("Decode failed", e);
return false;
}
}
private void handleLoginPacket(IoBuffer input, MessagePacket packet) {
byte[] deviceData = new byte[18];
input.get(deviceData);
// 端末 ID の抽出(CRC 検証などを含む)
String deviceId = extractDeviceId(deviceData);
packet.setAttribute("deviceId", deviceId);
// 応答データの生成
byte[] responsePayload = buildLoginResponse(deviceData);
packet.setResponsePayload(responsePayload);
}
private String extractDeviceId(byte[] data) {
// 具体的なバイト変換ロジック
return HexUtil.bytesToHex(data, 12, 8);
}
private byte[] buildLoginResponse(byte[] originalData) {
// CRC 計算および応答パケット構築
// 省略
return new byte[8];
}
private void handleHeartbeatPacket(IoBuffer input, MessagePacket packet) {
// 心跳パケット処理
packet.setResponsePayload(new byte[]{0x78, 0x02, 0x00, 0x01, 0x0D, 0x0A});
}
private void handleLocationPacket(IoBuffer input, MessagePacket packet) {
// 位置情報パケット処理
// 緯度経度の解析ロジック
}
}
メッセージ処理スレッド
受信したメッセージをキューに蓄積し、順次処理します。これにより、I/O スレッドがブロックされるのを防ぎ、サーバーの応答性を維持します。
public class MessageDispatchThread extends Thread {
private BlockingQueue<MessagePacket> queue = new LinkedBlockingQueue<>();
public void enqueue(MessagePacket packet) {
queue.offer(packet);
}
@Override
public void run() {
while (!Thread.interrupted()) {
try {
MessagePacket msg = queue.take();
MessagePacket response = processBusinessLogic(msg);
IoSession session = msg.getSession();
if (response != null && session != null && session.isConnected()) {
// ヘッダー情報の継承
response.copyHeaderFrom(msg);
session.write(response);
}
} catch (InterruptedException e) {
break;
} catch (Exception e) {
System.err.println("Processing error: " + e.getMessage());
}
}
}
private MessagePacket processBusinessLogic(MessagePacket msg) {
// データベース保存などの処理
// 応答が必要な場合にパケットを返す
return msg.generateResponse();
}
}
プロトコルエンコーダー
サーバーから端末へ送信する応答データをバイト列へ変換します。
public class GpsMessageEncoder extends ProtocolEncoderAdapter {
private static final Logger logger = Logger.getLogger(GpsMessageEncoder.class);
@Override
public void encode(IoSession session, Object message, ProtocolEncoderOutput out) {
if (!(message instanceof MessagePacket)) {
return;
}
MessagePacket packet = (MessagePacket) message;
packet.prepareOutput();
IoBuffer buffer = IoBuffer.allocate(packet.getTotalLength()).setAutoExpand(true);
// ヘッダー書き込み
buffer.put(packet.getHeaderBytes());
// ペイロード書き込み
byte[] payload = packet.getPayloadBytes();
if (payload != null) {
buffer.put(payload);
}
buffer.flip();
logger.debug("Sending packet: " + buffer.toString());
out.write(buffer);
}
}
実装結果とデータ連携
上記の実装により、端末からのログイン要求に対して正しく応答し、継続的な心跳パケットのやり取りが可能になりました。その後、送信される位置情報パケットを解析し、緯度・経度データを抽出してデータベースへ格納するフローを完成させました。コンソール上に座標データが出力され、地図上への軌跡描画基盤が整ったことで、安定した IoT 通信サーバーとしての役割を果たせる状態となっています。この仕組みは、既存の注文配送システムなど、リアルタイム位置情報が必要な他のアプリケーションへの統合も視野に入れています。