1. メッセージ応答
1.1 概念
メッセージ応答メカニズムは、消費者がメッセージを消費した後、RabbitMQにそのメッセージが正常に処理されたことを確認(acknowledge)する仕組みです。このメカニズムにより、メッセージが消費者によって処理された後に適切にキューから削除され、メッセージの消失を防ぐことができます。
2つのメッセージ応答メカニズム
- 自動応答(Auto Acknowledgment): 自動応答モードでは、メッセージが消費者によって受信された時点で、RabbitMQはすぐにメッセージを消費済みとしてマークします。消費者が明示的にRabbitMQに確認を送る必要はありません。
- 自動応答の有効化:消費者がキューをサブスクライブする際に、
autoAckパラメータをtrueに設定します。
channel.basicConsume(queueName, true, consumer);
- 利点:シンプルで手動確認が必要ありません。
- 適用場面:メッセージの処理タイミングや信頼性に対する要求が高くない場合、ある程度のメッセージの消失を許容できる場合。
- 手動応答(Manual Acknowledgment): 手動応答モードでは、消費者はメッセージを処理した後、RabbitMQに明示的な確認信号を送り、メッセージが安全に削除できることを伝えます。
- 自動応答の無効化:消費者がキューをサブスクライブする際に、
autoAckパラメータをfalseに設定します。
channel.basicConsume(queueName, false, consumer);
- 手動確認の実行:消費者がメッセージを処理した後、RabbitMQに確認信号を送ります。
channel.basicAck(deliveryTag, false);
- 利点:
- メッセージの処理タイミングと信頼性をより細かく制御できます。
- バッチ応答が可能であり、ネットワークの混雑を軽減できます。
- 適用場面:メッセージの信頼性伝達に対する要求が高く、メッセージが成功裏に処理された後にのみ確認が必要な場合。
1.2 手動応答の例
プロデューサー:
package com.example.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
public class TaskProducer {
public static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
boolean durable = true;
channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
System.out.println("プロデューサーがメッセージを送信しました: " + message);
}
}
}
コンシューマー:
package com.example.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.util.concurrent.TimeUnit;
public class TaskConsumer {
public static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C1は短い時間でメッセージを受信して処理します");
DeliverCallback deliverCallback = (consumerTag, message) -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("受信したメッセージ: " + new String(message.getBody()));
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, (consumerTag -> System.out.println(consumerTag + "メッセージの消費がキャンセルされました")));
}
}
2. 永続化
キューの永続化
// 永続的なキューを宣言
boolean durable = true;
channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);
メッセージの永続化
// プロデューサーが永続的なメッセージを送信する
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
メッセージが永続化マークされているとしても、すべての障害シナリオでメッセージが絶対に失われないとは限りません。例えば、OSがディスクへの書き込み時にキャッシュを使用している場合、データがディスクに書き込まれる前にハードウェア障害やOSクラッシュが発生すると、キャッシュ内のデータが失われる可能性があります。
RabbitMQの配布メカニズム
- ラウンドロビン配布(不公平配布):これはRabbitMQのデフォルトの配布戦略です。このモードでは、メッセージは順番に各消費者に送られます。各メッセージが確認された後、キューから削除され、次のメッセージが次の消費者に送られます。これにより、メッセージの公平な配布が保証されますが、消費者の処理能力や負荷状況は考慮されません。これにより、一部の消費者が高速に処理し、他の消費者が遅くなる可能性があり、全体のメッセージ処理効率に影響を与える可能性があります。
- フェア配布(能者多勞):このモードでは、メッセージの配布は消費者の処理能力に基づいて決定されます。つまり、処理速度が速い消費者はより多くのメッセージを受け取り、処理速度が遅い消費者は少ないメッセージを受け取ります。この方法は、全体の処理効率を向上させ、各消費者が自身の処理能力に基づいて適切な数のメッセージを受け取れるようにすることを目指しています。
// basicQosを1に設定することでフェア配布を有効化
int prefetchCount = 1;
channel.basicQos(prefetchCount);
プレフェッチ値
RabbitMQのプレフェッチ(prefetch)は、消費者がキューからメッセージを取得する前に、キューが消費者に送ることができるメッセージの数を制御します。プレフェッチ値を設定することで、消費者がメッセージを過度に早く消費することを防ぎ、他の消費者がメッセージを適切に処理できるようにします。
RabbitMQでは、basicQosメソッドを使用してプレフェッチ値を設定できます。
// プレフェッチ値を設定し、一度に5つのメッセージを消費者に送信する
int prefetchCount = 5;
channel.basicQos(prefetchCount);