サービス間の非同期通信
RabbitMQを用いたメッセージングにおいては、以下の課題が発生しうる:
- メッセージの信頼性
- 死信交換機(DLX)の活用
- 遅延キューの実現
- 大量メッセージ蓄積への対応
- クラスタ構成による可用性向上
1. メッセージ信頼性の確保
メッセージが送信元から受信先まで確実に到達するためには、以下の機構を組み合わせる必要がある。
1.1 パブリッシャー確認(Publisher Confirm)
RabbitMQは、メッセージがExchangeに正しく配信されたかどうかを通知するConfirmメカニズムを提供する。さらに、Exchangeに到達したもののQueueにルーティングされなかった場合に通知するReturnメカニズムも利用可能である。
設定例(application.yml):
spring:
rabbitmq:
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
ReturnCallbackの設定:
@Configuration
public class RabbitConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext ctx) throws BeansException {
RabbitTemplate template = ctx.getBean(RabbitTemplate.class);
template.setReturnsCallback(returned -> {
log.error("Return - Code: {}, Reason: {}, Exchange: {}, RoutingKey: {}",
returned.getReplyCode(),
returned.getReplyText(),
returned.getExchange(),
returned.getRoutingKey());
});
}
}
ConfirmCallbackの使用例:
CorrelationData corr = new CorrelationData(UUID.randomUUID().toString());
corr.getFuture().addCallback(
confirm -> {
if (confirm.isAck()) {
log.info("メッセージ送信成功");
} else {
log.warn("メッセージ送信失敗");
}
},
ex -> log.error("送信中に例外発生", ex)
);
rabbitTemplate.convertAndSend("exchange.name", "routing.key", "payload", corr);
1.2 メッセージの永続化
MQサーバーの再起動時にもメッセージを保持するためには、以下の3要素を永続化する必要がある:
- Exchange(デフォルトで永続化)
- Queue(
QueueBuilder.durable()で指定) - メッセージ本体(
MessageDeliveryMode.PERSISTENT)
Spring AMQPでは、明示的に指定しなくてもメッセージはデフォルトで永続化される。
1.3 コンシューマー確認モード
コンシューマーのACK戦略には以下の3種類がある:
manual: 手動でACK/NACKを返すauto: 例外が発生しなければ自動ACK(推奨)none: ACKなし(メッセージ喪失リスクあり)
設定例:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto
prefetch: 1
1.4 失敗時のリトライと復旧戦略
コンシューマー処理失敗時に無限ループを避けるため、ローカルリトライを有効化する:
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true
initial-interval: 1000
max-attempts: 3
最大リトライ回数を超えた場合の挙動は、MessageRecovererでカスタマイズ可能。特にRepublishMessageRecovererを使うことで、失敗メッセージを専用のエラーキューに転送できる。
@Bean
public MessageRecoverer republishRecoverer(RabbitTemplate template) {
return new RepublishMessageRecoverer(template, "error.exchange", "error.routing");
}
2. 死信交換機(Dead Letter Exchange)
以下の場合、メッセージは「死信」となり、設定されたDLXに転送される:
- コンシューマーが
reject/nackかつrequeue=false - メッセージのTTL(Time-To-Live)が経過
- キューが満杯で新しいメッセージを格納できない
DLX付きキューの宣言例:
@Bean
public Queue ttlQueue() {
return QueueBuilder.durable("ttl.queue")
.ttl(10000)
.deadLetterExchange("dlx.exchange")
.deadLetterRoutingKey("dl")
.build();
}
3. 遅延キューの実現
標準的なRabbitMQでは、TTL + DLXの組み合わせで遅延処理を実現できる。しかし、公式プラグインdelayed-message-exchangeを使用すると、よりシンプルに実装可能。
Delayed Exchangeの宣言:
@Bean
public CustomExchange delayedExchange() {
return new CustomExchange("delay.exchange", "x-delayed-message", true, false,
Collections.singletonMap("x-delayed-type", "direct"));
}
遅延メッセージの送信:
Message msg = MessageBuilder.withBody("delayed payload".getBytes())
.setHeader("x-delay", 5000)
.build();
rabbitTemplate.send("delay.exchange", "routing.key", msg);
4. レイジーキュー(Lazy Queue)
大量のメッセージ蓄積に対応するため、RabbitMQ 3.6.0以降ではレイジーキューがサポートされている。このキューはメッセージをディスクに保存し、消費時にメモリに読み込む。
レイジーキューの宣言:
@Bean
public Queue lazyQueue() {
return QueueBuilder.durable("lazy.queue")
.lazy()
.build();
}
またはアノテーションで:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "lazy.queue", arguments = @Argument(name = "x-queue-mode", value = "lazy")),
exchange = @Exchange("lazy.exchange"),
key = "lazy"
))
public void handleLazy(String msg) { /* ... */ }
5. RabbitMQクラスタ構成
RabbitMQは以下のクラスタモードをサポート:
- クラシッククラスタ: ExchangeやQueueのメタ情報のみ共有。メッセージはノードローカルに保持。
- ミラーリングキュー: 複数ノードにキュー内容をレプリケート(非推奨)。
- クォーラムキュー(Quorum Queue): Raftプロトコルに基づく強整合性を持つ高可用キュー(推奨)。
クォーラムキューの宣言:
@Bean
public Queue quorumQueue() {
return QueueBuilder.durable("quorum.queue").quorum().build();
}
クラスタ接続設定:
spring:
rabbitmq:
addresses: host1:5672,host2:5672,host3:5672
username: user
password: pass