RabbitMQは、分散システム間での信頼性のあるメッセージングを実現するオープンソースのメッセージブローカーです。本記事では、その基本概念と実際の利用例を解説します。
主な使用シナリオ
- システム統合と分散アーキテクチャ:オンラインショッピングサイトの注文処理システムと在庫管理システムの連携。メッセージキューを介して非同期処理を行うことで、システムの負荷分散と耐障害性を向上させます。
- 非同期処理の実装:ユーザー登録時のメール送信処理を別プロセスで実行し、クライアントへのレスポンス速度を向上させます。
- 高負荷時のリクエスト管理:セール期間中の大量アクセスをメッセージキューでバッファリングし、徐々に処理することで、システムの安定性を確保します。
AMQPプロトコル
Advanced Message Queuing Protocol(AMQP)は、オープンなアプリケーション層のメッセージングプロトコルです。これにより、異なる言語やプラットフォーム間での互換性が保たれます。
メッセージングモデルの違い
- キューモデル:各メッセージは複数の消費者の間で均等に配分され、各メッセージは1つの消費者によって処理されます。
- トピックモデル:メッセージは複数の消費者にブロードキャストされ、各消費者が全メッセージを受信します。
インストール手順
# EPELリポジトリの追加
rpm -Uvh https://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm
# Erlangのインストール
yum install erlang
# RabbitMQのインストール
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.15/rabbitmq-server-3.8.15-1.el7.noarch.rpm
rpm --import https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.15/rabbitmq-signing-key-public.asc
yum install rabbitmq-server-3.8.15-1.el7.noarch.rpm
基本的な使用例:Producer-Consumer
以下のPythonコードは、シンプルなメッセージ送受信の例です。
# 送信側(producer.py)
import pika
connection_params = pika.ConnectionParameters(
host='192.168.1.100',
port=5672,
virtual_host='/',
credentials=pika.PlainCredentials('app_user', 'secure_password')
)
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()
channel.queue_declare(queue='order_processing')
channel.basic_publish(
exchange='',
routing_key='order_processing',
body='New order: 12345',
properties=pika.BasicProperties(delivery_mode=2)
)
print("Order message sent")
connection.close()
# 受信側(consumer.py)
import pika
connection_params = pika.ConnectionParameters(
host='192.168.1.100',
port=5672,
virtual_host='/',
credentials=pika.PlainCredentials('app_user', 'secure_password')
)
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()
channel.queue_declare(queue='order_processing')
def process_order(ch, method, properties, body):
print(f"Processing order: {body.decode()}")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(
queue='order_processing',
on_message_callback=process_order,
auto_ack=False
)
print("Waiting for orders...")
channel.start_consuming()
ワークキュー(Task Queue)の実装
複数のワーカーでタスクを分散処理する場合の例です。
# タスク送信側(task_sender.py)
import sys
import pika
connection_params = pika.ConnectionParameters(
host='192.168.1.100',
port=5672,
virtual_host='/',
credentials=pika.PlainCredentials('app_user', 'secure_password')
)
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
for i in range(10):
task_data = f"Task-{i} | Duration: {i+1} seconds"
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=task_data,
properties=pika.BasicProperties(delivery_mode=2)
)
print(f"Sent: {task_data}")
connection.close()
# タスク受信側(worker.py)
import time
import pika
connection_params = pika.ConnectionParameters(
host='192.168.1.100',
port=5672,
virtual_host='/',
credentials=pika.PlainCredentials('app_user', 'secure_password')
)
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
def process_task(ch, method, properties, body):
task_info = body.decode().split('|')
task_id = task_info[0].strip()
duration = int(task_info[1].split(':')[1].strip().split()[0])
print(f"Processing {task_id} for {duration} seconds")
time.sleep(duration)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
queue='task_queue',
on_message_callback=process_task,
auto_ack=False
)
print("Waiting for tasks...")
channel.start_consuming()
ファンアウトエクスチェンジの利用例
ブロードキャスト型メッセージングの例です。
# メッセージ送信側(log_sender.py)
import sys
import pika
connection_params = pika.ConnectionParameters(
host='192.168.1.100',
port=5672,
virtual_host='/',
credentials=pika.PlainCredentials('app_user', 'secure_password')
)
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()
channel.exchange_declare(exchange='system_logs', exchange_type='fanout')
log_message = ' '.join(sys.argv[1:]) or 'System event detected'
channel.basic_publish(
exchange='system_logs',
routing_key='',
body=log_message
)
print(f"Sent: {log_message}")
connection.close()
# メッセージ受信側(log_receiver.py)
import pika
connection_params = pika.ConnectionParameters(
host='192.168.1.100',
port=5672,
virtual_host='/',
credentials=pika.PlainCredentials('app_user', 'secure_password')
)
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()
channel.exchange_declare(exchange='system_logs', exchange_type='fanout')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='system_logs', queue=queue_name)
def handle_log(ch, method, properties, body):
print(f"Received log: {body.decode()}")
channel.basic_consume(
queue=queue_name,
on_message_callback=handle_log,
auto_ack=True
)
print("Waiting for logs...")
channel.start_consuming()
ダイレクトエクスチェンジの利用例
ルーティングキーに基づくメッセージのフィルタリングです。
# メッセージ送信側(direct_sender.py)
import sys
import pika
connection_params = pika.ConnectionParameters(
host='192.168.1.100',
port=5672,
virtual_host='/',
credentials=pika.PlainCredentials('app_user', 'secure_password')
)
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()
channel.exchange_declare(exchange='log_direct', exchange_type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Test message'
channel.basic_publish(
exchange='log_direct',
routing_key=severity,
body=message
)
print(f"Sent: {severity} - {message}")
connection.close()
# メッセージ受信側(direct_receiver.py)
import sys
import pika
connection_params = pika.ConnectionParameters(
host='192.168.1.100',
port=5672,
virtual_host='/',
credentials=pika.PlainCredentials('app_user', 'secure_password')
)
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()
channel.exchange_declare(exchange='log_direct', exchange_type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:] or ['info', 'warning', 'error']
for severity in severities:
channel.queue_bind(exchange='log_direct', queue=queue_name, routing_key=severity)
def process_log(ch, method, properties, body):
print(f"Received: {method.routing_key} - {body.decode()}")
channel.basic_consume(
queue=queue_name,
on_message_callback=process_log,
auto_ack=True
)
print("Waiting for logs...")
channel.start_consuming()
トピックエクスチェンジの利用例
ワイルドカードを用いた柔軟なルーティングです。
# メッセージ送信側(topic_sender.py)
import pika
connection_params = pika.ConnectionParameters(
host='192.168.1.100',
port=5672,
virtual_host='/',
credentials=pika.PlainCredentials('app_user', 'secure_password')
)
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()
channel.exchange_declare(exchange='weather_topic', exchange_type='topic')
routing_key = 'weather.japan.tokyo'
message = 'Tokyo weather: sunny'
channel.basic_publish(
exchange='weather_topic',
routing_key=routing_key,
body=message
)
print(f"Sent: {routing_key} - {message}")
connection.close()
# メッセージ受信側(topic_receiver.py)
import pika
connection_params = pika.ConnectionParameters(
host='192.168.1.100',
port=5672,
virtual_host='/',
credentials=pika.PlainCredentials('app_user', 'secure_password')
)
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()
channel.exchange_declare(exchange='weather_topic', exchange_type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# バインディングキーの設定
channel.queue_bind(exchange='weather_topic', queue=queue_name, routing_key='weather.japan.*')
def handle_weather(ch, method, properties, body):
print(f"Received: {method.routing_key} - {body.decode()}")
channel.basic_consume(
queue=queue_name,
on_message_callback=handle_weather,
auto_ack=True
)
print("Waiting for weather updates...")
channel.start_consuming()
RPC(リモートプロシージャコール)の実装
RabbitMQを用いたRPCの基本的な実装例です。
# RPCサーバー(rpc_server.py)
import pika
import math
connection_params = pika.ConnectionParameters(
host='192.168.1.100',
port=5672,
virtual_host='/',
credentials=pika.PlainCredentials('app_user', 'secure_password')
)
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
def calculate_factorial(n):
return math.factorial(n)
def on_request(ch, method, props, body):
n = int(body)
print(f"Calculating factorial of {n}")
response = calculate_factorial(n)
ch.basic_publish(
exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id=props.correlation_id),
body=str(response)
)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')
print("Awaiting RPC requests...")
channel.start_consuming()
# RPCクライアント(rpc_client.py)
import pika
import uuid
class FactorialRpcClient:
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host='192.168.1.100',
port=5672,
virtual_host='/',
credentials=pika.PlainCredentials('app_user', 'secure_password')
))
self.channel = self.connection.channel()
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(
queue=self.callback_queue,
on_message_callback=self.on_response,
auto_ack=True
)
self.response = None
self.corr_id = None
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(
exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id
),
body=str(n)
)
while self.response is None:
self.connection.process_data_events()
return int(self.response)
client = FactorialRpcClient()
print(f"Factorial of 5: {client.call(5)}")
基本概念のまとめ
- ConnectionFactory、Connection、Channel:RabbitMQのAPIの基本要素。Connectionはソケット接続を管理し、Channelは具体的な操作を行うインターフェースです。
- Exchange:メッセージをキューにルーティングするためのコンポーネント。種類にはdirect、topic、fanoutなどがあります。
- Queue:メッセージを一時的に格納する領域です。
- Message acknowledgment:メッセージの処理完了を確認する仕組み。ACKを送信しないと、メッセージは再送されます。
- Message durability:永続化設定により、サーバー再起動後もメッセージが保持されます。
- Prefetch count:消費者が一度に処理できるメッセージ数を制限し、負荷分散を最適化します。
- Binding:ExchangeとQueueの関連付けを定義し、ルーティングルールを設定します。