RabbitMQのExchangeとDead Letter QueueおよびDelayed Queueの実践ガイド

Exchangeの基礎と種類

Exchangeとは

RabbitMQのメッセージングモデルにおいて、プロデューサーはメッセージを直接キューに送信しません。プロデューサーはExchangeにメッセージを送信し、Exchangeがルーティングルールに基づいてキューにメッセージを配信します。

Exchangeの種類

Direct Exchange

ルーティングキーと完全一致するキューにメッセージを配信します。例えば、ルーティングキー「order.created」でバインドされたキューは、「order.created」というルーティングキーのメッセージのみを受信します。

Topic Exchange

パターンマッチングによるルーティングが可能です。「*」は1単語、「#」は0個以上の単語にマッチします。「order.*.created」は「order.payment.created」にマッチしますが、「order.payment.user.created」にはマッチしません。「order.#」は両方にマッチします。

Headers Exchange

ルーティングキーではなく、メッセージのヘッダー属性でマッチングを行います。x-match属性で「all」(全てのヘッダーが一致)または「any」(いずれかのヘッダーが一致)を指定できます。

Fanout Exchange

バインドされた全てのキューにメッセージをブロードキャストします。ルーティングキーを無視するため、最も高速な配信が可能です。

Fanout Exchangeの実装例

ログシステムを構築し、コンソール出力とファイル書き込みの2つのコンシューマーにメッセージを配信する例を示します。

// プロデューサー
public class LogProducer {
    private static final String EXCHANGE_NAME = "logs_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = ConnectionUtil.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String msg = scanner.next();
            channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("メッセージ送信完了: " + msg);
        }
    }
}

// コンシューマー(コンソール出力用)
public class ConsoleLogConsumer {
    private static final String EXCHANGE_NAME = "logs_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = ConnectionUtil.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

        String tempQueue = channel.queueDeclare().getQueue();
        channel.queueBind(tempQueue, EXCHANGE_NAME, "");

        System.out.println("コンソールログ待機中...");
        DeliverCallback callback = (tag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println("コンソール出力: " + message);
        };
        channel.basicConsume(tempQueue, true, callback, tag -> {});
    }
}

Direct Exchangeの実装例

ログレベルに応じて異なるキューにルーティングする例です。

// プロデューサー
public class DirectLogProducer {
    private static final String EXCHANGE = "direct_logs";

    public static void main(String[] args) throws Exception {
        Channel channel = ConnectionUtil.createChannel();
        channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);

        Map logData = new HashMap<>();
        logData.put("info", "システム情報ログ");
        logData.put("warning", "警告ログ");
        logData.put("error", "エラーログ");

        for (Map.Entry entry : logData.entrySet()) {
            channel.basicPublish(EXCHANGE, entry.getKey(), null, entry.getValue().getBytes());
            System.out.println("送信完了 - レベル: " + entry.getKey() + ", 内容: " + entry.getValue());
        }
    }
}

// エラーログ専用コンシューマー
public class ErrorLogConsumer {
    private static final String EXCHANGE = "direct_logs";
    private static final String QUEUE = "error_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = ConnectionUtil.createChannel();
        channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.queueDeclare(QUEUE, false, false, false, null);
        channel.queueBind(QUEUE, EXCHANGE, "error");

        DeliverCallback callback = (tag, delivery) -> {
            String msg = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println("エラーログ受信: " + msg);
        };
        channel.basicConsume(QUEUE, true, callback, tag -> {});
    }
}

Topic Exchangeの実装例

// プロデューサー
public class TopicLogProducer {
    private static final String EXCHANGE = "topic_logs";

    public static void main(String[] args) throws Exception {
        Channel channel = ConnectionUtil.createChannel();
        channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);

        Map routingData = new LinkedHashMap<>();
        routingData.put("quick.orange.rabbit", "Q1とQ2が受信");
        routingData.put("lazy.orange.elephant", "Q1とQ2が受信");
        routingData.put("quick.brown.fox", "マッチなし、破棄");
        routingData.put("lazy.pink.rabbit", "Q2のみ受信");

        for (Map.Entry entry : routingData.entrySet()) {
            channel.basicPublish(EXCHANGE, entry.getKey(), null, entry.getValue().getBytes());
        }
    }
}

// コンシューマー(*.orange.* パターン)
public class OrangeTopicConsumer {
    private static final String EXCHANGE = "topic_logs";
    private static final String QUEUE = "orange_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = ConnectionUtil.createChannel();
        channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);
        channel.queueDeclare(QUEUE, false, false, false, null);
        channel.queueBind(QUEUE, EXCHANGE, "*.orange.*");

        DeliverCallback callback = (tag, delivery) -> {
            String msg = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println("受信キュー: " + QUEUE + ", ルーティングキー: " + delivery.getEnvelope().getRoutingKey());
        };
        channel.basicConsume(QUEUE, true, callback, tag -> {});
    }
}

Dead Letter Queue(死文字キュー)

概念と用途

Dead Letter Queue(DLQ)は、正常に消費されなかったメッセージを格納するための特別なキューです。メッセージがDLQに送られる主な理由は以下の通りです:

  • メッセージのTTL(Time To Live)が期限切れ
  • キューが最大長に達した
  • メッセージが拒否され、再キューが無効化されている

TTL期限切れによるDLQ転送の実装

// プロデューサー
public class DlqProducer {
    private static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = ConnectionUtil.createChannel();

        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .expiration("10000") // 10秒のTTL
                .build();

        for (int i = 0; i < 10; i++) {
            String msg = "message_" + i;
            channel.basicPublish(NORMAL_EXCHANGE, "routing.a", props, msg.getBytes());
        }
    }
}

// 通常キューのコンシューマー(DLQ設定付き)
public class NormalQueueConsumer {
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    private static final String DLQ_EXCHANGE = "dlq_exchange";
    private static final String NORMAL_QUEUE = "normal_queue";
    private static final String DLQ_QUEUE = "dead_letter_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = ConnectionUtil.createChannel();

        // Exchangeの宣言
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DLQ_EXCHANGE, BuiltinExchangeType.DIRECT);

        // 通常キューの宣言(DLQ設定付き)
        Map args = new HashMap<>();
        args.put("x-dead-letter-exchange", DLQ_EXCHANGE);
        args.put("x-dead-letter-routing-key", "dlq.routing");
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, args);

        // DLQの宣言
        channel.queueDeclare(DLQ_QUEUE, false, false, false, null);

        // バインディング
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "routing.a");
        channel.queueBind(DLQ_QUEUE, DLQ_EXCHANGE, "dlq.routing");

        DeliverCallback callback = (tag, delivery) -> {
            System.out.println("通常キュー受信: " + new String(delivery.getBody()));
        };
        channel.basicConsume(NORMAL_QUEUE, true, callback, tag -> {});
    }
}

// DLQコンシューマー
public class DlqConsumer {
    private static final String DLQ_QUEUE = "dead_letter_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = ConnectionUtil.createChannel();

        DeliverCallback callback = (tag, delivery) -> {
            System.out.println("DLQ受信: " + new String(delivery.getBody()));
        };
        channel.basicConsume(DLQ_QUEUE, true, callback, tag -> {});
    }
}

キュー最大長によるDLQ転送

// キュー最大長の設定
Map queueArgs = new HashMap<>();
queueArgs.put("x-dead-letter-exchange", "dlq_exchange");
queueArgs.put("x-dead-letter-routing-key", "dlq.routing");
queueArgs.put("x-max-length", 6); // 最大6件
channel.queueDeclare("normal_queue", false, false, false, queueArgs);

メッセージ拒否によるDLQ転送

DeliverCallback callback = (tag, delivery) -> {
    String msg = new String(delivery.getBody(), StandardCharsets.UTF_8);
    if ("reject_me".equals(msg)) {
        // 再キューせずに拒否(DLQへ転送)
        channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
    } else {
        System.out.println("受信: " + msg);
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
};
channel.basicConsume("normal_queue", false, callback, tag -> {});

遅延キューの実装

TTLとDLQを組み合わせた遅延キュー

Spring Bootとの統合例を示します。

// 設定クラス
@Configuration
public class DelayQueueConfig {
    public static final String DELAY_EXCHANGE = "delay.exchange";
    public static final String DELAY_QUEUE = "delay.queue";
    public static final String DELAY_ROUTING_KEY = "delay.key";

    @Bean
    public Queue delayQueue() {
        return QueueBuilder.durable(DELAY_QUEUE).build();
    }

    @Bean
    public CustomExchange delayExchange() {
        Map args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", true, false, args);
    }

    @Bean
    public Binding delayBinding(Queue delayQueue, CustomExchange delayExchange) {
        return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs();
    }
}

// コントローラー
@RestController
@RequestMapping("/delay")
public class DelayController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/send/{msg}/{delayMs}")
    public String sendDelayMessage(@PathVariable String msg, @PathVariable Integer delayMs) {
        rabbitTemplate.convertAndSend(DelayQueueConfig.DELAY_EXCHANGE,
                DelayQueueConfig.DELAY_ROUTING_KEY, msg, message -> {
                    message.getMessageProperties().setDelay(delayMs);
                    return message;
                });
        return "送信完了: " + msg + ", 遅延: " + delayMs + "ms";
    }
}

// コンシューマー
@Component
public class DelayConsumer {
    @RabbitListener(queues = DelayQueueConfig.DELAY_QUEUE)
    public void handleDelayMessage(Message message) {
        String msg = new String(message.getBody());
        System.out.println("遅延メッセージ受信: " + msg + ", 時刻: " + LocalDateTime.now());
    }
}

パブリッシャー確認メカニズム

コールバック付き確認

application.ymlの設定:

spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true
// コールバッククラス
@Component
public class PublisherCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            System.out.println("メッセージ確認成功: " + correlationData.getId());
        } else {
            System.out.println("メッセージ確認失敗: " + correlationData.getId() + ", 原因: " + cause);
        }
    }

    @Override
    public void returnedMessage(ReturnedMessage returned) {
        System.out.println("メッセージ返却: " + new String(returned.getMessage().getBody()) +
                ", Exchange: " + returned.getExchange() +
                ", RoutingKey: " + returned.getRoutingKey());
    }
}

バックアップExchange

@Configuration
public class BackupConfig {
    public static final String MAIN_EXCHANGE = "main.exchange";
    public static final String BACKUP_EXCHANGE = "backup.exchange";
    public static final String MAIN_QUEUE = "main.queue";
    public static final String BACKUP_QUEUE = "backup.queue";
    public static final String WARNING_QUEUE = "warning.queue";

    @Bean
    public DirectExchange mainExchange() {
        return ExchangeBuilder.directExchange(MAIN_EXCHANGE)
                .durable(true)
                .withArgument("alternate-exchange", BACKUP_EXCHANGE)
                .build();
    }

    @Bean
    public FanoutExchange backupExchange() {
        return new FanoutExchange(BACKUP_EXCHANGE);
    }

    @Bean
    public Queue mainQueue() {
        return QueueBuilder.durable(MAIN_QUEUE).build();
    }

    @Bean
    public Queue backupQueue() {
        return QueueBuilder.durable(BACKUP_QUEUE).build();
    }

    @Bean
    public Queue warningQueue() {
        return QueueBuilder.durable(WARNING_QUEUE).build();
    }

    @Bean
    public Binding mainBinding(Queue mainQueue, DirectExchange mainExchange) {
        return BindingBuilder.bind(mainQueue).to(mainExchange).with("main.key");
    }

    @Bean
    public Binding backupBinding(Queue backupQueue, FanoutExchange backupExchange) {
        return BindingBuilder.bind(backupQueue).to(backupExchange);
    }

    @Bean
    public Binding warningBinding(Queue warningQueue, FanoutExchange backupExchange) {
        return BindingBuilder.bind(warningQueue).to(backupExchange);
    }
}

優先度キュー

// 優先度キューの宣言
Map priorityArgs = new HashMap<>();
priorityArgs.put("x-max-priority", 10);
channel.queueDeclare("priority_queue", true, false, false, priorityArgs);

// メッセージ送信時に優先度を設定
AMQP.BasicProperties highPriority = new AMQP.BasicProperties.Builder()
        .priority(10)
        .build();
channel.basicPublish("", "priority_queue", highPriority, "緊急メッセージ".getBytes());

AMQP.BasicProperties normalPriority = new AMQP.BasicProperties.Builder()
        .priority(1)
        .build();
channel.basicPublish("", "priority_queue", normalPriority, "通常メッセージ".getBytes());

レイジーキュー

レイジーキューはメッセージをできるだけディスクに保存し、消費時にメモリにロードします。大量のメッセージ蓄積に適しています。

Map lazyArgs = new HashMap<>();
lazyArgs.put("x-queue-mode", "lazy");
channel.queueDeclare("lazy_queue", false, false, false, lazyArgs);

クラスタ構成

クラスタ構築手順

# 各ノードでCookieを同期
scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/
scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/

# RabbitMQ起動
rabbitmq-server -detached

# node2でクラスタに参加
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app

# node3でクラスタに参加
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node2
rabbitmqctl start_app

# クラスタ状態確認
rabbitmqctl cluster_status

# ユーザー設定
rabbitmqctl add_user admin password
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

ミラーキューの設定

Web管理画面でPolicyを追加し、ha-modeを「all」に設定することで、全ノードにキューをミラーリングできます。

Federation Exchange

地理的に離れたデータセンター間でExchangeを連携させ、ネットワーク遅延を軽減できます。

# プラグインの有効化
rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_management

Shovel

異なるBroker間でメッセージを転送します。

# プラグインの有効化
rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management

RabbitMQの高度な機能を活用することで、信頼性の高い分散メッセージングシステムを構築できます。用途に応じて適切なExchangeタイプ、キュー設定、クラスタ構成を選択することが重要です。

タグ: RabbitMQ Message Queue Dead Letter Queue Delayed Queue Cluster

6月1日 19:37 投稿