PHP RabbitMQ パブリッシュ/サブスクライブ パターン(ファンアウトパターン)
RabbitMQのファンアウトモードは、プロデューサーが送信したメッセージが複数の異なるキューで処理されるパターンであり、以下のアーキテクチャ図に示されています。
ファンアウトエクスチェンジは、メッセージをすべてのバインドされたキューに転送できます。
注意: 使用されるRabbitMQの動作モードに関わらず、違いはエクスチェンジのタイプとルーティングパラメータの使用にあります。
1. 必要なチュートリアル
関連する知識を理解するために、以下の章を読んでください:
- RabbitMQ Basics
- RabbitMQ パブリッシュ/サブスクライブ パターン
- RabbitMQ PHP クイックスタート(後続の章ではコードを繰り返さず、キーコードスニペットのみを表示します)
2. ファンアウトエクスチェンジの定義
チャンネルのexchange_declare関数を使用してエクスチェンジを定義します。
$channel->exchange_declare(
'tizi365.fanout', // エクスチェンジ名、ユニークで重複してはいけない
'fanout', // エクスチェンジの種類
false,
false, // 永続化するかどうか
false
);
注意: メッセージのプロデューサーとコンシューマーの両方でエクスチェンジが必要です。
3. メッセージの送信
私たちはメッセージをエクスチェンジに送信し、エクスチェンジはルーティングルールに基づいてメッセージを対応するキューに配信します。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// RabbitMQの接続を作成
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// チャンネルを作成
$channel = $connection->channel();
// エクスチェンジを宣言
$channel->exchange_declare(
'tizi365.fanout', // エクスチェンジ名、ユニークで重複してはいけない
'fanout', // エクスチェンジの種類
false,
false, // 永続化するかどうか
false
);
// メッセージオブジェクト、メッセージの内容をパラメータとして
$msg = new AMQPMessage("hello tizi365.com");
$channel->basic_publish(
$msg, // メッセージオブジェクト
'tizi365.fanout' // エクスチェンジ名
);
echo ' [x] Sent ', $msg->getBody(), "\n";
// リソースを解放
$channel->close();
$connection->close();
4. メッセージの受信
4.1. キューの定義とエクスチェンジへのバインド
キューメッセージを消費するためには、まずキューを定義し、その後キューをターゲットのエクスチェンジにバインドする必要があります。 以下に、特定のエクスチェンジに定義されたキューとのバインドが示されています。
// キューを宣言します。キュー名が空の場合、自動的に一意のIDが生成され、そのキュー名が返されます
list($queue_name, ,) = $channel->queue_declare(
"", // キュー名。重複することは許可されず、空の場合は自動的に一意のIDが生成され、匿名キューとなります
false,
false, // 永続化するかどうか
true,
false
);
// キューを指定したエクスチェンジにバインドします
$channel->queue_bind(
$queue_name, // キュー名
'tizi365.fanout' // エクスチェンジ名
);
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
// rabbitmq接続を作成
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// チャンネルを作成
$channel = $connection->channel();
// エクスチェンジを宣言
$channel->exchange_declare(
'tizi365.fanout', // エクスチェンジ名。一意でなければならず、繰り返し使用することはできません
'fanout', // エクスチェンジの種類
false,
false, // 永続化するかどうか
false
);
// キューを宣言
list($queue_name, ,) = $channel->queue_declare(
"", // キュー名。繰り返し使用することができず、空の場合は一意のIDが生成され、匿名キューとなります
false,
false, // 永続化するかどうか
true,
false
);
// キューを指定したエクスチェンジにバインドします
$channel->queue_bind(
$queue_name, // キュー名
'tizi365.fanout' // エクスチェンジ名
);
echo " [*] メッセージを待っています。終了するにはCTRL+Cを押してください。\n";
// メッセージ処理関数を定義します(ここでは無名関数を使用しています)
$callback = function ($msg) {
// メッセージ処理ロジック
echo ' [x] ', $msg->body, "\n";
};
// コンシューマーを作成
$channel->basic_consume(
$queue_name, // コンシューマーが消費するキュー
'', // コンシュマータグ。無視し、一意のIDを生成します
false,
true, // メッセージを自動的に確認するかどうか。つまり、RabbitMQにメッセージが正常に処理されたことを伝えます。
false,
false,
$callback // メッセージ処理関数
);
// チャンネルが閉じられていない場合は、プロセスが終了しないようにブロックし続けます
while ($channel->is_open()) {
$channel->wait();
}
// リソースを解放します
$channel->close();
$connection->close();