RabbitMQのメッセージング機構と実装パターン

RabbitMQは、分散システム間での信頼性のあるメッセージングを実現するオープンソースのメッセージブローカーです。本記事では、その基本概念と実際の利用例を解説します。

主な使用シナリオ

  • システム統合と分散アーキテクチャ:オンラインショッピングサイトの注文処理システムと在庫管理システムの連携。メッセージキューを介して非同期処理を行うことで、システムの負荷分散と耐障害性を向上させます。
  • 非同期処理の実装:ユーザー登録時のメール送信処理を別プロセスで実行し、クライアントへのレスポンス速度を向上させます。
  • 高負荷時のリクエスト管理:セール期間中の大量アクセスをメッセージキューでバッファリングし、徐々に処理することで、システムの安定性を確保します。

AMQPプロトコル

Advanced Message Queuing Protocol(AMQP)は、オープンなアプリケーション層のメッセージングプロトコルです。これにより、異なる言語やプラットフォーム間での互換性が保たれます。

メッセージングモデルの違い

  • キューモデル:各メッセージは複数の消費者の間で均等に配分され、各メッセージは1つの消費者によって処理されます。
  • トピックモデル:メッセージは複数の消費者にブロードキャストされ、各消費者が全メッセージを受信します。

インストール手順

# EPELリポジトリの追加
rpm -Uvh https://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm

# Erlangのインストール
yum install erlang

# RabbitMQのインストール
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.15/rabbitmq-server-3.8.15-1.el7.noarch.rpm
rpm --import https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.15/rabbitmq-signing-key-public.asc
yum install rabbitmq-server-3.8.15-1.el7.noarch.rpm

基本的な使用例:Producer-Consumer

以下のPythonコードは、シンプルなメッセージ送受信の例です。

# 送信側(producer.py)
import pika

connection_params = pika.ConnectionParameters(
    host='192.168.1.100',
    port=5672,
    virtual_host='/',
    credentials=pika.PlainCredentials('app_user', 'secure_password')
)
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()
channel.queue_declare(queue='order_processing')

channel.basic_publish(
    exchange='',
    routing_key='order_processing',
    body='New order: 12345',
    properties=pika.BasicProperties(delivery_mode=2)
)
print("Order message sent")
connection.close()
# 受信側(consumer.py)
import pika

connection_params = pika.ConnectionParameters(
    host='192.168.1.100',
    port=5672,
    virtual_host='/',
    credentials=pika.PlainCredentials('app_user', 'secure_password')
)
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()
channel.queue_declare(queue='order_processing')

def process_order(ch, method, properties, body):
    print(f"Processing order: {body.decode()}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(
    queue='order_processing',
    on_message_callback=process_order,
    auto_ack=False
)
print("Waiting for orders...")
channel.start_consuming()

ワークキュー(Task Queue)の実装

複数のワーカーでタスクを分散処理する場合の例です。

# タスク送信側(task_sender.py)
import sys
import pika

connection_params = pika.ConnectionParameters(
    host='192.168.1.100',
    port=5672,
    virtual_host='/',
    credentials=pika.PlainCredentials('app_user', 'secure_password')
)
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)

for i in range(10):
    task_data = f"Task-{i} | Duration: {i+1} seconds"
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=task_data,
        properties=pika.BasicProperties(delivery_mode=2)
    )
    print(f"Sent: {task_data}")
connection.close()
# タスク受信側(worker.py)
import time
import pika

connection_params = pika.ConnectionParameters(
    host='192.168.1.100',
    port=5672,
    virtual_host='/',
    credentials=pika.PlainCredentials('app_user', 'secure_password')
)
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)

def process_task(ch, method, properties, body):
    task_info = body.decode().split('|')
    task_id = task_info[0].strip()
    duration = int(task_info[1].split(':')[1].strip().split()[0])
    print(f"Processing {task_id} for {duration} seconds")
    time.sleep(duration)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(
    queue='task_queue',
    on_message_callback=process_task,
    auto_ack=False
)
print("Waiting for tasks...")
channel.start_consuming()

ファンアウトエクスチェンジの利用例

ブロードキャスト型メッセージングの例です。

# メッセージ送信側(log_sender.py)
import sys
import pika

connection_params = pika.ConnectionParameters(
    host='192.168.1.100',
    port=5672,
    virtual_host='/',
    credentials=pika.PlainCredentials('app_user', 'secure_password')
)
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()
channel.exchange_declare(exchange='system_logs', exchange_type='fanout')

log_message = ' '.join(sys.argv[1:]) or 'System event detected'
channel.basic_publish(
    exchange='system_logs',
    routing_key='',
    body=log_message
)
print(f"Sent: {log_message}")
connection.close()
# メッセージ受信側(log_receiver.py)
import pika

connection_params = pika.ConnectionParameters(
    host='192.168.1.100',
    port=5672,
    virtual_host='/',
    credentials=pika.PlainCredentials('app_user', 'secure_password')
)
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()
channel.exchange_declare(exchange='system_logs', exchange_type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='system_logs', queue=queue_name)

def handle_log(ch, method, properties, body):
    print(f"Received log: {body.decode()}")

channel.basic_consume(
    queue=queue_name,
    on_message_callback=handle_log,
    auto_ack=True
)
print("Waiting for logs...")
channel.start_consuming()

ダイレクトエクスチェンジの利用例

ルーティングキーに基づくメッセージのフィルタリングです。

# メッセージ送信側(direct_sender.py)
import sys
import pika

connection_params = pika.ConnectionParameters(
    host='192.168.1.100',
    port=5672,
    virtual_host='/',
    credentials=pika.PlainCredentials('app_user', 'secure_password')
)
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()
channel.exchange_declare(exchange='log_direct', exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Test message'
channel.basic_publish(
    exchange='log_direct',
    routing_key=severity,
    body=message
)
print(f"Sent: {severity} - {message}")
connection.close()
# メッセージ受信側(direct_receiver.py)
import sys
import pika

connection_params = pika.ConnectionParameters(
    host='192.168.1.100',
    port=5672,
    virtual_host='/',
    credentials=pika.PlainCredentials('app_user', 'secure_password')
)
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()
channel.exchange_declare(exchange='log_direct', exchange_type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:] or ['info', 'warning', 'error']
for severity in severities:
    channel.queue_bind(exchange='log_direct', queue=queue_name, routing_key=severity)

def process_log(ch, method, properties, body):
    print(f"Received: {method.routing_key} - {body.decode()}")

channel.basic_consume(
    queue=queue_name,
    on_message_callback=process_log,
    auto_ack=True
)
print("Waiting for logs...")
channel.start_consuming()

トピックエクスチェンジの利用例

ワイルドカードを用いた柔軟なルーティングです。

# メッセージ送信側(topic_sender.py)
import pika

connection_params = pika.ConnectionParameters(
    host='192.168.1.100',
    port=5672,
    virtual_host='/',
    credentials=pika.PlainCredentials('app_user', 'secure_password')
)
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()
channel.exchange_declare(exchange='weather_topic', exchange_type='topic')

routing_key = 'weather.japan.tokyo'
message = 'Tokyo weather: sunny'
channel.basic_publish(
    exchange='weather_topic',
    routing_key=routing_key,
    body=message
)
print(f"Sent: {routing_key} - {message}")
connection.close()
# メッセージ受信側(topic_receiver.py)
import pika

connection_params = pika.ConnectionParameters(
    host='192.168.1.100',
    port=5672,
    virtual_host='/',
    credentials=pika.PlainCredentials('app_user', 'secure_password')
)
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()
channel.exchange_declare(exchange='weather_topic', exchange_type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# バインディングキーの設定
channel.queue_bind(exchange='weather_topic', queue=queue_name, routing_key='weather.japan.*')

def handle_weather(ch, method, properties, body):
    print(f"Received: {method.routing_key} - {body.decode()}")

channel.basic_consume(
    queue=queue_name,
    on_message_callback=handle_weather,
    auto_ack=True
)
print("Waiting for weather updates...")
channel.start_consuming()

RPC(リモートプロシージャコール)の実装

RabbitMQを用いたRPCの基本的な実装例です。

# RPCサーバー(rpc_server.py)
import pika
import math

connection_params = pika.ConnectionParameters(
    host='192.168.1.100',
    port=5672,
    virtual_host='/',
    credentials=pika.PlainCredentials('app_user', 'secure_password')
)
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')

def calculate_factorial(n):
    return math.factorial(n)

def on_request(ch, method, props, body):
    n = int(body)
    print(f"Calculating factorial of {n}")
    response = calculate_factorial(n)
    ch.basic_publish(
        exchange='',
        routing_key=props.reply_to,
        properties=pika.BasicProperties(correlation_id=props.correlation_id),
        body=str(response)
    )
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')
print("Awaiting RPC requests...")
channel.start_consuming()
# RPCクライアント(rpc_client.py)
import pika
import uuid

class FactorialRpcClient:
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='192.168.1.100',
            port=5672,
            virtual_host='/',
            credentials=pika.PlainCredentials('app_user', 'secure_password')
        ))
        self.channel = self.connection.channel()
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True
        )
        self.response = None
        self.corr_id = None

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id
            ),
            body=str(n)
        )
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

client = FactorialRpcClient()
print(f"Factorial of 5: {client.call(5)}")

基本概念のまとめ

  • ConnectionFactory、Connection、Channel:RabbitMQのAPIの基本要素。Connectionはソケット接続を管理し、Channelは具体的な操作を行うインターフェースです。
  • Exchange:メッセージをキューにルーティングするためのコンポーネント。種類にはdirect、topic、fanoutなどがあります。
  • Queue:メッセージを一時的に格納する領域です。
  • Message acknowledgment:メッセージの処理完了を確認する仕組み。ACKを送信しないと、メッセージは再送されます。
  • Message durability:永続化設定により、サーバー再起動後もメッセージが保持されます。
  • Prefetch count:消費者が一度に処理できるメッセージ数を制限し、負荷分散を最適化します。
  • Binding:ExchangeとQueueの関連付けを定義し、ルーティングルールを設定します。

タグ: RabbitMQ AMQP message-broker distributed-systems async-messaging

6月16日 22:31 投稿