RabbitMQのメッセージ応答と永続化

1. メッセージ応答

1.1 概念

メッセージ応答メカニズムは、消費者がメッセージを消費した後、RabbitMQにそのメッセージが正常に処理されたことを確認(acknowledge)する仕組みです。このメカニズムにより、メッセージが消費者によって処理された後に適切にキューから削除され、メッセージの消失を防ぐことができます。

2つのメッセージ応答メカニズム

  1. 自動応答(Auto Acknowledgment): 自動応答モードでは、メッセージが消費者によって受信された時点で、RabbitMQはすぐにメッセージを消費済みとしてマークします。消費者が明示的にRabbitMQに確認を送る必要はありません。
  • 自動応答の有効化:消費者がキューをサブスクライブする際に、autoAckパラメータをtrueに設定します。
channel.basicConsume(queueName, true, consumer);
  • 利点:シンプルで手動確認が必要ありません。
  • 適用場面:メッセージの処理タイミングや信頼性に対する要求が高くない場合、ある程度のメッセージの消失を許容できる場合。
  1. 手動応答(Manual Acknowledgment): 手動応答モードでは、消費者はメッセージを処理した後、RabbitMQに明示的な確認信号を送り、メッセージが安全に削除できることを伝えます。
  • 自動応答の無効化:消費者がキューをサブスクライブする際に、autoAckパラメータをfalseに設定します。
channel.basicConsume(queueName, false, consumer);
  • 手動確認の実行:消費者がメッセージを処理した後、RabbitMQに確認信号を送ります。
channel.basicAck(deliveryTag, false);
  • 利点:
  1. メッセージの処理タイミングと信頼性をより細かく制御できます。
  2. バッチ応答が可能であり、ネットワークの混雑を軽減できます。
  • 適用場面:メッセージの信頼性伝達に対する要求が高く、メッセージが成功裏に処理された後にのみ確認が必要な場合。

1.2 手動応答の例

プロデューサー:

package com.example.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class TaskProducer {
    public static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        boolean durable = true;
        channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.next();
            channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
            System.out.println("プロデューサーがメッセージを送信しました: " + message);
        }
    }
}

コンシューマー:

package com.example.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.util.concurrent.TimeUnit;

public class TaskConsumer {
    public static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C1は短い時間でメッセージを受信して処理します");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("受信したメッセージ: " + new String(message.getBody()));
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, (consumerTag -> System.out.println(consumerTag + "メッセージの消費がキャンセルされました")));
    }
}

2. 永続化

キューの永続化

// 永続的なキューを宣言
boolean durable = true;
channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);

メッセージの永続化

// プロデューサーが永続的なメッセージを送信する
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));

メッセージが永続化マークされているとしても、すべての障害シナリオでメッセージが絶対に失われないとは限りません。例えば、OSがディスクへの書き込み時にキャッシュを使用している場合、データがディスクに書き込まれる前にハードウェア障害やOSクラッシュが発生すると、キャッシュ内のデータが失われる可能性があります。

RabbitMQの配布メカニズム

  • ラウンドロビン配布(不公平配布):これはRabbitMQのデフォルトの配布戦略です。このモードでは、メッセージは順番に各消費者に送られます。各メッセージが確認された後、キューから削除され、次のメッセージが次の消費者に送られます。これにより、メッセージの公平な配布が保証されますが、消費者の処理能力や負荷状況は考慮されません。これにより、一部の消費者が高速に処理し、他の消費者が遅くなる可能性があり、全体のメッセージ処理効率に影響を与える可能性があります。
  • フェア配布(能者多勞):このモードでは、メッセージの配布は消費者の処理能力に基づいて決定されます。つまり、処理速度が速い消費者はより多くのメッセージを受け取り、処理速度が遅い消費者は少ないメッセージを受け取ります。この方法は、全体の処理効率を向上させ、各消費者が自身の処理能力に基づいて適切な数のメッセージを受け取れるようにすることを目指しています。
// basicQosを1に設定することでフェア配布を有効化
int prefetchCount = 1;
channel.basicQos(prefetchCount);

プレフェッチ値

RabbitMQのプレフェッチ(prefetch)は、消費者がキューからメッセージを取得する前に、キューが消費者に送ることができるメッセージの数を制御します。プレフェッチ値を設定することで、消費者がメッセージを過度に早く消費することを防ぎ、他の消費者がメッセージを適切に処理できるようにします。

RabbitMQでは、basicQosメソッドを使用してプレフェッチ値を設定できます。

// プレフェッチ値を設定し、一度に5つのメッセージを消費者に送信する
int prefetchCount = 5;
channel.basicQos(prefetchCount);

タグ: RabbitMQ メッセージ応答 キュー永続化 手動応答 自動応答

5月22日 13:02 投稿