POM(pom.xml に追加する Maven 依存関係)
<!-- Dependency for the ZeroMQ examples below, which import org.zeromq.ZMQ. -->
<dependency>
<groupId>org.zeromq</groupId>
<artifactId>jeromq</artifactId>
<version>0.4.3</version>
</dependency>
一、リクエスト/レスポンスパターン(ZMQ_REQ + ZMQ_REP)
1. REPサーバー
package com.example.zmq.repreq;
import org.zeromq.ZMQ;
/**
 * Base class for a ZeroMQ reply (REP) server.
 *
 * <p>The constructor binds a REP socket to {@code tcp://<zmqIp>:<zmqPort>}. {@link #run()} then
 * receives requests in a loop and hands each one to
 * {@link #processMessage(byte[], ZMQ.Socket)}. The loop ends when a request whose bytes spell
 * {@code TERMINATE} arrives, when the running thread is interrupted, or when receiving fails;
 * the socket and the context are released on every exit path.
 */
public abstract class ZmqRepServer implements Runnable {
    /** Request body that asks the server to stop; compared byte-wise, so no decoding is needed. */
    private static final byte[] TERMINATE_SIGNAL =
            "TERMINATE".getBytes(java.nio.charset.StandardCharsets.US_ASCII);

    /** Thread count passed to {@code ZMQ.context}. */
    private int zmqThreadCount = 1;

    /** Port the REP socket binds to; 0 is rejected. */
    private final int zmqPort;

    /** Address (host part of the bind URI) the REP socket binds to. */
    private final String zmqIp;

    private ZMQ.Context context;
    private ZMQ.Socket repSocket;

    /** Set once the socket and context have been released, so the release runs only once. */
    private boolean released;

    /**
     * Creates the server and binds its socket immediately.
     *
     * @param zmqIp address to bind to, used verbatim in the {@code tcp://} URI
     * @param zmqPort port to bind to
     * @throws RuntimeException if {@code zmqPort} is 0, or if creating or binding the socket fails
     */
    public ZmqRepServer(String zmqIp, int zmqPort) {
        this.zmqIp = zmqIp;
        this.zmqPort = zmqPort;
        initializeZmq();
    }

    /**
     * Creates the context and the bound REP socket.
     */
    private void initializeZmq() {
        // Validate before allocating anything so a bad port cannot leave a context behind.
        if (zmqPort == 0) {
            throw new RuntimeException("エラー: ポート番号が不正です");
        }
        if (context == null) {
            context = ZMQ.context(zmqThreadCount);
        }
        try {
            repSocket = context.socket(ZMQ.REP);
            String bindUri = "tcp://" + zmqIp + ":" + zmqPort;
            repSocket.bind(bindUri);
        } catch (RuntimeException e) {
            // The constructor is about to fail; nobody else could release these.
            releaseResources();
            throw e;
        }
    }

    /**
     * Receives requests until termination and dispatches them to
     * {@link #processMessage(byte[], ZMQ.Socket)}.
     *
     * <p>An exception thrown by {@code processMessage} is printed and the loop continues. An
     * exception thrown while receiving is printed and ends the loop, because retrying the same
     * failing receive would spin forever.
     */
    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                byte[] receivedData;
                try {
                    receivedData = repSocket.recv();
                } catch (Exception e) {
                    e.printStackTrace();
                    break;
                }
                if (receivedData == null) {
                    continue;
                }
                if (java.util.Arrays.equals(receivedData, TERMINATE_SIGNAL)) {
                    break;
                }
                try {
                    processMessage(receivedData, repSocket);
                } catch (Exception e) {
                    // A failing handler must not take the whole server down.
                    e.printStackTrace();
                }
            }
        } finally {
            releaseResources();
        }
    }

    /** Closes the socket and terminates the context; later calls do nothing. */
    private void releaseResources() {
        if (released) {
            return;
        }
        released = true;
        if (repSocket != null) {
            repSocket.close();
        }
        if (context != null) {
            context.term();
        }
    }

    /**
     * Handles one received request.
     *
     * @param data raw bytes of the request
     * @param socket the server's REP socket, passed so the implementation can send the reply
     */
    public abstract void processMessage(byte[] data, ZMQ.Socket socket);
}
2. REQクライアント
import org.zeromq.ZMQ;
/**
 * ZeroMQ request (REQ) client that connects to {@code tcp://<zmqServerIp>:<zmqServerPort>}.
 *
 * <p>{@link #sendTerminateSignal()} and {@link #sendAndReceive(byte[])} close the socket and
 * terminate the context when they finish, so the instance is not meant to be used afterwards.
 */
public class ZmqReqClient {
    /** Request body sent to ask the server to stop. */
    private static final byte[] TERMINATE_SIGNAL =
            "TERMINATE".getBytes(java.nio.charset.StandardCharsets.US_ASCII);

    /** Thread count passed to {@code ZMQ.context}. */
    private int zmqThreadCount = 1;

    /** Port of the server to connect to; 0 is rejected. */
    private final int zmqServerPort;

    /** Address of the server to connect to, used verbatim in the {@code tcp://} URI. */
    private final String zmqServerIp;

    private ZMQ.Context context;
    private ZMQ.Socket reqSocket;

    /** Set once the socket and context have been released, so the release runs only once. */
    private boolean released;

    /**
     * Creates the client and connects its socket immediately.
     *
     * @param zmqServerIp server address
     * @param zmqServerPort server port
     * @throws RuntimeException if {@code zmqServerPort} is 0, or if creating or connecting the
     *     socket fails
     */
    public ZmqReqClient(String zmqServerIp, int zmqServerPort) {
        this.zmqServerIp = zmqServerIp;
        this.zmqServerPort = zmqServerPort;
        initializeZmq();
    }

    /**
     * Creates the context and the connected REQ socket.
     */
    private void initializeZmq() {
        // Validate before allocating anything so a bad port cannot leave a context behind.
        if (zmqServerPort == 0) {
            throw new RuntimeException("エラー: ポート番号が不正です");
        }
        if (context == null) {
            context = ZMQ.context(zmqThreadCount);
        }
        try {
            reqSocket = context.socket(ZMQ.REQ);
            String connectUri = "tcp://" + zmqServerIp + ":" + zmqServerPort;
            reqSocket.connect(connectUri);
        } catch (RuntimeException e) {
            // The constructor is about to fail; nobody else could release these.
            releaseResources();
            throw e;
        }
    }

    /**
     * Sends the termination signal to the server, then releases the socket and the context.
     * The resources are released even if sending fails.
     */
    public void sendTerminateSignal() {
        try {
            reqSocket.send(TERMINATE_SIGNAL);
        } finally {
            releaseResources();
        }
    }

    /**
     * Sends one request and waits for the reply.
     *
     * @param message request bytes
     * @return whatever {@code recv()} on the REQ socket returns for this request
     */
    public byte[] sendMessage(byte[] message) {
        reqSocket.send(message);
        return reqSocket.recv();
    }

    /**
     * Sends one request, prints the reply, sends the termination signal and releases the socket
     * and the context. The resources are released even if any of these steps throws.
     *
     * @param message request bytes
     */
    public void sendAndReceive(byte[] message) {
        try {
            reqSocket.send(message);
            byte[] response = reqSocket.recv();
            System.out.println("サーバーからの応答: " + new String(response));
            reqSocket.send(TERMINATE_SIGNAL);
        } finally {
            releaseResources();
        }
    }

    /** Closes the socket and terminates the context; later calls do nothing. */
    private void releaseResources() {
        if (released) {
            return;
        }
        released = true;
        if (reqSocket != null) {
            reqSocket.close();
        }
        if (context != null) {
            context.term();
        }
    }
}
3. テスト
REPサーバー
/**
 * Demo entry point: runs a REP server on port 8888 that prints each request and echoes it back
 * with a prefix.
 */
public class RepServer {
    public static void main(String[] args) {
        ZmqRepServer echoServer = new ZmqRepServer("*", 8888) {
            @Override
            public void processMessage(byte[] data, ZMQ.Socket socket) {
                // Decode once and reuse the text for both the log line and the reply.
                String requestText = new String(data);
                System.out.println("受信したリクエスト: " + requestText);
                byte[] replyBytes = ("受信しました: " + requestText).getBytes();
                socket.send(replyBytes);
            }
        };
        new Thread(echoServer).start();
    }
}
REQクライアント
/**
 * Demo entry point: sends 100 timestamped requests to the REP server on 127.0.0.1:8888, one per
 * second, printing each reply, and finally sends the termination signal.
 */
public class ReqClient {
    public static void main(String[] args) throws InterruptedException {
        ZmqReqClient client = new ZmqReqClient("127.0.0.1", 8888);
        int remaining = 100;
        while (remaining > 0) {
            byte[] requestBytes = ("こんにちは、時刻は " + System.currentTimeMillis()).getBytes();
            byte[] replyBytes = client.sendMessage(requestBytes);
            String replyText = new String(replyBytes);
            System.out.println("サーバーからの応答: " + replyText);
            Thread.sleep(1000);
            remaining--;
        }
        client.sendTerminateSignal();
    }
}
二、パブリッシュ/サブスクライブパターン(ZMQ_PUB + ZMQ_SUB)
1. PUBクライアント
import org.zeromq.ZMQ;
/**
 * ZeroMQ publisher (PUB) that binds {@code tcp://<zmqPublishIp>:<zmqPublishPort>} and publishes
 * raw byte messages on it.
 *
 * <p>Each instance owns its own socket and context; call {@link #close()} to release them.
 */
public class ZmqPubClient implements AutoCloseable {
    /** Thread count passed to {@code ZMQ.context}. */
    private int zmqThreadCount = 1;

    /** Port the PUB socket binds to; 0 is rejected. */
    private final int zmqPublishPort;

    /** Address the PUB socket binds to; must be non-empty. */
    private final String zmqPublishIp;

    private ZMQ.Context context;

    /** Per-instance socket, so a second publisher cannot overwrite this one's socket. */
    private ZMQ.Socket pubSocket;

    /** Set once the socket and context have been released, so the release runs only once. */
    private boolean closed;

    /**
     * Creates the publisher and binds its socket immediately.
     *
     * @param zmqPublishIp address to bind to, used verbatim in the {@code tcp://} URI
     * @param zmqPublishPort port to bind to
     * @throws RuntimeException if the address is null or empty, if the port is 0, or if creating
     *     or binding the socket fails
     */
    public ZmqPubClient(String zmqPublishIp, int zmqPublishPort) {
        this.zmqPublishIp = zmqPublishIp;
        this.zmqPublishPort = zmqPublishPort;
        initializeZmq();
    }

    /**
     * Creates the context and the bound PUB socket, then sends an empty warm-up message.
     */
    private void initializeZmq() {
        // Validate everything before allocating so bad arguments cannot leave a context behind.
        if (zmqPublishIp == null || zmqPublishIp.isEmpty()) {
            throw new RuntimeException("エラー: IPアドレスが不正です");
        }
        if (zmqPublishPort == 0) {
            throw new RuntimeException("エラー: ポート番号が不正です");
        }
        if (context == null) {
            context = ZMQ.context(zmqThreadCount);
        }
        try {
            pubSocket = context.socket(ZMQ.PUB);
            String bindUri = "tcp://" + zmqPublishIp + ":" + zmqPublishPort;
            pubSocket.bind(bindUri);
            // Empty message sent once for initialization.
            pubSocket.send("");
        } catch (RuntimeException e) {
            // The constructor is about to fail; nobody else could release these.
            releaseResources();
            throw e;
        }
        try {
            // Short pause after binding — presumably to give subscribers time to connect.
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Publishes one message without blocking.
     *
     * @param message bytes to publish
     */
    public void publishData(byte[] message) {
        pubSocket.send(message, ZMQ.NOBLOCK);
    }

    /** Closes the socket and terminates the context; later calls do nothing. */
    @Override
    public void close() {
        releaseResources();
    }

    /** Shared release logic, kept private so the constructor path never calls an overridable method. */
    private void releaseResources() {
        if (closed) {
            return;
        }
        closed = true;
        if (pubSocket != null) {
            pubSocket.close();
        }
        if (context != null) {
            context.term();
        }
    }
}
2. SUBクライアント
import org.zeromq.ZMQ;
/**
 * Base class for a ZeroMQ subscriber (SUB) that connects to
 * {@code tcp://<zmqReceiveIp>:<zmqReceivePort>}, subscribes with an empty prefix and passes every
 * received message to {@link #handleMessage(byte[])}.
 *
 * <p>{@link #run()} ends when the running thread is interrupted or when receiving fails; the
 * socket and the context are released when it ends.
 */
public abstract class ZmqSubClient implements Runnable {
    /** Thread count passed to {@code ZMQ.context}. */
    private int zmqThreadCount = 1;

    /** Port of the publisher to connect to; 0 is rejected. */
    private int zmqReceivePort;

    /** Address of the publisher to connect to; must be non-empty. */
    private String zmqReceiveIp;

    private ZMQ.Context context;
    private ZMQ.Socket subSocket;

    /**
     * Kept for compatibility. No address or port is set on this path, so initialization rejects
     * the configuration.
     *
     * @throws RuntimeException always, because the address is null
     */
    public ZmqSubClient() {
        initializeZmq();
    }

    /**
     * Creates the subscriber and connects its socket immediately.
     *
     * @param zmqReceiveIp publisher address, used verbatim in the {@code tcp://} URI
     * @param zmqReceivePort publisher port
     * @throws RuntimeException if the address is null or empty, if the port is 0, or if creating
     *     or connecting the socket fails
     */
    public ZmqSubClient(String zmqReceiveIp, int zmqReceivePort) {
        this.zmqReceiveIp = zmqReceiveIp;
        this.zmqReceivePort = zmqReceivePort;
        initializeZmq();
    }

    /**
     * Creates the context and the connected SUB socket, subscribed with an empty prefix.
     * Resources from an earlier call are released first so that calling this again does not
     * leak them.
     *
     * @throws RuntimeException if the address is null or empty, if the port is 0, or if creating
     *     or connecting the socket fails
     */
    public void initializeZmq() {
        if (zmqReceiveIp == null || zmqReceiveIp.isEmpty()) {
            throw new RuntimeException("エラー: IPアドレスが不正です");
        }
        if (zmqReceivePort == 0) {
            throw new RuntimeException("エラー: ポート番号が不正です");
        }
        releaseResources();
        context = ZMQ.context(zmqThreadCount);
        try {
            subSocket = context.socket(ZMQ.SUB);
            String connectUri = "tcp://" + zmqReceiveIp + ":" + zmqReceivePort;
            subSocket.connect(connectUri);
            subSocket.subscribe("".getBytes());
        } catch (RuntimeException e) {
            // Do not leave a half-initialized socket or context behind.
            releaseResources();
            throw e;
        }
    }

    /**
     * Receives messages until the thread is interrupted or receiving fails, passing each one to
     * {@link #handleMessage(byte[])}.
     *
     * <p>An exception thrown by {@code handleMessage} is printed and the loop continues. An
     * exception thrown while receiving is printed and ends the loop, because retrying the same
     * failing receive would spin forever.
     */
    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                byte[] receivedData;
                try {
                    // Plain blocking receive; ZMQ.SUB is a socket type, not a receive flag.
                    receivedData = subSocket.recv();
                } catch (Exception e) {
                    e.printStackTrace();
                    break;
                }
                if (receivedData == null) {
                    continue;
                }
                try {
                    handleMessage(receivedData);
                } catch (Exception e) {
                    // A failing handler must not stop the subscription.
                    e.printStackTrace();
                }
            }
        } finally {
            releaseResources();
        }
    }

    /** Closes the socket and terminates the context if they exist, then forgets them. */
    private void releaseResources() {
        if (subSocket != null) {
            subSocket.close();
            subSocket = null;
        }
        if (context != null) {
            context.term();
            context = null;
        }
    }

    /**
     * Handles one received message.
     *
     * @param data raw bytes of the message
     */
    public abstract void handleMessage(byte[] data);
}
3. テスト
PUBクライアント
/**
 * Demo entry point: publishes 1000 timestamped, numbered messages on 127.0.0.1:7777, one per
 * second, echoing each one to standard output.
 */
public class Publisher {
    public static void main(String[] args) throws InterruptedException {
        ZmqPubClient publisher = new ZmqPubClient("127.0.0.1", 7777);
        int sequence = 0;
        while (sequence < 1000) {
            long timestamp = System.currentTimeMillis();
            String data = "データ: " + timestamp + "\t" + sequence;
            byte[] payload = data.getBytes();
            publisher.publishData(payload);
            System.out.println("パブリッシュしたデータ: " + data);
            Thread.sleep(1000);
            sequence++;
        }
    }
}
SUBクライアント
/**
 * Demo entry point: subscribes to the publisher on 127.0.0.1:7777 and prints every received
 * message.
 */
public class Subscriber {
    public static void main(String[] args) {
        ZmqSubClient printingSubscriber = new ZmqSubClient("127.0.0.1", 7777) {
            @Override
            public void handleMessage(byte[] data) {
                String text = new String(data);
                System.out.println("受信したデータ: " + text);
            }
        };
        new Thread(printingSubscriber).start();
    }
}