Apache RocketMQは、阿里巴巴グループが開発しオープンソース化した分散メッセージミドルウェアであり、現在ではApache Foundationのトップレベルプロジェクトとなっています。このシステムは、高スループット、高信頼性、低レイテンシーを必要とする金融グレードのユースケースに特化しています。
1. 基本概念
- 機能:分散型「パブリッシュ-サブスクライブ」メッセージシステム。
- 目的:大規模な分散システムにおける非同期通信、トラフィックの平準化、データの結合問題を解決する。
- 主要ユースケース:天猫の双十一イベントにおいて、1日に数兆件のメッセージを処理します。
2. アーキテクチャの構成要素
- NameServer
- 状態を持たない軽量なサービスディスカバリーセンター。
- ブローカーのルーティング情報を管理します(KafkaのZooKeeperに似ていますが、より軽量)。
- Broker
- メッセージのストレージと転送を行うノード。
- マスタースレーブモデル:マスターが読み書きを行い、スレーブがホットスタンバイとして動作します(同期または非同期複製をサポート)。
- CommitLog
- 全てのメッセージを単一の物理ファイルに順次書き込むという革新的な設計。
- この手法により、ディスクI/O効率が大幅に向上します(Kafkaのパーティションベースのストレージとは異なります)。
3. 主要機能
| **機能** |
**詳細** |
**ビジネス価値** |
| トランザクションメッセージ |
2段階コミット方式(半メッセージ+ローカルトランザクション実行) |
分散トランザクションの一貫性を保証 |
| シーケンシャルメッセージ |
同じキュー内で厳密なFIFOを維持 |
決済や注文ステータス変更などの順序が必要な場面で利用 |
| 遅延メッセージ |
18段階の事前定義された遅延時間(1秒〜2時間) |
注文タイムアウト閉鎖やタイマー仕様に適応 |
| メッセージフィルタリング |
TagまたはSQL92文法によるフィルタリングをサポート |
消費者の負荷を減らすための精密な配信 |
| メッセージ蓄積 |
単一サーバで数兆件のメッセージを蓄積可能(ディスク容量に依存) |
突発的なトラフィックに対抗 |
| 高可用性 |
Dledgerアルゴリズムに基づくマルチコピー選択 |
金融級の信頼性(SLA 99.9999%) |
| トレース機能 |
メッセージ生成から消費までの全プロセスを追跡 |
メッセージの紛失問題を迅速に特定 |
4. 適用例
- 金融決済
- シーン:支払い状態の同期(引き落としと注文ステータス変更の順序を保証)
- キーポイント:トランザクションメッセージ + シーケンシャルメッセージ
- EC大規模プロモーション
- シーン:秒間大量のオーダー受付バッファリング
- キーポイント:百万件以上のTPS対応 + 蓄積能力
- リアルタイムデータウェアハウス
- シーン:MySQL BinlogをHBaseに同期
- キーポイント:低レイテンシー + 高スループット(Canalの代替)
- IoT(インターネット・オブ・シングス)
- シーン:数百万台のデバイスへのコマンド配信
- キーポイント:膨大なトピックサポート + メッセージフィルタリング
5. コードサンプル
5.1 プロデューサー(同期送信)
@RestController
public class ProducerController {
@Autowired
private DefaultMQProducer producer;
@GetMapping("/send")
public String sendMessage(@RequestParam String msg) {
try {
producer.start();
Message message = new Message("test-topic", "TagA", msg.getBytes());
SendResult result = producer.send(message);
return "Message ID: " + result.getMsgId();
} catch (Exception e) {
return "Error: " + e.getMessage();
}
}
}
5.2 コンシューマー(並列処理)
@Service
public class ConsumerService extends DefaultMQPushConsumer {
public ConsumerService() throws MQClientException {
this.subscribe("test-topic", "*");
this.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> {
msgs.forEach(msg -> System.out.println("Received: " + new String(msg.getBody())));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
this.start();
}
}
6. 同類との比較
| **機能** |
**RocketMQ** |
**Kafka** |
**RabbitMQ** |
| トランザクションメッセージ |
フルサポート |
未サポート |
未サポート |
| 遅延メッセージ |
事前定義されたレベル |
外部スケジューラが必要 |
TTLを使用して柔軟 |
| 消費モード |
Pull/Pushハイブリッド |
Pullのみ |
Pushのみ |
| 順序の保証 |
キュー単位で厳密 |
パーティション内でのみ |
なし |
| 運用の複雑さ |
中程度 |
高い(ZooKeeper必須) |
低い |
7. 制限点
- 名前制約
- Topic/Group名の長さは255文字以内(Kafkaにはこの制約がない)。
- 遅延精度
- 固定された遅延レベルのみサポート(任意の時間精度は不可)。
- エコシステムツール
- 監視ツールはKafkaほど成熟していない(RocketMQ-Exporter + Grafanaの使用が推奨される)。