Exchangeの基礎と種類
Exchangeとは
RabbitMQのメッセージングモデルにおいて、プロデューサーはメッセージを直接キューに送信しません。プロデューサーはExchangeにメッセージを送信し、Exchangeがルーティングルールに基づいてキューにメッセージを配信します。
Exchangeの種類
Direct Exchange
ルーティングキーと完全一致するキューにメッセージを配信します。例えば、ルーティングキー「order.created」でバインドされたキューは、「order.created」というルーティングキーのメッセージのみを受信します。
Topic Exchange
パターンマッチングによるルーティングが可能です。「*」は1単語、「#」は0個以上の単語にマッチします。「order.*.created」は「order.payment.created」にマッチしますが、「order.payment.user.created」にはマッチしません。「order.#」は両方にマッチします。
Headers Exchange
ルーティングキーではなく、メッセージのヘッダー属性でマッチングを行います。x-match属性で「all」(全てのヘッダーが一致)または「any」(いずれかのヘッダーが一致)を指定できます。
Fanout Exchange
バインドされた全てのキューにメッセージをブロードキャストします。ルーティングキーを無視するため、最も高速な配信が可能です。
Fanout Exchangeの実装例
ログシステムを構築し、コンソール出力とファイル書き込みの2つのコンシューマーにメッセージを配信する例を示します。
// プロデューサー
public class LogProducer {
private static final String EXCHANGE_NAME = "logs_exchange";
public static void main(String[] args) throws Exception {
Channel channel = ConnectionUtil.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String msg = scanner.next();
channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes(StandardCharsets.UTF_8));
System.out.println("メッセージ送信完了: " + msg);
}
}
}
// コンシューマー(コンソール出力用)
public class ConsoleLogConsumer {
private static final String EXCHANGE_NAME = "logs_exchange";
public static void main(String[] args) throws Exception {
Channel channel = ConnectionUtil.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
String tempQueue = channel.queueDeclare().getQueue();
channel.queueBind(tempQueue, EXCHANGE_NAME, "");
System.out.println("コンソールログ待機中...");
DeliverCallback callback = (tag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("コンソール出力: " + message);
};
channel.basicConsume(tempQueue, true, callback, tag -> {});
}
}
Direct Exchangeの実装例
ログレベルに応じて異なるキューにルーティングする例です。
// プロデューサー
public class DirectLogProducer {
private static final String EXCHANGE = "direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = ConnectionUtil.createChannel();
channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
Map logData = new HashMap<>();
logData.put("info", "システム情報ログ");
logData.put("warning", "警告ログ");
logData.put("error", "エラーログ");
for (Map.Entry entry : logData.entrySet()) {
channel.basicPublish(EXCHANGE, entry.getKey(), null, entry.getValue().getBytes());
System.out.println("送信完了 - レベル: " + entry.getKey() + ", 内容: " + entry.getValue());
}
}
}
// エラーログ専用コンシューマー
public class ErrorLogConsumer {
private static final String EXCHANGE = "direct_logs";
private static final String QUEUE = "error_queue";
public static void main(String[] args) throws Exception {
Channel channel = ConnectionUtil.createChannel();
channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
channel.queueDeclare(QUEUE, false, false, false, null);
channel.queueBind(QUEUE, EXCHANGE, "error");
DeliverCallback callback = (tag, delivery) -> {
String msg = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("エラーログ受信: " + msg);
};
channel.basicConsume(QUEUE, true, callback, tag -> {});
}
}
Topic Exchangeの実装例
// プロデューサー
public class TopicLogProducer {
private static final String EXCHANGE = "topic_logs";
public static void main(String[] args) throws Exception {
Channel channel = ConnectionUtil.createChannel();
channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);
Map routingData = new LinkedHashMap<>();
routingData.put("quick.orange.rabbit", "Q1とQ2が受信");
routingData.put("lazy.orange.elephant", "Q1とQ2が受信");
routingData.put("quick.brown.fox", "マッチなし、破棄");
routingData.put("lazy.pink.rabbit", "Q2のみ受信");
for (Map.Entry entry : routingData.entrySet()) {
channel.basicPublish(EXCHANGE, entry.getKey(), null, entry.getValue().getBytes());
}
}
}
// コンシューマー(*.orange.* パターン)
public class OrangeTopicConsumer {
private static final String EXCHANGE = "topic_logs";
private static final String QUEUE = "orange_queue";
public static void main(String[] args) throws Exception {
Channel channel = ConnectionUtil.createChannel();
channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);
channel.queueDeclare(QUEUE, false, false, false, null);
channel.queueBind(QUEUE, EXCHANGE, "*.orange.*");
DeliverCallback callback = (tag, delivery) -> {
String msg = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("受信キュー: " + QUEUE + ", ルーティングキー: " + delivery.getEnvelope().getRoutingKey());
};
channel.basicConsume(QUEUE, true, callback, tag -> {});
}
}
Dead Letter Queue(死文字キュー)
概念と用途
Dead Letter Queue(DLQ)は、正常に消費されなかったメッセージを格納するための特別なキューです。メッセージがDLQに送られる主な理由は以下の通りです:
- メッセージのTTL(Time To Live)が期限切れ
- キューが最大長に達した
- メッセージが拒否され、再キューが無効化されている
TTL期限切れによるDLQ転送の実装
// プロデューサー
public class DlqProducer {
private static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception {
Channel channel = ConnectionUtil.createChannel();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.expiration("10000") // 10秒のTTL
.build();
for (int i = 0; i < 10; i++) {
String msg = "message_" + i;
channel.basicPublish(NORMAL_EXCHANGE, "routing.a", props, msg.getBytes());
}
}
}
// 通常キューのコンシューマー(DLQ設定付き)
public class NormalQueueConsumer {
private static final String NORMAL_EXCHANGE = "normal_exchange";
private static final String DLQ_EXCHANGE = "dlq_exchange";
private static final String NORMAL_QUEUE = "normal_queue";
private static final String DLQ_QUEUE = "dead_letter_queue";
public static void main(String[] args) throws Exception {
Channel channel = ConnectionUtil.createChannel();
// Exchangeの宣言
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DLQ_EXCHANGE, BuiltinExchangeType.DIRECT);
// 通常キューの宣言(DLQ設定付き)
Map args = new HashMap<>();
args.put("x-dead-letter-exchange", DLQ_EXCHANGE);
args.put("x-dead-letter-routing-key", "dlq.routing");
channel.queueDeclare(NORMAL_QUEUE, false, false, false, args);
// DLQの宣言
channel.queueDeclare(DLQ_QUEUE, false, false, false, null);
// バインディング
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "routing.a");
channel.queueBind(DLQ_QUEUE, DLQ_EXCHANGE, "dlq.routing");
DeliverCallback callback = (tag, delivery) -> {
System.out.println("通常キュー受信: " + new String(delivery.getBody()));
};
channel.basicConsume(NORMAL_QUEUE, true, callback, tag -> {});
}
}
// DLQコンシューマー
public class DlqConsumer {
private static final String DLQ_QUEUE = "dead_letter_queue";
public static void main(String[] args) throws Exception {
Channel channel = ConnectionUtil.createChannel();
DeliverCallback callback = (tag, delivery) -> {
System.out.println("DLQ受信: " + new String(delivery.getBody()));
};
channel.basicConsume(DLQ_QUEUE, true, callback, tag -> {});
}
}
キュー最大長によるDLQ転送
// キュー最大長の設定
Map queueArgs = new HashMap<>();
queueArgs.put("x-dead-letter-exchange", "dlq_exchange");
queueArgs.put("x-dead-letter-routing-key", "dlq.routing");
queueArgs.put("x-max-length", 6); // 最大6件
channel.queueDeclare("normal_queue", false, false, false, queueArgs);
メッセージ拒否によるDLQ転送
DeliverCallback callback = (tag, delivery) -> {
String msg = new String(delivery.getBody(), StandardCharsets.UTF_8);
if ("reject_me".equals(msg)) {
// 再キューせずに拒否(DLQへ転送)
channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
} else {
System.out.println("受信: " + msg);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
channel.basicConsume("normal_queue", false, callback, tag -> {});
遅延キューの実装
TTLとDLQを組み合わせた遅延キュー
Spring Bootとの統合例を示します。
// 設定クラス
@Configuration
public class DelayQueueConfig {
public static final String DELAY_EXCHANGE = "delay.exchange";
public static final String DELAY_QUEUE = "delay.queue";
public static final String DELAY_ROUTING_KEY = "delay.key";
@Bean
public Queue delayQueue() {
return QueueBuilder.durable(DELAY_QUEUE).build();
}
@Bean
public CustomExchange delayExchange() {
Map args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", true, false, args);
}
@Bean
public Binding delayBinding(Queue delayQueue, CustomExchange delayExchange) {
return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs();
}
}
// コントローラー
@RestController
@RequestMapping("/delay")
public class DelayController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send/{msg}/{delayMs}")
public String sendDelayMessage(@PathVariable String msg, @PathVariable Integer delayMs) {
rabbitTemplate.convertAndSend(DelayQueueConfig.DELAY_EXCHANGE,
DelayQueueConfig.DELAY_ROUTING_KEY, msg, message -> {
message.getMessageProperties().setDelay(delayMs);
return message;
});
return "送信完了: " + msg + ", 遅延: " + delayMs + "ms";
}
}
// コンシューマー
@Component
public class DelayConsumer {
@RabbitListener(queues = DelayQueueConfig.DELAY_QUEUE)
public void handleDelayMessage(Message message) {
String msg = new String(message.getBody());
System.out.println("遅延メッセージ受信: " + msg + ", 時刻: " + LocalDateTime.now());
}
}
パブリッシャー確認メカニズム
コールバック付き確認
application.ymlの設定:
spring:
rabbitmq:
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
// コールバッククラス
@Component
public class PublisherCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("メッセージ確認成功: " + correlationData.getId());
} else {
System.out.println("メッセージ確認失敗: " + correlationData.getId() + ", 原因: " + cause);
}
}
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.println("メッセージ返却: " + new String(returned.getMessage().getBody()) +
", Exchange: " + returned.getExchange() +
", RoutingKey: " + returned.getRoutingKey());
}
}
バックアップExchange
@Configuration
public class BackupConfig {
public static final String MAIN_EXCHANGE = "main.exchange";
public static final String BACKUP_EXCHANGE = "backup.exchange";
public static final String MAIN_QUEUE = "main.queue";
public static final String BACKUP_QUEUE = "backup.queue";
public static final String WARNING_QUEUE = "warning.queue";
@Bean
public DirectExchange mainExchange() {
return ExchangeBuilder.directExchange(MAIN_EXCHANGE)
.durable(true)
.withArgument("alternate-exchange", BACKUP_EXCHANGE)
.build();
}
@Bean
public FanoutExchange backupExchange() {
return new FanoutExchange(BACKUP_EXCHANGE);
}
@Bean
public Queue mainQueue() {
return QueueBuilder.durable(MAIN_QUEUE).build();
}
@Bean
public Queue backupQueue() {
return QueueBuilder.durable(BACKUP_QUEUE).build();
}
@Bean
public Queue warningQueue() {
return QueueBuilder.durable(WARNING_QUEUE).build();
}
@Bean
public Binding mainBinding(Queue mainQueue, DirectExchange mainExchange) {
return BindingBuilder.bind(mainQueue).to(mainExchange).with("main.key");
}
@Bean
public Binding backupBinding(Queue backupQueue, FanoutExchange backupExchange) {
return BindingBuilder.bind(backupQueue).to(backupExchange);
}
@Bean
public Binding warningBinding(Queue warningQueue, FanoutExchange backupExchange) {
return BindingBuilder.bind(warningQueue).to(backupExchange);
}
}
優先度キュー
// 優先度キューの宣言
Map priorityArgs = new HashMap<>();
priorityArgs.put("x-max-priority", 10);
channel.queueDeclare("priority_queue", true, false, false, priorityArgs);
// メッセージ送信時に優先度を設定
AMQP.BasicProperties highPriority = new AMQP.BasicProperties.Builder()
.priority(10)
.build();
channel.basicPublish("", "priority_queue", highPriority, "緊急メッセージ".getBytes());
AMQP.BasicProperties normalPriority = new AMQP.BasicProperties.Builder()
.priority(1)
.build();
channel.basicPublish("", "priority_queue", normalPriority, "通常メッセージ".getBytes());
レイジーキュー
レイジーキューはメッセージをできるだけディスクに保存し、消費時にメモリにロードします。大量のメッセージ蓄積に適しています。
Map lazyArgs = new HashMap<>();
lazyArgs.put("x-queue-mode", "lazy");
channel.queueDeclare("lazy_queue", false, false, false, lazyArgs);
クラスタ構成
クラスタ構築手順
# 各ノードでCookieを同期
scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/
scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/
# RabbitMQ起動
rabbitmq-server -detached
# node2でクラスタに参加
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app
# node3でクラスタに参加
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node2
rabbitmqctl start_app
# クラスタ状態確認
rabbitmqctl cluster_status
# ユーザー設定
rabbitmqctl add_user admin password
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
ミラーキューの設定
Web管理画面でPolicyを追加し、ha-modeを「all」に設定することで、全ノードにキューをミラーリングできます。
Federation Exchange
地理的に離れたデータセンター間でExchangeを連携させ、ネットワーク遅延を軽減できます。
# プラグインの有効化
rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_management
Shovel
異なるBroker間でメッセージを転送します。
# プラグインの有効化
rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management
RabbitMQの高度な機能を活用することで、信頼性の高い分散メッセージングシステムを構築できます。用途に応じて適切なExchangeタイプ、キュー設定、クラスタ構成を選択することが重要です。