1. RocketMQのインストールと設定
1.1 Java環境の準備
RocketMQはJavaベースのメッセージミドルウェアであるため、動作にはJavaランタイムが必要です。
1.2 RocketMQのインストール手順
1.2.1 ファイルのダウンロードと展開
mkdir -p /home/data/rocketmq
cd /home/data/rocketmq
wget https://mirrors.bfsu.edu.cn/apache/rocketmq/4.8.0/rocketmq-all-4.8.0-bin-release.zip
unzip rocketmq-all-4.8.0-bin-release.zip
1.2.2 シンボリックリンクとストレージディレクトリの作成
cd /usr/local
ln -s /home/data/rocketmq/rocketmq-all-4.8.0-bin-release rocketmq
mkdir -p /usr/local/rocketmq/{store/{commitlog,consumequeue,index},logs}
1.2.3 設定ファイルの調整
Broker設定ファイルの編集(broker-a.properties):
brokerClusterName=rocketmq-cluster
brokerName=broker-a
brokerId=0
namesrvAddr=124.70.143.9:9876
brokerIP1=124.70.143.9
listenPort=10911
autoCreateTopicEnable=true
storePathRootDir=/usr/local/rocketmq/store
storePathCommitLog=/usr/local/rocketmq/store/commitlog
ログ設定の置換:
cd /usr/local/rocketmq/conf
sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml
JVMメモリ設定の変更(runserver.shとrunbroker.sh):
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn128m"
1.2.4 サービスの起動と確認
# NameServerの起動
nohup sh /usr/local/rocketmq/bin/mqnamesrv >/dev/null 2>&1 &
# Brokerの起動
nohup sh /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties -n localhost:9876 >/dev/null 2>&1 &
# 状態確認
jps
sh /usr/local/rocketmq/bin/mqadmin clusterList -n localhost:9876
2. RocketMQ管理コンソールの構築
2.1 Mavenのインストール
mkdir -p /home/data/maven
cd /home/data/maven
wget http://archive.apache.org/dist/maven/maven-3/3.6.0/binaries/apache-maven-3.6.0-bin.zip
unzip apache-maven-3.6.0-bin.zip
ln -s /home/data/maven/apache-maven-3.6.0 /usr/local/maven
環境変数の設定:
export MAVEN_HOME=/usr/local/maven
export PATH=$MAVEN_HOME/bin:$PATH
source /etc/profile
2.2 コンソールアプリケーションの構築
mkdir -p /home/data/rocketmq-console
cd /home/data/rocketmq-console
# ソースコードの展開と設定ファイルの編集
cd rocketmq-console
vim ./src/main/resources/application.properties
設定ファイルの例:
server.contextPath=/rocketmq
server.port=8099
rocketmq.config.namesrvAddr=124.70.143.9:9876
mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-console-ng-2.0.0.jar
3. Javaクライアントによるメッセージ送受信
Maven依存関係
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
メッセージ送信クラス
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class MessageSender {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("TestProducerGroup");
producer.setNamesrvAddr("124.70.143.9:9876");
producer.setVipChannelEnabled(false);
producer.start();
for (int i = 0; i < 2; i++) {
Message msg = new Message("TestTopic", "TagA", ("Hello RocketMQ " + i).getBytes());
producer.send(msg);
}
producer.shutdown();
}
}
メッセージ受信クラス
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.message.MessageExt;
public class MessageReceiver {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TestConsumerGroup");
consumer.setNamesrvAddr("124.70.143.9:9876");
consumer.subscribe("TestTopic", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
System.out.println("Received message: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
}
4. 常見エラー対応
- メモリ不足エラー: JVMメモリ設定を適切に調整
- ルート情報なしエラー: RocketMQクライアントライブラリのバージョンを統一
- タイムアウトエラー: BrokerのIP設定を確認
- ポート競合エラー: application.propertiesのserver.address設定をコメントアウト