メッセージミドルウェアの概要
メッセージミドルウェアは、メッセージの転送過程においてメッセージを保持するコンテナです。メッセージを送信元から宛先へ中継する際に仲介者の役割を果たします。キューの主な目的はルーティングとメッセージ配信の保証です。受信者が利用できない場合、メッセージキューはメッセージを保持し、正常に配信できるまで待機します。もちろん、メッセージの保存には期限も設定されます。
メッセージミドルウェアの特徴
1. 非同期処理モデルの採用
メッセージ送信者は応答を待つことなくメッセージを送信できます。送信者は仮想チャネル(トピックまたはキュー)にメッセージを送信し、受信者はそのチャネルを購読または監視します。1つのメッセージは最終的に1つまたは複数の受信者に転送され、これらの受信者は送信者に同期応答する必要はありません。プロセス全体が非同期です。例えば、ユーザー登録後、一定時間経過してからメールやSMSを送信する場合などです。
2. アプリケーション間の疎結合関係
送信者と受信者は互いを理解する必要がなく、メッセージの確認のみで十分です。送信者と受信者は同時にオンラインである必要もありません。例えば、オンライン決済システムではデータの最終一貫性を保証するため、決済処理完了後に結果をメッセージミドルウェアに格納し、注文システムで注文ステータスを更新します。これにより、2つのシステムはメッセージミドルウェアによって分離されます。
メッセージ伝達モデル
1. ポイントツーポイントモデル
ポイントツーポイントモデルは、メッセージプロデューサーとコンシューマー間の直接通信に使用されます。メッセージプロデューサーは特定の名前で識別されるコンシューマーにメッセージを送信します。この名前は実際にはコンシューマーサービス内のキュー(Queue)を指し、メッセージがコンシューマーに伝達される前にこのキューに保存されます。キューのメッセージはメモリ上に保存されることもあれば、永続化されることもあり、メッセージサービスに障害が発生してもメッセージが配信されることを保証します。
ポイントツーポイントメッセージミドルウェアは通常、メッセージキューサービス、メッセージ伝達サービス、メッセージキュー、およびメッセージアプリケーションインターフェースAPIで構成されます。
特徴:
- 各メッセージは1つのコンシューマーのみが消費
- 送信者と受信者間に時間的依存関係なし
- 受信者はメッセージ受信と処理の成功を確認
2. パブリッシュ/サブスクライブモデル(Pub/Sub)
パブリッシャー/サブスクライバーモデルは、特定のメッセージトピックへのメッセージ生成をサポートします。0個または複数のサブスクライバーが特定のメッセージトピックからのメッセージ受信に関心を持つ可能性があります。このモデルでは、パブリッシャーとサブスクライバーは互いを知りません。この模式は匿名掲示板のようなものです。この模式は「複数のコンシューマーがメッセージを受け取り、パブリッシャーとサブスクライバー間に時間的依存関係がある」と要約されます。パブリッシャーはサブスクライバーが購読できるようにサブスクリプションを確立する必要があります。サブスクライバーは持続的なサブスクリプションを確立しない限り、継続的にアクティブな状態を維持してメッセージを受信する必要があります。この場合、サブスクライバーが接続されていないときに発行されたメッセージは、サブスクライバーが再接続されたときに再発行されます。
パブリッシュ/サブスクライブモデルの特性:
- 各メッセージには複数のサブスクライバーが存在
- クライアントは購読後にのみメッセージを受信可能
- 永続的サブスクリプションと非永続的サブスクリプションの存在
メッセージミドルウェアの適用シナリオ
-
ユーザー登録の非同期処理 ユーザー登録成功時にメールまたはSMSを送信 ネットワークの不安定やサービス停止が発生した場合のタイムアウトメカニズムやリトライ機構の必要性
-
ログ分析 ログの集中収集とPV計算、ユーザー行動分析
-
データ複製
- データをソースから複数の宛先へ順序を保証して複製
- クロスデータセンターデータ転送、検索、オフラインデータ計算など
-
遅延メッセージ送信と一時保存
- メッセージミドルウェアを信頼できる一時保存領域として利用
- 定期的なメッセージ配信、例えばシステムの負荷テスト
-
メッセージブロードキャスト
- キャッシュデータの同期更新
- アプリケーションへのデータプッシュ
- 例えば、ローカルキャッシュの更新、商品価格の変更
RabbitMQの紹介
RabbitMQはAMQP(Advanced Message Queuing Protocol)に基づいた、完全で再利用可能な企業向けメッセージシステムです。Mozilla Public Licenseオープンソースプロトコルに従っています。
RabbitMQは人気のあるオープンソースメッセージキューショステムであり、Erlang言語で開発されています。AMQP標準の実装であり、堅牢な分散型メッセージングを提供します。
RabbitMQの主要コンポーネント
-
サーバー(ブローカー): クライアント接続を受け入れ、AMQPメッセージキューのルーティング機能を実現するプロセス
-
仮想ホスト(Virtual Host): 権限制御グループのような概念的なもの。1つの仮想ホストには複数のExchangeとQueueを含めることができますが、権限制御の最小単位は仮想ホストです
-
Exchange: プロデューサーからのメッセージを受け取り、Bindingルールに従ってメッセージをサーバーのキューにルーティングします。Exchangeのタイプはルーティング動作を決定します
-
Message Queue: コンシューマーによってまだ消費されていないメッセージを保存する場所
-
Message: HeaderとBodyで構成され、Headerはプロデューサーが追加した各種属性の集合です
Exchangeの種類
1. Direct Exchange ルーティングキーを直接処理します。キューをExchangeにバインドする際に、メッセージが特定のルーティングキーと完全に一致する必要があります。これは完全一致です。キューが"dog"というルーティングキーを要求する場合、"dog"とマークされたメッセージのみが転送され、"dog.puppy"や"dog.guard"は転送されません。
2. Fanout Exchange ルーティングキーを無視して、単純にキューをExchangeにバインドします。Exchangeに送信されたメッセージは、そのExchangeにバインドされたすべてのキューに転送されます。サブネットブロードキャストに似ており、サブネット内のすべてのホストがメッセージのコピーを受け取ります。
3. Topic Exchange メッセージのルーティングキーとバインディングキーのパターンマッチングによって、メッセージをバインドされたキューにルーティングします。このルータータイプは、经典的なパブリッシュ/サブスクライブメッセージ伝達モデルをサポートするために使用できます。
RabbitMQのインストールと設定
- インストール(yumを使用)
yum install -y rabbitmq-server
- プラグインの有効化
/usr/lib/rabbitmq/bin/rabbitmq-plugins list
/usr/lib/rabbitmq/bin/rabbitmq-plugins enable rabbitmq_management
- サービスの起動
/etc/init.d/rabbitmq-server start
- 仮想ホストの作成
rabbitmqctl add_vhost test
- ユーザー作成と権限設定
rabbitmqctl add_user admin admin
rabbitmqctl set_permissions -p test admin ".*" ".*" ".*"
PythonでのRabbitMQ使用例
プロデューサー例:
#!/usr/bin/env python
import pika
import time
# 接続の確立
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# キューの宣言
channel.queue_declare(queue='demo_queue')
# メッセージの送信
for i in range(10):
message = f"メッセージ番号 {i+1}"
channel.basic_publish(exchange='', routing_key='demo_queue', body=message)
print(f" [送信] {message}")
time.sleep(0.5)
# 接続のクローズ
connection.close()
コンシューマー例:
#!/usr/bin/env python
import pika
import json
# 接続の確立
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# キューの宣言
channel.queue_declare(queue='demo_queue')
# コールバック関数の定義
def process_message(ch, method, properties, body):
try:
message = json.loads(body.decode())
print(f" [受信] {message}")
# メッセージ処理のシミュレーション
time.sleep(1)
print(" [処理完了]")
# アクノウレッジメントの送信
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f"エラー: {e}")
# メッセージの消費を開始
channel.basic_consume(queue='demo_queue', on_message_callback=process_message)
print(' [*] メッセージ待機中...CTRL+Cで終了')
channel.start_consuming()
RabbitMQ管理インターフェース Web管理インターフェースは http://127.0.0.1:15672 でアクセスできます。デフォルトのユーザー名とパスワードはguest/guestです。
トラブルシューティング
サービス起動時のエラー: Error: unable to connect to node rabbit@hostname: nodedown
原因: ホスト名が正しく解決できない。解決策として、/etc/hostsファイルにホスト名を追加します。