RabbitMQ入門:基本概念からSpring Bootによる実践的メッセージングまで

コアコンセプト

Broker: メッセージの受信と配信を担うサーバーアプリケーション。RabbitMQサーバー自体を指す。

Connection: クライアント(パブリッシャー/コンシューマー)とBroker間のTCP接続。

Channel: Connection内部に確立される論理的な接続。高負荷時に毎回TCP接続を確立するオーバーヘッドを避けるため、スレッドごとにChannelを分けて通信を行う。AMQPプロトコルのチャネルIDにより互いに完全に隔離される。

Exchange: メッセージがBrokerに到達した最初の受け口。ルーティングルールとRouting Keyに基づいて対象キューへメッセージを配信する。主なタイプはdirect, topic, fanoutなど。

Routing Key: プロデューサーがメッセージをExchangeに送信する際に指定するルーティング用のキー。

Binding Key: ExchangeとQueueをバインドする際に設定するキー。送信されたメッセージのRouting Keyと比較され、一致した場合にキューへ配送される。

vHost (仮想ホスト): 1つのRabbitMQサーバー内で複数作成可能な独立したミニBroker環境。それぞれが独自のExchange、Queue、Binding、権限を持ち、単一サーバーで複数アプリケーションを安全に稼働させる。

環境構築

RPMパッケージを用いてインストールし、管理用プラグインを有効化する。

rpm -ivh [パッケージ名].rpm
rabbitmq-plugins enable rabbitmq-management

設定ファイルを所定のディレクトリにコピーし、loopback_usersのコメントアウトを解除・カンマを削除して外部からのアクセスを許可する。その後、サービスを起動する。

systemctl start rabbitmq-server

バックグラウンドでの起動にはrabbitmq-server -detachedを使用する。rabbitmqctl start_appはアプリケーション層のみの起動であり、サーバー全体の起動とは用途が異なる。

メッセージングモデル

Javaクライアントを用いた基本的なモデルを解説する。まずは接続情報を共通化するユーティリティを準備する。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class MqHelper {
    private static ConnectionFactory factory;

    static {
        factory = new ConnectionFactory();
        factory.setHost("10.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("admin");
        factory.setPassword("admin");
    }

    public static Connection createConnection() throws Exception {
        return factory.newConnection();
    }

    public static void closeResources(Channel ch, Connection conn) {
        try { if (ch != null) ch.close(); } catch (Exception ignored) {}
        try { if (conn != null) conn.close(); } catch (Exception ignored) {}
    }
}

Basic Queue (Hello World)

単一のプロデューサー、単一のキュー、単一のコンシューマーによる最もシンプルなモデル。

プロデューサー

Channel channel = MqHelper.createConnection().createChannel();
// キュー宣言: (キュー名, 永続化, 排他, 自動削除, 引数)
channel.queueDeclare("basic_q", true, false, false, null);
// メッセージ送信: (交換機, ルーティングキー, プロパティ, ボディ)
channel.basicPublish("", "basic_q", null, "hello rabbit".getBytes());
MqHelper.closeResources(channel, channel.getConnection());

コンシューマー

Channel channel = MqHelper.createConnection().createChannel();
channel.queueDeclare("basic_q", true, false, false, null);
channel.basicConsume("basic_q", true, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
        System.out.println("Received: " + new String(body));
    }
});

メッセージの永続化を行う場合、キュー宣言のdurabletrueにし、送信時のプロパティにMessageProperties.PERSISTENT_TEXT_PLAINを指定する。

Work Queue (タスクキュー)

複数のコンシューマーが1つのキューからメッセージを取り出し、負荷分散を行うモデル。デフォルトではラウンドロビン方式でメッセージが振り分けられる。

手動ACKとQoS設定

メッセージの確実な処理を保証するため、自動ACKを無効化し、処理完了後に手動でACKを返す。また、basicQos(1)を設定することで、未ACKのメッセージが1つである限り新たなメッセージを配送しないようにし、処理の早いコンシューマーほど多くメッセージを受け取る「能者多労」を実現する。

Channel channel = MqHelper.createConnection().createChannel();
channel.basicQos(1);
channel.queueDeclare("work_q", true, false, false, null);
channel.basicConsume("work_q", false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        try {
            Thread.sleep(1000); // 処理の遅延をシミュレート
            System.out.println("Processed: " + new String(body));
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 手動確認: (配信タグ, 複数一括確認フラグ)
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    }
});

Fanout Exchange (パブリッシュ/サブスクライブ)

Exchangeにバインドされた全てのキューにメッセージをブロードキャストするモデル。

// プロデューサー
channel.exchangeDeclare("broadcast_ex", BuiltinExchangeType.FANOUT);
channel.basicPublish("broadcast_ex", "", null, "log data".getBytes());

// コンシューマー
channel.exchangeDeclare("broadcast_ex", "fanout");
String tempQueue = channel.queueDeclare().getQueue();
channel.queueBind(tempQueue, "broadcast_ex", "");

Direct Exchange (ルーティング)

メッセージのRouting Keyと完全に一致するBinding Keyを持つキューにのみメッセージを配送するモデル。

// プロデューサー
channel.exchangeDeclare("route_ex", BuiltinExchangeType.DIRECT);
channel.basicPublish("route_ex", "error", null, "critical error".getBytes());

// コンシューマー (infoとerrorを購読)
channel.exchangeDeclare("route_ex", BuiltinExchangeType.DIRECT);
String tempQueue = channel.queueDeclare().getQueue();
channel.queueBind(tempQueue, "route_ex", "info");
channel.queueBind(tempQueue, "route_ex", "error");

Topic Exchange (トピック)

Directモデルにワイルドカードを追加したモデル。*は1つの単語、#は0個以上の単語にマッチする。

// プロデューサー
channel.exchangeDeclare("topic_ex", BuiltinExchangeType.TOPIC);
channel.basicPublish("topic_ex", "order.create", null, "new order".getBytes());

// コンシューマー
channel.exchangeDeclare("topic_ex", BuiltinExchangeType.TOPIC);
String tempQueue = channel.queueDeclare().getQueue();
channel.queueBind(tempQueue, "topic_ex", "order.*");

Spring Bootによる実装

spring-boot-starter-amqpを依存関係に追加し、application.ymlで接続情報を設定する。

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: admin
    virtual-host: /

Basic / Work

RabbitTemplateでメッセージを送信し、@RabbitListenerで受信する。デフォルト交換機へ送信する場合、ルーティングキー名とキュー名が一致する。

// プロデューサー
rabbitTemplate.convertAndSend("simple_q", "hello spring");

// コンシューマー
@Component
public class SimpleConsumer {
    @RabbitListener(queuesToDeclare = @Queue("simple_q"))
    public void process(String payload) {
        System.out.println("Received: " + payload);
    }
}

Fanout

// プロデューサー
rabbitTemplate.convertAndSend("fanout_ex", "", "broadcast msg");

// コンシューマー
@RabbitListener(bindings = @QueueBinding(
    value = @Queue, // 非永続の一時キュー
    exchange = @Exchange(name = "fanout_ex", type = ExchangeTypes.FANOUT)
))
public void process(String payload) { ... }

Direct

// プロデューサー
rabbitTemplate.convertAndSend("direct_ex", "warn", "warning msg");

// コンシューマー
@RabbitListener(bindings = @QueueBinding(
    value = @Queue("direct_q"),
    key = {"info", "warn"},
    exchange = @Exchange(name = "direct_ex", type = ExchangeTypes.DIRECT)
))
public void process(String payload) { ... }

Topic

// プロデューサー
rabbitTemplate.convertAndSend("topic_ex", "sys.error", "topic msg");

// コンシューマー
@RabbitListener(bindings = @QueueBinding(
    value = @Queue,
    key = "sys.*",
    exchange = @Exchange(name = "topic_ex", type = ExchangeTypes.TOPIC)
))
public void process(String payload) { ... }

デッドレターキューと遅延キュー

デッドレターキュー(DLX)は、消費不可能となったメッセージ(TTL切れ、キュー上限超過、拒否/ネグレクト)を格納する特殊なキューである。これを応用したのが遅延キューである。

TTL + DLX による遅延キューの課題

キューにTTLを設定し、期限切れになったメッセージをDLX経由で消費することで遅延処理を実現できる。しかし、メッセージ単位でTTLを設定した場合、RabbitMQはキューの先頭にあるメッセージの期限しかチェックしないため、後続の短いTTLのメッセージが古い長いTTLのメッセージにブロックされる「キューの先頭詰まり」問題が発生する。

プラグインによる遅延キュー

rabbitmq_delayed_message_exchangeプラグインを導入することで、メッセージ粒度の正確な遅延配信が可能になる。このプラグインはExchange上で遅延時間を管理し、期限が来るまでMnesiaテーブルに保持した後、対象キューにルーティングする。

// 設定クラス
@Bean
public CustomExchange delayedExchange() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-delayed-type", ExchangeTypes.DIRECT);
    return new CustomExchange("delayed_ex", "x-delayed-message", true, false, args);
}

// プロデューサー
rabbitTemplate.convertAndSend("delayed_ex", "delay_rk", msg, correlationData -> {
    correlationData.getMessageProperties().setDelay(5000); // 5秒遅延
    return correlationData;
});

// コンシューマー
@RabbitListener(queues = "delayed_q")
public void process(String payload) { ... }

メッセージの信頼性保証

パブリッシャー確認 (Confirm Callback)

メッセージがBrokerのExchangeに到達したかを確認するコールバック。publisher-confirm-type: correlatedを設定して有効化する。

@Component
public class PubConfirmCallback implements RabbitTemplate.ConfirmCallback {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            System.out.println("Exchangeへの到達確認成功: " + correlationData.getId());
        } else {
            System.err.println("Exchangeへの到達失敗: " + cause);
        }
    }
}

リターンコールバック (Return Callback)

ExchangeからQueueへのルーティングに失敗した際に呼ばれるコールバック。publisher-returns: trueを設定して有効化する。

@Component
public class PubReturnCallback implements RabbitTemplate.ReturnsCallback {
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        System.err.println("ルーティング失敗メッセージ: " + new String(returned.getMessage().getBody()));
    }
}

これらのコールバックをRabbitTemplateに設定することで、メッセージのロストを防ぐ仕組みを構築できる。

メッセージ蓄積と損失防止の基本戦略

メッセージが蓄積した場合の対策として、コンシューマーのスケールアウト、キューの分割、処理の非同期化、メッセージのTTL設定や期限切れデータの破棄、QoSによるフェアディスパッチの導入が考えられる。

メッセージ損失を防ぐためには、以下の3点を担保する必要がある。

  • プロデューサー側: Confirm/ReturnコールバックによるBrokerへの確実な到達確認。
  • Broker側: ExchangeとQueueの永続化(durable)、メッセージの永続化(PERSISTENT)。
  • コンシューマー側: 手動ACK(acknowledge-mode: manual)による確実な処理完了通知。

タグ: RabbitMQ Spring Boot AMQP メッセージキュー デッドレター

5月20日 10:41 投稿