RocketMQ シングルノード構築とJavaによるメッセージ送受信

1. RocketMQのインストールと設定

1.1 Java環境の準備

RocketMQはJavaベースのメッセージミドルウェアであるため、動作にはJavaランタイムが必要です。

1.2 RocketMQのインストール手順

1.2.1 ファイルのダウンロードと展開

mkdir -p /home/data/rocketmq
cd /home/data/rocketmq
wget https://mirrors.bfsu.edu.cn/apache/rocketmq/4.8.0/rocketmq-all-4.8.0-bin-release.zip
unzip rocketmq-all-4.8.0-bin-release.zip

1.2.2 シンボリックリンクとストレージディレクトリの作成

cd /usr/local
ln -s /home/data/rocketmq/rocketmq-all-4.8.0-bin-release rocketmq
mkdir -p /usr/local/rocketmq/{store/{commitlog,consumequeue,index},logs}

1.2.3 設定ファイルの調整

Broker設定ファイルの編集(broker-a.properties):

brokerClusterName=rocketmq-cluster
brokerName=broker-a
brokerId=0
namesrvAddr=124.70.143.9:9876
brokerIP1=124.70.143.9
listenPort=10911
autoCreateTopicEnable=true
storePathRootDir=/usr/local/rocketmq/store
storePathCommitLog=/usr/local/rocketmq/store/commitlog

ログ設定の置換:

cd /usr/local/rocketmq/conf
sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml

JVMメモリ設定の変更(runserver.shとrunbroker.sh):

JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn128m"

1.2.4 サービスの起動と確認

# NameServerの起動
nohup sh /usr/local/rocketmq/bin/mqnamesrv >/dev/null 2>&1 &

# Brokerの起動
nohup sh /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties -n localhost:9876 >/dev/null 2>&1 &

# 状態確認
jps
sh /usr/local/rocketmq/bin/mqadmin clusterList -n localhost:9876

2. RocketMQ管理コンソールの構築

2.1 Mavenのインストール

mkdir -p /home/data/maven
cd /home/data/maven
wget http://archive.apache.org/dist/maven/maven-3/3.6.0/binaries/apache-maven-3.6.0-bin.zip
unzip apache-maven-3.6.0-bin.zip
ln -s /home/data/maven/apache-maven-3.6.0 /usr/local/maven

環境変数の設定:

export MAVEN_HOME=/usr/local/maven
export PATH=$MAVEN_HOME/bin:$PATH
source /etc/profile

2.2 コンソールアプリケーションの構築

mkdir -p /home/data/rocketmq-console
cd /home/data/rocketmq-console
# ソースコードの展開と設定ファイルの編集
cd rocketmq-console
vim ./src/main/resources/application.properties

設定ファイルの例:

server.contextPath=/rocketmq
server.port=8099
rocketmq.config.namesrvAddr=124.70.143.9:9876
mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-console-ng-2.0.0.jar

3. Javaクライアントによるメッセージ送受信

Maven依存関係

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.8.0</version>
</dependency>

メッセージ送信クラス

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class MessageSender {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("TestProducerGroup");
        producer.setNamesrvAddr("124.70.143.9:9876");
        producer.setVipChannelEnabled(false);
        producer.start();

        for (int i = 0; i < 2; i++) {
            Message msg = new Message("TestTopic", "TagA", ("Hello RocketMQ " + i).getBytes());
            producer.send(msg);
        }
        
        producer.shutdown();
    }
}

メッセージ受信クラス

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.message.MessageExt;

public class MessageReceiver {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TestConsumerGroup");
        consumer.setNamesrvAddr("124.70.143.9:9876");
        consumer.subscribe("TestTopic", "*");

        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (MessageExt msg : msgs) {
                System.out.println("Received message: " + new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        consumer.start();
    }
}

4. 常見エラー対応

  • メモリ不足エラー: JVMメモリ設定を適切に調整
  • ルート情報なしエラー: RocketMQクライアントライブラリのバージョンを統一
  • タイムアウトエラー: BrokerのIP設定を確認
  • ポート競合エラー: application.propertiesのserver.address設定をコメントアウト

タグ: RocketMQ Java messaging Apache rocketmq-console

5月23日 06:28 投稿