コアコンセプト
Broker: メッセージの受信と配信を担うサーバーアプリケーション。RabbitMQサーバー自体を指す。
Connection: クライアント(パブリッシャー/コンシューマー)とBroker間のTCP接続。
Channel: Connection内部に確立される論理的な接続。高負荷時に毎回TCP接続を確立するオーバーヘッドを避けるため、スレッドごとにChannelを分けて通信を行う。AMQPプロトコルのチャネルIDにより互いに完全に隔離される。
Exchange: メッセージがBrokerに到達した最初の受け口。ルーティングルールとRouting Keyに基づいて対象キューへメッセージを配信する。主なタイプはdirect, topic, fanoutなど。
Routing Key: プロデューサーがメッセージをExchangeに送信する際に指定するルーティング用のキー。
Binding Key: ExchangeとQueueをバインドする際に設定するキー。送信されたメッセージのRouting Keyと比較され、一致した場合にキューへ配送される。
vHost (仮想ホスト): 1つのRabbitMQサーバー内で複数作成可能な独立したミニBroker環境。それぞれが独自のExchange、Queue、Binding、権限を持ち、単一サーバーで複数アプリケーションを安全に稼働させる。
環境構築
RPMパッケージを用いてインストールし、管理用プラグインを有効化する。
rpm -ivh [パッケージ名].rpm
rabbitmq-plugins enable rabbitmq-management
設定ファイルを所定のディレクトリにコピーし、loopback_usersのコメントアウトを解除・カンマを削除して外部からのアクセスを許可する。その後、サービスを起動する。
systemctl start rabbitmq-server
バックグラウンドでの起動にはrabbitmq-server -detachedを使用する。rabbitmqctl start_appはアプリケーション層のみの起動であり、サーバー全体の起動とは用途が異なる。
メッセージングモデル
Javaクライアントを用いた基本的なモデルを解説する。まずは接続情報を共通化するユーティリティを準備する。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class MqHelper {
private static ConnectionFactory factory;
static {
factory = new ConnectionFactory();
factory.setHost("10.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("admin");
}
public static Connection createConnection() throws Exception {
return factory.newConnection();
}
public static void closeResources(Channel ch, Connection conn) {
try { if (ch != null) ch.close(); } catch (Exception ignored) {}
try { if (conn != null) conn.close(); } catch (Exception ignored) {}
}
}
Basic Queue (Hello World)
単一のプロデューサー、単一のキュー、単一のコンシューマーによる最もシンプルなモデル。
プロデューサー
Channel channel = MqHelper.createConnection().createChannel();
// キュー宣言: (キュー名, 永続化, 排他, 自動削除, 引数)
channel.queueDeclare("basic_q", true, false, false, null);
// メッセージ送信: (交換機, ルーティングキー, プロパティ, ボディ)
channel.basicPublish("", "basic_q", null, "hello rabbit".getBytes());
MqHelper.closeResources(channel, channel.getConnection());
コンシューマー
Channel channel = MqHelper.createConnection().createChannel();
channel.queueDeclare("basic_q", true, false, false, null);
channel.basicConsume("basic_q", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
System.out.println("Received: " + new String(body));
}
});
メッセージの永続化を行う場合、キュー宣言のdurableをtrueにし、送信時のプロパティにMessageProperties.PERSISTENT_TEXT_PLAINを指定する。
Work Queue (タスクキュー)
複数のコンシューマーが1つのキューからメッセージを取り出し、負荷分散を行うモデル。デフォルトではラウンドロビン方式でメッセージが振り分けられる。
手動ACKとQoS設定
メッセージの確実な処理を保証するため、自動ACKを無効化し、処理完了後に手動でACKを返す。また、basicQos(1)を設定することで、未ACKのメッセージが1つである限り新たなメッセージを配送しないようにし、処理の早いコンシューマーほど多くメッセージを受け取る「能者多労」を実現する。
Channel channel = MqHelper.createConnection().createChannel();
channel.basicQos(1);
channel.queueDeclare("work_q", true, false, false, null);
channel.basicConsume("work_q", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(1000); // 処理の遅延をシミュレート
System.out.println("Processed: " + new String(body));
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 手動確認: (配信タグ, 複数一括確認フラグ)
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
});
Fanout Exchange (パブリッシュ/サブスクライブ)
Exchangeにバインドされた全てのキューにメッセージをブロードキャストするモデル。
// プロデューサー
channel.exchangeDeclare("broadcast_ex", BuiltinExchangeType.FANOUT);
channel.basicPublish("broadcast_ex", "", null, "log data".getBytes());
// コンシューマー
channel.exchangeDeclare("broadcast_ex", "fanout");
String tempQueue = channel.queueDeclare().getQueue();
channel.queueBind(tempQueue, "broadcast_ex", "");
Direct Exchange (ルーティング)
メッセージのRouting Keyと完全に一致するBinding Keyを持つキューにのみメッセージを配送するモデル。
// プロデューサー
channel.exchangeDeclare("route_ex", BuiltinExchangeType.DIRECT);
channel.basicPublish("route_ex", "error", null, "critical error".getBytes());
// コンシューマー (infoとerrorを購読)
channel.exchangeDeclare("route_ex", BuiltinExchangeType.DIRECT);
String tempQueue = channel.queueDeclare().getQueue();
channel.queueBind(tempQueue, "route_ex", "info");
channel.queueBind(tempQueue, "route_ex", "error");
Topic Exchange (トピック)
Directモデルにワイルドカードを追加したモデル。*は1つの単語、#は0個以上の単語にマッチする。
// プロデューサー
channel.exchangeDeclare("topic_ex", BuiltinExchangeType.TOPIC);
channel.basicPublish("topic_ex", "order.create", null, "new order".getBytes());
// コンシューマー
channel.exchangeDeclare("topic_ex", BuiltinExchangeType.TOPIC);
String tempQueue = channel.queueDeclare().getQueue();
channel.queueBind(tempQueue, "topic_ex", "order.*");
Spring Bootによる実装
spring-boot-starter-amqpを依存関係に追加し、application.ymlで接続情報を設定する。
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: admin
virtual-host: /
Basic / Work
RabbitTemplateでメッセージを送信し、@RabbitListenerで受信する。デフォルト交換機へ送信する場合、ルーティングキー名とキュー名が一致する。
// プロデューサー
rabbitTemplate.convertAndSend("simple_q", "hello spring");
// コンシューマー
@Component
public class SimpleConsumer {
@RabbitListener(queuesToDeclare = @Queue("simple_q"))
public void process(String payload) {
System.out.println("Received: " + payload);
}
}
Fanout
// プロデューサー
rabbitTemplate.convertAndSend("fanout_ex", "", "broadcast msg");
// コンシューマー
@RabbitListener(bindings = @QueueBinding(
value = @Queue, // 非永続の一時キュー
exchange = @Exchange(name = "fanout_ex", type = ExchangeTypes.FANOUT)
))
public void process(String payload) { ... }
Direct
// プロデューサー
rabbitTemplate.convertAndSend("direct_ex", "warn", "warning msg");
// コンシューマー
@RabbitListener(bindings = @QueueBinding(
value = @Queue("direct_q"),
key = {"info", "warn"},
exchange = @Exchange(name = "direct_ex", type = ExchangeTypes.DIRECT)
))
public void process(String payload) { ... }
Topic
// プロデューサー
rabbitTemplate.convertAndSend("topic_ex", "sys.error", "topic msg");
// コンシューマー
@RabbitListener(bindings = @QueueBinding(
value = @Queue,
key = "sys.*",
exchange = @Exchange(name = "topic_ex", type = ExchangeTypes.TOPIC)
))
public void process(String payload) { ... }
デッドレターキューと遅延キュー
デッドレターキュー(DLX)は、消費不可能となったメッセージ(TTL切れ、キュー上限超過、拒否/ネグレクト)を格納する特殊なキューである。これを応用したのが遅延キューである。
TTL + DLX による遅延キューの課題
キューにTTLを設定し、期限切れになったメッセージをDLX経由で消費することで遅延処理を実現できる。しかし、メッセージ単位でTTLを設定した場合、RabbitMQはキューの先頭にあるメッセージの期限しかチェックしないため、後続の短いTTLのメッセージが古い長いTTLのメッセージにブロックされる「キューの先頭詰まり」問題が発生する。
プラグインによる遅延キュー
rabbitmq_delayed_message_exchangeプラグインを導入することで、メッセージ粒度の正確な遅延配信が可能になる。このプラグインはExchange上で遅延時間を管理し、期限が来るまでMnesiaテーブルに保持した後、対象キューにルーティングする。
// 設定クラス
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", ExchangeTypes.DIRECT);
return new CustomExchange("delayed_ex", "x-delayed-message", true, false, args);
}
// プロデューサー
rabbitTemplate.convertAndSend("delayed_ex", "delay_rk", msg, correlationData -> {
correlationData.getMessageProperties().setDelay(5000); // 5秒遅延
return correlationData;
});
// コンシューマー
@RabbitListener(queues = "delayed_q")
public void process(String payload) { ... }
メッセージの信頼性保証
パブリッシャー確認 (Confirm Callback)
メッセージがBrokerのExchangeに到達したかを確認するコールバック。publisher-confirm-type: correlatedを設定して有効化する。
@Component
public class PubConfirmCallback implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("Exchangeへの到達確認成功: " + correlationData.getId());
} else {
System.err.println("Exchangeへの到達失敗: " + cause);
}
}
}
リターンコールバック (Return Callback)
ExchangeからQueueへのルーティングに失敗した際に呼ばれるコールバック。publisher-returns: trueを設定して有効化する。
@Component
public class PubReturnCallback implements RabbitTemplate.ReturnsCallback {
@Override
public void returnedMessage(ReturnedMessage returned) {
System.err.println("ルーティング失敗メッセージ: " + new String(returned.getMessage().getBody()));
}
}
これらのコールバックをRabbitTemplateに設定することで、メッセージのロストを防ぐ仕組みを構築できる。
メッセージ蓄積と損失防止の基本戦略
メッセージが蓄積した場合の対策として、コンシューマーのスケールアウト、キューの分割、処理の非同期化、メッセージのTTL設定や期限切れデータの破棄、QoSによるフェアディスパッチの導入が考えられる。
メッセージ損失を防ぐためには、以下の3点を担保する必要がある。
- プロデューサー側: Confirm/ReturnコールバックによるBrokerへの確実な到達確認。
- Broker側: ExchangeとQueueの永続化(durable)、メッセージの永続化(PERSISTENT)。
- コンシューマー側: 手動ACK(acknowledge-mode: manual)による確実な処理完了通知。