概要
RabbitMQ における Publisher Confirms(发布者确认)は、メッセージがブローカーに確実に到達したことを保証するための拡張機能です。この機能をチャネルで有効にすると、ブローカーはクライアントが公開したメッセージをサーバー側で処理完毕后、非同期で確認応答(ack)を返します。
前提条件
本ガイドでは、RabbitMQ がローカルホストの標準ポート(5672)で稼働していることを前提としています。ホスト名、ポート、認証情報などが異なる場合は、接続設定を適宜変更してください。
チャネルでの確認機能の有効化
Publisher Confirms は AMQP 0.9.1 プロトコルの RabbitMQ 拡張機能であり、デフォルトでは無効になっています。利用するには、チャネルレベルで confirmSelect メソッドを呼び出す必要があります。
Connection conn = factory.newConnection();
Channel ch = conn.createChannel();
ch.confirmSelect();
この設定は、確認機能を使いたいチャネルそれぞれで行う必要があります。また、メッセージを送るたびに有効化するのではなく、一度だけ実行すれば済みます。
実装戦略 1:メッセージごとの同期確認
最も単純な方法は、メッセージを 1 件 publish するたびに、その確認応答を同期的に待つことです。
int messageCount = 50000;
for (int i = 0; i < messageCount; i++) {
String payload = createMessagePayload(i);
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().build();
ch.basicPublish(EXCHANGE_NAME, QUEUE_NAME, props, payload.getBytes());
// 5 秒のタイムアウトを設定して確認を待つ
ch.waitForConfirmsOrDie(5_000);
}
このコードでは、basicPublish の直後に waitForConfirmsOrDie を呼び出し、承認が返されるまでブロックします。タイムアウト内に確認が取れない場合、またはメッセージが否定確認(nack)された場合は例外が発生します。通常、この例外をキャッチしてエラーログを記録するか、再送処理を行います。
クライアントライブラリによって同期処理の実装方法は異なるため、使用するライブラリのドキュメントを参照してください。
この手法は実装が簡単ですが、重大な欠点があります。メッセージごとの確認待ちにより、後続のpublish処理がブロックされるため、スループットが著しく低下します。秒間数百件程度の処理しか見込めないため、高性能が求められる場面には不向きです。
確認は非同期か?
ブローカーからの確認応答自体は非同期で届きますが、上記のコードでは同期的に待機しています。クライアント内部では非同期で通知を受け取り、waitForConfirmsOrDieのブロックを解除する仕組みになっています。
実装戦略 2:バッチ処理による同期確認
スループットを改善するため、複数のメッセージをまとめて送信し、バッチ単位で確認を待つ方法があります。ここでは 100 件を 1 バッチとして扱います。
int BATCH_LIMIT = 100;
int pendingCount = 0;
int totalMessages = 50000;
for (int i = 0; i < totalMessages; i++) {
String payload = createMessagePayload(i);
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().build();
ch.basicPublish(EXCHANGE_NAME, QUEUE_NAME, props, payload.getBytes());
pendingCount++;
if (pendingCount == BATCH_LIMIT) {
ch.waitForConfirmsOrDie(5_000);
pendingCount = 0;
}
}
// 残りのメッセージがある場合も確認を待つ
if (pendingCount > 0) {
ch.waitForConfirmsOrDie(5_000);
}
個別に待つ場合に比べ、バッチ単位で待つことでスループットが大幅に向上します(リモートノードの場合、20〜30 倍の改善が見られることもあります)。ただし、エラーが発生した場合にどのメッセージが失敗したか特定しにくいため、バッチ全体をメモリに保持して再送するなどの対策が必要になります。また、依然として同期処理であるため、publish 処理はブロックされます。
実装戦略 3:非同期確認の処理
ブローカーはメッセージ確認を非同期で行うため、クライアント側でコールバックを登録することで通知を受け取ることができます。
Channel ch = conn.createChannel();
ch.confirmSelect();
ch.addConfirmListener((seqNo, isMultiple) -> {
// 確認成功時の処理
}, (seqNo, isMultiple) -> {
// 否定確認(nack)時の処理
});
確認用と否定確認用の 2 種類のコールバックがあります。それぞれに以下の 2 つの引数が渡されます。
- シーケンス番号:確認されたメッセージを識別する番号。
- isMultiple:布尔値。false の場合は当該番号の 1 件のみ、true の場合は当該番号以下のすべてのメッセージが対象となります。
シーケンス番号は、publish の前に getNextPublishSeqNo で取得できます。
long seqNo = ch.getNextPublishSeqNo();
ch.basicPublish(EXCHANGE_NAME, QUEUE_NAME, props, payload.getBytes());
publish したメッセージとシーケンス番号を紐付けるためには、マップ構造を使用するのが一般的です。例えば、送信済みメッセージを管理する ConcurrentSkipListMap を用意します。
ConcurrentSkipListMap pendingAcks = new ConcurrentSkipListMap<>();
// 送信前にマップへ登録
long seqNo = ch.getNextPublishSeqNo();
pendingAcks.put(seqNo, payload);
ch.basicPublish(EXCHANGE_NAME, QUEUE_NAME, props, payload.getBytes());
確認応答が届いた際に、このマップから該当エントリを削除する処理をコールバック内に実装します。
ConfirmCallback ackHandler = (seqNo, isMultiple) -> {
if (isMultiple) {
ConcurrentSkipListMap confirmed = pendingAcks.headMap(seqNo, true);
confirmed.clear();
} else {
pendingAcks.remove(seqNo);
}
};
ch.addConfirmListener(ackHandler, (seqNo, isMultiple) -> {
String lostBody = pendingAcks.get(seqNo);
System.err.println("Message nack-ed: " + lostBody + ", Seq: " + seqNo);
ackHandler.handle(seqNo, isMultiple);
});
この例では、確認到達時にマップを掃除するコールバックを定義しています。否定確認時にはメッセージ本体を取得してログ出力し、その後同じ掃除処理を適用してマップから削除します。
未確認メッセージの追跡
ConcurrentNavigableMap(またはConcurrentSkipListMap)を使う利点は、シーケンス番号との紐付けが容易なだけでなく、複数の確認をまとめて処理する際に範囲指定で削除しやすい点です。また、確認コールバックはクライアントライブラリ内の別スレッドで実行されるため、スレッドセーフな構造である必要があります。
非同期確認を正しく扱うには、おおむね以下の手順が必要です。
- publish シーケンス番号とメッセージ内容を紐付ける仕組みを用意する。
- チャネルに確認リスナーを登録し、通知を受け取ったら適切な処理(ログ記録や再送など)を行う。
- メッセージ送信前にシーケンス番号を取得し、追跡リストに追加する。
nack されたメッセージの再送について
コールバック内で直ちに再送処理を行いたくなるかもしれませんが、確認コールバックは I/O スレッド上でdispatchされるため、チャネル操作を行うのは避けるべきです。代わりに、ConcurrentLinkedQueueなどのスレッドセーフなキューにメッセージを enqueue し、publish を行うメインスレッドでポーリングして再送する構成が推奨されます。
パフォーマンス比較
実装したコードを実行し、各戦略のパフォーマンスを測定します。ローカル環境で 50,000 件のメッセージを送信した場合の目安は以下の通りです。
個別送信(同期):5,549 ms
バッチ送信(同期):2,331 ms
非同期処理:4,054 ms
ローカル環境では個別送信が遅く、非同期処理がバッチ処理よりやや劣る結果になることもありますが、これはネットワーク遅延の影響が少ないためです。実際の本番環境ではクライアントとサーバーが分離しているため、リモートノードで測定すると結果が異なります。
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("remote-host");
cf.setUsername("remote-user");
cf.setPassword("remote-password");
return cf.newConnection();
リモート環境での測定結果は以下のようになります。
個別送信(同期):231,541 ms
バッチ送信(同期):7,232 ms
非同期処理:6,332 ms
ネットワークを介する場合、個別送信は極端に遅くなります。バッチ処理と非同期処理は同等以上のパフォーマンスを示し、非同期処理がわずかに優位となります。バッチ処理は実装が容易ですが、エラー発生時の特定が困難です。一方、非同期処理は実装コストが高いものの、エラー制御の粒度が細かく、リソース効率も優れています。