RabbitMQにおけるメッセージ信頼性と高度なキュー機能

サービス間の非同期通信

RabbitMQを用いたメッセージングにおいては、以下の課題が発生しうる:

  • メッセージの信頼性
  • 死信交換機(DLX)の活用
  • 遅延キューの実現
  • 大量メッセージ蓄積への対応
  • クラスタ構成による可用性向上

1. メッセージ信頼性の確保

メッセージが送信元から受信先まで確実に到達するためには、以下の機構を組み合わせる必要がある。

1.1 パブリッシャー確認(Publisher Confirm)

RabbitMQは、メッセージがExchangeに正しく配信されたかどうかを通知するConfirmメカニズムを提供する。さらに、Exchangeに到達したもののQueueにルーティングされなかった場合に通知するReturnメカニズムも利用可能である。

設定例(application.yml):

spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true

ReturnCallbackの設定:

@Configuration
public class RabbitConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext ctx) throws BeansException {
        RabbitTemplate template = ctx.getBean(RabbitTemplate.class);
        template.setReturnsCallback(returned -> {
            log.error("Return - Code: {}, Reason: {}, Exchange: {}, RoutingKey: {}",
                returned.getReplyCode(),
                returned.getReplyText(),
                returned.getExchange(),
                returned.getRoutingKey());
        });
    }
}

ConfirmCallbackの使用例:

CorrelationData corr = new CorrelationData(UUID.randomUUID().toString());
corr.getFuture().addCallback(
    confirm -> {
        if (confirm.isAck()) {
            log.info("メッセージ送信成功");
        } else {
            log.warn("メッセージ送信失敗");
        }
    },
    ex -> log.error("送信中に例外発生", ex)
);

rabbitTemplate.convertAndSend("exchange.name", "routing.key", "payload", corr);

1.2 メッセージの永続化

MQサーバーの再起動時にもメッセージを保持するためには、以下の3要素を永続化する必要がある:

  • Exchange(デフォルトで永続化)
  • Queue(QueueBuilder.durable()で指定)
  • メッセージ本体(MessageDeliveryMode.PERSISTENT

Spring AMQPでは、明示的に指定しなくてもメッセージはデフォルトで永続化される。

1.3 コンシューマー確認モード

コンシューマーのACK戦略には以下の3種類がある:

  • manual: 手動でACK/NACKを返す
  • auto: 例外が発生しなければ自動ACK(推奨)
  • none: ACKなし(メッセージ喪失リスクあり)

設定例:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto
        prefetch: 1

1.4 失敗時のリトライと復旧戦略

コンシューマー処理失敗時に無限ループを避けるため、ローカルリトライを有効化する:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true
          initial-interval: 1000
          max-attempts: 3

最大リトライ回数を超えた場合の挙動は、MessageRecovererでカスタマイズ可能。特にRepublishMessageRecovererを使うことで、失敗メッセージを専用のエラーキューに転送できる。

@Bean
public MessageRecoverer republishRecoverer(RabbitTemplate template) {
    return new RepublishMessageRecoverer(template, "error.exchange", "error.routing");
}

2. 死信交換機(Dead Letter Exchange)

以下の場合、メッセージは「死信」となり、設定されたDLXに転送される:

  • コンシューマーがreject/nackかつrequeue=false
  • メッセージのTTL(Time-To-Live)が経過
  • キューが満杯で新しいメッセージを格納できない

DLX付きキューの宣言例:

@Bean
public Queue ttlQueue() {
    return QueueBuilder.durable("ttl.queue")
        .ttl(10000)
        .deadLetterExchange("dlx.exchange")
        .deadLetterRoutingKey("dl")
        .build();
}

3. 遅延キューの実現

標準的なRabbitMQでは、TTL + DLXの組み合わせで遅延処理を実現できる。しかし、公式プラグインdelayed-message-exchangeを使用すると、よりシンプルに実装可能。

Delayed Exchangeの宣言:

@Bean
public CustomExchange delayedExchange() {
    return new CustomExchange("delay.exchange", "x-delayed-message", true, false,
        Collections.singletonMap("x-delayed-type", "direct"));
}

遅延メッセージの送信:

Message msg = MessageBuilder.withBody("delayed payload".getBytes())
    .setHeader("x-delay", 5000)
    .build();
rabbitTemplate.send("delay.exchange", "routing.key", msg);

4. レイジーキュー(Lazy Queue)

大量のメッセージ蓄積に対応するため、RabbitMQ 3.6.0以降ではレイジーキューがサポートされている。このキューはメッセージをディスクに保存し、消費時にメモリに読み込む。

レイジーキューの宣言:

@Bean
public Queue lazyQueue() {
    return QueueBuilder.durable("lazy.queue")
        .lazy()
        .build();
}

またはアノテーションで:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "lazy.queue", arguments = @Argument(name = "x-queue-mode", value = "lazy")),
    exchange = @Exchange("lazy.exchange"),
    key = "lazy"
))
public void handleLazy(String msg) { /* ... */ }

5. RabbitMQクラスタ構成

RabbitMQは以下のクラスタモードをサポート:

  • クラシッククラスタ: ExchangeやQueueのメタ情報のみ共有。メッセージはノードローカルに保持。
  • ミラーリングキュー: 複数ノードにキュー内容をレプリケート(非推奨)。
  • クォーラムキュー(Quorum Queue): Raftプロトコルに基づく強整合性を持つ高可用キュー(推奨)。

クォーラムキューの宣言:

@Bean
public Queue quorumQueue() {
    return QueueBuilder.durable("quorum.queue").quorum().build();
}

クラスタ接続設定:

spring:
  rabbitmq:
    addresses: host1:5672,host2:5672,host3:5672
    username: user
    password: pass

タグ: RabbitMQ SpringAMQP メッセージキュー 死信交換機 遅延キュー

5月26日 20:03 投稿