ZeroMQ: Javaにおけるリクエスト/レスポンスとパブリッシュ/サブスクライブパターンの簡単な実装

POM


    
    <dependency>
        <groupId>org.zeromq</groupId>
        <artifactId>jeromq</artifactId>
        <version>0.4.3</version>
    </dependency>

一、リクエスト/レスポンスパターン(ZMQ_REQ + ZMQ_REP)

1. REPサーバー


package com.example.zmq.repreq;

import org.zeromq.ZMQ;

public abstract class ZmqRepServer implements Runnable {
    /**
     * ZMQスレッド数
     */
    private int zmqThreadCount = 1;

    /**
     * ZMQポート番号
     */
    private int zmqPort;

    /**
     * ZMQIPアドレス
     */
    private String zmqIp;

    private ZMQ.Context context;
    private ZMQ.Socket repSocket;

    public ZmqRepServer(String zmqIp, int zmqPort) {
        this.zmqIp = zmqIp;
        this.zmqPort = zmqPort;
        initializeZmq();
    }

    /**
     * ZMQオブジェクトの初期化
     */
    private void initializeZmq() {
        if (context == null) {
            context = ZMQ.context(zmqThreadCount);
        }
        if (zmqPort != 0) {
            repSocket = context.socket(ZMQ.REP);
            String bindUri = "tcp://" + zmqIp + ":" + zmqPort;
            repSocket.bind(bindUri);
        } else {
            throw new RuntimeException("エラー: ポート番号が不正です");
        }
    }

    @Override
    public void run() {
        while (true) {
            try {
                byte[] receivedData = repSocket.recv();
                if (receivedData == null) {
                    continue;
                }
                if (new String(receivedData).equals("TERMINATE")) {
                    break;
                }
                processMessage(receivedData, repSocket);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        // リソースの解放
        repSocket.close();
        context.term();
    }

    /**
     * 受信データを処理する抽象メソッド
     */
    public abstract void processMessage(byte[] data, ZMQ.Socket socket);
}

2. REQクライアント


import org.zeromq.ZMQ;

public class ZmqReqClient {
    /**
     * ZMQスレッド数
     */
    private int zmqThreadCount = 1;

    /**
     * ZMQサーバーポート
     */
    private int zmqServerPort;
    private String zmqServerIp;

    private ZMQ.Context context;
    private ZMQ.Socket reqSocket;

    public ZmqReqClient(String zmqServerIp, int zmqServerPort) {
        this.zmqServerIp = zmqServerIp;
        this.zmqServerPort = zmqServerPort;
        initializeZmq();
    }

    /**
     * ZMQオブジェクトの初期化
     */
    private void initializeZmq() {
        if (context == null) {
            context = ZMQ.context(zmqThreadCount);
        }
        if (zmqServerPort != 0) {
            reqSocket = context.socket(ZMQ.REQ);
            String connectUri = "tcp://" + zmqServerIp + ":" + zmqServerPort;
            reqSocket.connect(connectUri);
        } else {
            throw new RuntimeException("エラー: ポート番号が不正です");
        }
    }

    public void sendTerminateSignal() {
        // 終了信号の送信
        reqSocket.send("TERMINATE".getBytes());
        // リソースの解放
        reqSocket.close();
        context.term();
    }

    public byte[] sendMessage(byte[] message) {
        reqSocket.send(message);
        byte[] response = reqSocket.recv();
        return response;
    }

    public void sendAndReceive(byte[] message) {
        reqSocket.send(message);
        byte[] response = reqSocket.recv();
        System.out.println("サーバーからの応答: " + new String(response));
        // 終了信号の送信
        reqSocket.send("TERMINATE".getBytes());
        // リソースの解放
        reqSocket.close();
        context.term();
    }
}

3. テスト

REPサーバー


public class RepServer {
    public static void main(String[] args) {
        ZmqRepServer zmqRepServer = new ZmqRepServer("*", 8888) {
            @Override
            public void processMessage(byte[] data, ZMQ.Socket socket) {
                System.out.println("受信したリクエスト: " + new String(data));
                String response = "受信しました: " + new String(data);
                socket.send(response.getBytes());
            }
        };
        Thread serverThread = new Thread(zmqRepServer);
        serverThread.start();
    }
}

REQクライアント


public class ReqClient {
    public static void main(String[] args) throws InterruptedException {
        ZmqReqClient zmqReqClient = new ZmqReqClient("127.0.0.1", 8888);
        for (int i = 0; i < 100; i++) {
            String request = "こんにちは、時刻は " + System.currentTimeMillis();
            byte[] responseData = zmqReqClient.sendMessage(request.getBytes());
            System.out.println("サーバーからの応答: " + new String(responseData));
            Thread.sleep(1000);
        }
        zmqReqClient.sendTerminateSignal();
    }
}

二、パブリッシュ/サブスクライブパターン(ZMQ_PUB + ZMQ_SUB)

1. PUBクライアント


import org.zeromq.ZMQ;

public class ZmqPubClient {
    /**
     * ZMQスレッド数
     */
    private int zmqThreadCount = 1;

    /**
     * ZMQパブリッシュポート
     */
    private int zmqPublishPort;
    private String zmqPublishIp;

    private ZMQ.Context context;
    private static ZMQ.Socket pubSocket;

    public ZmqPubClient(String zmqPublishIp, int zmqPublishPort) {
        this.zmqPublishIp = zmqPublishIp;
        this.zmqPublishPort = zmqPublishPort;
        initializeZmq();
    }

    /**
     * ZMQオブジェクトの初期化
     */
    private void initializeZmq() {
        if (zmqPublishIp == null || zmqPublishIp.isEmpty()) {
            throw new RuntimeException("エラー: IPアドレスが不正です");
        }
        if (context == null) {
            context = ZMQ.context(zmqThreadCount);
        }
        if (zmqPublishPort != 0) {
            pubSocket = context.socket(ZMQ.PUB);
            String bindUri = "tcp://" + zmqPublishIp + ":" + zmqPublishPort;
            pubSocket.bind(bindUri);
            // 初期化用の空メッセージを送信
            pubSocket.send("");
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } else {
            throw new RuntimeException("エラー: ポート番号が不正です");
        }
    }

    public void publishData(byte[] message) {
        pubSocket.send(message, ZMQ.NOBLOCK);
    }
}

2. SUBクライアント


import org.zeromq.ZMQ;

/**
 * ZMQサブスクライバースレッド
 */
public abstract class ZmqSubClient implements Runnable {

    /**
     * ZMQスレッド数
     */
    private int zmqThreadCount = 1;

    /**
     * ZMQ受信ポート
     */
    private int zmqReceivePort;

    /**
     * ZMQ受信IPアドレス
     */
    private String zmqReceiveIp;

    private ZMQ.Context context;
    private ZMQ.Socket subSocket;

    public ZmqSubClient() {
        initializeZmq();
    }

    public ZmqSubClient(String zmqReceiveIp, int zmqReceivePort) {
        this.zmqReceiveIp = zmqReceiveIp;
        this.zmqReceivePort = zmqReceivePort;
        initializeZmq();
    }

    /**
     * ZMQオブジェクトの初期化
     */
    public void initializeZmq() {
        if (zmqReceiveIp == null || zmqReceiveIp.isEmpty()) {
            throw new RuntimeException("エラー: IPアドレスが不正です");
        }
        if (zmqReceivePort == 0) {
            throw new RuntimeException("エラー: ポート番号が不正です");
        }

        context = ZMQ.context(zmqThreadCount);
        subSocket = context.socket(ZMQ.SUB);
        String connectUri = "tcp://" + zmqReceiveIp + ":" + zmqReceivePort;
        subSocket.connect(connectUri);
        subSocket.subscribe("".getBytes());
    }

    @Override
    public void run() {
        while (true) {
            try {
                byte[] receivedData = subSocket.recv(ZMQ.SUB);
                if (receivedData == null) {
                    continue;
                }
                handleMessage(receivedData);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 受信データを処理する抽象メソッド
     */
    public abstract void handleMessage(byte[] data);
}

3. テスト

PUBクライアント


public class Publisher {
    public static void main(String[] args) throws InterruptedException {
        ZmqPubClient zmqPubClient = new ZmqPubClient("127.0.0.1", 7777);
        for (int i = 0; i < 1000; i++) {
            String data = "データ: " + System.currentTimeMillis() + "\t" + i;
            zmqPubClient.publishData(data.getBytes());
            System.out.println("パブリッシュしたデータ: " + data);
            Thread.sleep(1000);
        }
    }
}

SUBクライアント


public class Subscriber {
    public static void main(String[] args) {
        ZmqSubClient zmqSubClient = new ZmqSubClient("127.0.0.1", 7777) {
            @Override
            public void handleMessage(byte[] data) {
                System.out.println("受信したデータ: " + new String(data));
            }
        };
        Thread subscriberThread = new Thread(zmqSubClient);
        subscriberThread.start();
    }
}

タグ: ZeroMQ jeromq Java リクエスト/レスポンス パブリッシュ/サブスクライブ

7月4日 17:30 投稿