この記事では、RabbitMQを用いてPHPでタスクキュー(ワーカーキュー)を構築し、複数のワーカープロセス間で重い処理を効率的に分散する方法を解説します。メッセージの永続化や確認応答(acknowledgement)、公平なディスパッチといった信頼性向上のための機能も実装します。
前提条件
RabbitMQがローカルホストの標準ポート(5672)で動作していることを前提とします。認証情報や接続先が異なる場合は、コード内の接続設定を適宜変更してください。
タスクの送信(new_task.php)
タスクは文字列として表現され、その中に含まれるドット(.)の数に応じて処理時間が決まります(1ドット = 1秒)。以下はタスクをキューに送信するスクリプトです:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// 永続的なキューを宣言
$channel->queue_declare('task_queue', false, true, false, false);
$data = implode(' ', array_slice($argv, 1)) ?: "Hello World!";
$msg = new AMQPMessage(
$data,
['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);
$channel->basic_publish($msg, '', 'task_queue');
echo " [x] Sent '$data'\n";
$channel->close();
$connection->close();
?>
タスクの処理(worker.php)
ワーカーはキューからメッセージを取得し、ドットの数だけsleep()で処理をシミュレートします。処理完了後に明示的にACKを返すことで、メッセージの信頼性を確保します。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
echo " [*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) {
echo " [x] Received '{$msg->body}'\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done\n";
// ACKを送信
$msg->ack();
};
// ワーカー1つにつき未処理メッセージを1つまで許可
$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connection->close();
?>
キューとメッセージの永続化
RabbitMQサーバーが再起動してもタスクが失われないようにするため、キューとメッセージの両方を永続化します:
- キュー宣言時に
durable=trueを指定(queue_declareの第3引数) - メッセージ作成時に
delivery_mode=2(AMQPMessage::DELIVERY_MODE_PERSISTENT)を設定
注意:既存の非永続キューと同じ名前で永続キューを宣言するとエラーになります。そのため、新しいキュー名(例:task_queue)を使用します。
メッセージ確認(Acknowledgement)
ワーカーがタスク処理中にクラッシュした場合でもメッセージが失われないように、RabbitMQはACKを受け取るまでメッセージを再配信しません。basic_consumeの第4引数をfalse(auto_ack=false)にし、処理完了後に手動でbasic_ackを呼び出します。
公平なディスパッチ(Fair Dispatch)
デフォルトのラウンドロビン方式では、処理負荷が偏ることがあります。これを防ぐため、basic_qos(null, 1, null)を設定して、各ワーカーが一度に1つのメッセージしか受け取らないようにします。これにより、空いているワーカーにのみ新しいタスクが割り当てられます。