RabbitMQ PHPドキュメント:トピック型交換

2019年12月10日10:05:11

原文:https://www.rabbitmq.com/tutorials/tutorial-five-php.html

トピック

(php-amqplibを使用)

前提条件

このチュートリアルでは、RabbitMQが標準ポート(5672)でローカルホスト上で実行されていることを前提としています。別のホスト、ポート、または認証情報を使用する場合は、接続設定を調整する必要があります。

お手伝いが必要ですか

このチュートリアルの途中で困った場合は、メールリストにご連絡ください。

前回のチュートリアルでは、ログシステムを改善しました。ファンアウトエクスチェンジではなくダイレクトエクスチェンジを使用し、ログの選択的受信が可能になりました。

ダイレクトエクスチェンジを使用することでシステムは改善されましたが、複数の条件に基づくルーティングには限界があります。

ログシステムにおいて、ログの重要度だけでなく、ログを生成したソースにも基づいてログを購読したい場合があります。これはsyslogユーティリティで知られる概念です。これは、重要度(info/warn/crit...)とツール(auth/cron/kern...)に基づいてログをルーティングします。

このアプローチにより、例えば「cron」からの重大なエラーのみを監視したり、「kern」からのすべてのログを取得したりすることが可能になります。

この機能をログシステムに実装するために、より高度なトピックエクスチェンジについて学びます。

トピックエクスチェンジ

トピックエクスチェンジに送信されるメッセージのrouting_keyは任意ではありません。ドットで区切られた単語のリストである必要があります。これらの単語は任意の文字列であり、通常はメッセージに関連する機能を示します。有効なrouting_keyの例としては、「stock.usd.nyse」、「nyse.vmw」、「quick.orange.rabbit」などがあります。routing_keyには最大255バイトまで任意の数の単語を含めることができます。

バインドキーも同様の形式でなければなりません。トピックエクスチェンジのロジックは、ダイレクトエクスチェンジと似ています。特定のrouting_keyで送信されたメッセージは、一致するバインドキーを持つすべてのキューに配信されます。ただし、バインドキーには2つの重要な特別なケースがあります:

  • *(アスタリスク)は1つの単語に置き換えられます。
  • #(ハッシュ)は0個以上の単語に置き換えられます。

この説明は例で最も分かりやすいです:

この例では、動物を表すすべてのメッセージを送信します。メッセージは3つの語(2つのドット)を持つrouting_keyで送信されます。routing_keyの最初の語は速度、2番目の語は色、3番目の語は種族を示します:「<speed>.<color>.<species>」。

3つのバインドを作成しました:Q1は「*.orange.*」というバインドキーで、Q2は「*.*.rabbit」と「lazy.#」でバインドされました。

これらのバインドは次のようにまとめられます:

  • Q1はすべてのオレンジ色の動物に興味があります。
  • Q2はラビットに関するすべての情報と、怠け者のすべての情報に興味があります。

routing_keyが「quick.orange.rabbit」のメッセージは両方のキューに送られます。「lazy.orange.elephant」のメッセージも両方に送られます。一方、「quick.orange.fox」は最初のキューのみに、「lazy.brown.fox」は2番目のキューのみに送られます。「lazy.pink.rabbit」は2番目のキューに一度だけ送られますが、2つのバインドが一致します。「quick.brown.fox」はどのバインドにも一致しないため破棄されます。

契約を破って1つまたは4つの語を持つメッセージを送信した場合どうなるでしょうか?「orange」や「quick.orange.male.rabbit」のようなメッセージはいずれもバインドに一致せず、失われます。

一方、「lazy.orange.male.rabbit」は4つの語を持っていますが、最後のバインドに一致し、2番目のキューに送られます。

トピックエクスチェンジ

トピックエクスチェンジは他のエクスチェンジと同様に機能します。

バインドキーが「#」(ハッシュ)でバインドされたキューは、routing_keyに関係なくすべてのメッセージを受け取り、ファンアウトエクスチェンジと同じように動作します。

特殊文字「*」(アスタリスク)と「#」(ハッシュ)を使用しない場合、トピックエクスチェンジはダイレクトエクスチェンジと同じように動作します。

組み合わせる

ログシステムにトピックエクスチェンジを使用します。ログのrouting_keyが2つの語「<facility>.<severity>」を含むと仮定します。

このコードは前回のチュートリアルのコードとほぼ同じです。

emit_log_topic.phpのコード:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('topic_logs', 'topic', false, false, false);

$routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';
$data = implode(' ', array_slice($argv, 2));
if (empty($data)) {
    $data = "Hello World!";
}

$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'topic_logs', $routing_key);

echo ' [x] Sent ', $routing_key, ':', $data, "\n";

$channel->close();
$connection->close();

receive_logs_topic.phpのコード:

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('topic_logs', 'topic', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$binding_keys = array_slice($argv, 1);
if (empty($binding_keys)) {
    file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
    exit(1);
}

foreach ($binding_keys as $binding_key) {
    $channel->queue_bind($queue_name, 'topic_logs', $binding_key);
}

echo " [*] Waiting for logs. To exit press CTRL+C\n";

$callback = function ($msg) {
    echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

すべてのログを受信する:

php receive_logs_topic.php "#"

「kern」ファシリティからのすべてのログを受信する:

php receive_logs_topic.php "kern.*"

または、「critical」ログのみを受信する:

php receive_logs_topic.php "*.critical"

複数のバインドを作成できます:

php receive_logs_topic.php "kern.*" "*.critical"

routing_key「kern.critical」を持つログを送信します:

php emit_log_topic.php "kern.critical" "A critical kernel error"

これらのプログラムを遊んでみてください。このコードはrouting_keyやbinding_keyに何らかの仮定をしていませんので、3つ以上のrouting_key引数を使用することも可能です。

(embed_log_topic.phpとreceive_logs_topic.phpの完全なソースコード)

次のチュートリアル6では、リモートプロシージャコールとして双方向メッセージをどのように処理するかを確認します。

タグ: RabbitMQ PHP トピックエクスチェンジ メッセージング php-amqplib

6月29日 21:10 投稿