PHP RabbitMQルーティングモード(ダイレクトモード)

RabbitMQのルーティングモードでは、「direct」タイプの交換が使用されます。パブリッシュ/サブスクライブモードとの主な違いは、ダイレクト交換がルーティングパラメーターと完全に一致するキューにメッセージを配信することです。アーキテクチャは以下のグラフに示されています。

RabbitMQ Direct Mode

注意:RabbitMQで使用される作業モードに関係なく、違いは使用される交換タイプとルーティングパラメーターにあります。

1. 準備チュートリアル

関連する知識を理解するために、まず以下のセクションを読んでください。

2. ダイレクト交換の定義

// 交換を宣言する
$channel->exchange_declare(
    'tizi365.direct', // ユニークな交換名
    'direct', // 交換のタイプ
    false,
    false, // 保存するかどうか
    false
);

注意:メッセージの生産者と消費者の両方が交換を必要とします。

3. メッセージの送信

メッセージを交換に送信し、ルーティングルールに基づいて交換が対応するキューにメッセージを配信します。

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// RabbitMQ接続を作成
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// チャンネルを作成
$channel = $connection->channel();

// 交換を宣言する
$channel->exchange_declare(
    'tizi365.direct', // ユニークな交換名
    'direct', // 交換のタイプ
    false,
    false, // 保存するかどうか
    false
);

// メッセージオブジェクトを作成し、メッセージの内容をパラメーターとして指定
$msg = new AMQPMessage("こんにちは、tizi365.com");

// 注意:basic_publishメソッドの第3パラメーターはルーティングパラメーターです
$channel->basic_publish(
    $msg, // メッセージオブジェクト
    'tizi365.direct', // 交換名
    "tizi365" // ルーティングパラメーターは必要に応じて任意に定義できます
);

echo ' [x] 送信しました ', $msg->getBody(), "\n";

// リソースを解放する
$channel->close();
$connection->close();

注意:basic_publishメソッドの第3パラメーターはキーのパラメーターです。

4. メッセージの受信

4.1. キューの定義とエクスチェンジのバインド

キューメッセージを消費するには、まずキューを定義し、その後、キューをターゲットエクスチェンジにバインドする必要があります。

// 匿名のキューを宣言
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

// キューを指定したエクスチェンジにバインドする
$channel->queue_bind(
    $queue_name, // キュー名
    'tizi365.fanout', // エクスチェンジ名
    "tizi365" // バインドするルーティングパラメーター、ここでは 'tizi365' をバインド
);

注: ダイレクトエクスチェンジのルールによると、メッセージを送信する際に携帯されるルーティングパラメーターが、キューをエクスチェンジにバインドする際に設定されたルーティングパラメーターと一致する場合、そのメッセージはこのキューに配信されます。

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// RabbitMQ接続を作成
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// チャンネルを作成
$channel = $connection->channel();

// エクスチェンジを宣言
$channel->exchange_declare(
    'tizi365.direct', // エクスチェンジ名、ユニークでなければならず、繰り返してはならない
    'direct', // エクスチェンジの種類
    false,
    false, // 耐久性があるかどうか
    false
);

// 匿名のキューを宣言
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

// キューを特定のエクスチェンジにバインドする
$channel->queue_bind(
    $queue_name, // キュー名
    'tizi365.fanout', // エクスチェンジ名
    "tizi365" // バインドするルーティングパラメーター、ここでは 'tizi365' をバインド
);

echo " [*] メッセージを待っています。終了するにはCTRL+Cを押してください。\n";

// メッセージ処理関数を定義(ここでは無名関数を使用)
$callback = function ($msg) {
    // メッセージの処理ロジック
    echo ' [x] ', $msg->body, "\n";
};

// コンシューマーを作成
$channel->basic_consume(
    $queue_name, // 消費するキューの名前
    '', // コンシューマータグ、無視され、自動的に一意のIDが生成されます
    false,
    true, // メッセージを自動的に確認するかどうか、つまりRabbitMQにメッセージの処理が成功したことを自動的に通知します。
    false,
    false,
    $callback // メッセージ処理関数
);

// チャンネルが閉じられるまでプロセスをブロックし、プロセスが終了しないようにします
while ($channel->is_open()) {
    $channel->wait();
}

// リソースを解放
$channel->close();
$connection->close();