PHP RabbitMQルーティングモード(ダイレクトモード)
RabbitMQのルーティングモードでは、「direct」タイプの交換が使用されます。パブリッシュ/サブスクライブモードとの主な違いは、ダイレクト交換がルーティングパラメーターと完全に一致するキューにメッセージを配信することです。アーキテクチャは以下のグラフに示されています。
注意:RabbitMQで使用される作業モードに関係なく、違いは使用される交換タイプとルーティングパラメーターにあります。
1. 準備チュートリアル
関連する知識を理解するために、まず以下のセクションを読んでください。
- RabbitMQ基本概念
- RabbitMQルーティングモードの原則
- PHPでのRabbitMQクイックスタート(後続のセクションではコードを繰り返し提示せず、主要なコードのみを表示します)
- PHP RabbitMQパブリッシュ/サブスクライブパターンセクション(交換のタイプとルーティングパラメーターを除いて、コード構造はほぼ同じです)
2. ダイレクト交換の定義
// 交換を宣言する
$channel->exchange_declare(
'tizi365.direct', // ユニークな交換名
'direct', // 交換のタイプ
false,
false, // 保存するかどうか
false
);
注意:メッセージの生産者と消費者の両方が交換を必要とします。
3. メッセージの送信
メッセージを交換に送信し、ルーティングルールに基づいて交換が対応するキューにメッセージを配信します。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// RabbitMQ接続を作成
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// チャンネルを作成
$channel = $connection->channel();
// 交換を宣言する
$channel->exchange_declare(
'tizi365.direct', // ユニークな交換名
'direct', // 交換のタイプ
false,
false, // 保存するかどうか
false
);
// メッセージオブジェクトを作成し、メッセージの内容をパラメーターとして指定
$msg = new AMQPMessage("こんにちは、tizi365.com");
// 注意:basic_publishメソッドの第3パラメーターはルーティングパラメーターです
$channel->basic_publish(
$msg, // メッセージオブジェクト
'tizi365.direct', // 交換名
"tizi365" // ルーティングパラメーターは必要に応じて任意に定義できます
);
echo ' [x] 送信しました ', $msg->getBody(), "\n";
// リソースを解放する
$channel->close();
$connection->close();
注意:
basic_publish
メソッドの第3パラメーターはキーのパラメーターです。
4. メッセージの受信
4.1. キューの定義とエクスチェンジのバインド
キューメッセージを消費するには、まずキューを定義し、その後、キューをターゲットエクスチェンジにバインドする必要があります。
// 匿名のキューを宣言
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// キューを指定したエクスチェンジにバインドする
$channel->queue_bind(
$queue_name, // キュー名
'tizi365.fanout', // エクスチェンジ名
"tizi365" // バインドするルーティングパラメーター、ここでは 'tizi365' をバインド
);
注: ダイレクトエクスチェンジのルールによると、メッセージを送信する際に携帯されるルーティングパラメーターが、キューをエクスチェンジにバインドする際に設定されたルーティングパラメーターと一致する場合、そのメッセージはこのキューに配信されます。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
// RabbitMQ接続を作成
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// チャンネルを作成
$channel = $connection->channel();
// エクスチェンジを宣言
$channel->exchange_declare(
'tizi365.direct', // エクスチェンジ名、ユニークでなければならず、繰り返してはならない
'direct', // エクスチェンジの種類
false,
false, // 耐久性があるかどうか
false
);
// 匿名のキューを宣言
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// キューを特定のエクスチェンジにバインドする
$channel->queue_bind(
$queue_name, // キュー名
'tizi365.fanout', // エクスチェンジ名
"tizi365" // バインドするルーティングパラメーター、ここでは 'tizi365' をバインド
);
echo " [*] メッセージを待っています。終了するにはCTRL+Cを押してください。\n";
// メッセージ処理関数を定義(ここでは無名関数を使用)
$callback = function ($msg) {
// メッセージの処理ロジック
echo ' [x] ', $msg->body, "\n";
};
// コンシューマーを作成
$channel->basic_consume(
$queue_name, // 消費するキューの名前
'', // コンシューマータグ、無視され、自動的に一意のIDが生成されます
false,
true, // メッセージを自動的に確認するかどうか、つまりRabbitMQにメッセージの処理が成功したことを自動的に通知します。
false,
false,
$callback // メッセージ処理関数
);
// チャンネルが閉じられるまでプロセスをブロックし、プロセスが終了しないようにします
while ($channel->is_open()) {
$channel->wait();
}
// リソースを解放
$channel->close();
$connection->close();