PHP RabbitMQ パブリッシュ/サブスクライブ パターン(ファンアウトパターン)

RabbitMQのファンアウトモードは、プロデューサーが送信したメッセージが複数の異なるキューで処理されるパターンであり、以下のアーキテクチャ図に示されています。

RabbitMQ ワークモード

ファンアウトエクスチェンジは、メッセージをすべてのバインドされたキューに転送できます。

注意: 使用されるRabbitMQの動作モードに関わらず、違いはエクスチェンジのタイプとルーティングパラメータの使用にあります。

1. 必要なチュートリアル

関連する知識を理解するために、以下の章を読んでください:

2. ファンアウトエクスチェンジの定義

チャンネルのexchange_declare関数を使用してエクスチェンジを定義します。

$channel->exchange_declare(
    'tizi365.fanout', // エクスチェンジ名、ユニークで重複してはいけない
    'fanout', // エクスチェンジの種類
    false,
    false, // 永続化するかどうか
    false
);

注意: メッセージのプロデューサーとコンシューマーの両方でエクスチェンジが必要です。

3. メッセージの送信

私たちはメッセージをエクスチェンジに送信し、エクスチェンジはルーティングルールに基づいてメッセージを対応するキューに配信します。

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// RabbitMQの接続を作成
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// チャンネルを作成
$channel = $connection->channel();

// エクスチェンジを宣言
$channel->exchange_declare(
    'tizi365.fanout', // エクスチェンジ名、ユニークで重複してはいけない
    'fanout', // エクスチェンジの種類
    false,
    false, // 永続化するかどうか
    false
);
// メッセージオブジェクト、メッセージの内容をパラメータとして
$msg = new AMQPMessage("hello tizi365.com");

$channel->basic_publish(
    $msg, // メッセージオブジェクト
    'tizi365.fanout' // エクスチェンジ名
);

echo ' [x] Sent ', $msg->getBody(), "\n";

// リソースを解放
$channel->close();
$connection->close();

4. メッセージの受信

4.1. キューの定義とエクスチェンジへのバインド

キューメッセージを消費するためには、まずキューを定義し、その後キューをターゲットのエクスチェンジにバインドする必要があります。 以下に、特定のエクスチェンジに定義されたキューとのバインドが示されています。

// キューを宣言します。キュー名が空の場合、自動的に一意のIDが生成され、そのキュー名が返されます
list($queue_name, ,) = $channel->queue_declare(
    "", // キュー名。重複することは許可されず、空の場合は自動的に一意のIDが生成され、匿名キューとなります
    false,
    false, // 永続化するかどうか
    true,
    false
);

// キューを指定したエクスチェンジにバインドします
$channel->queue_bind(
    $queue_name, // キュー名
    'tizi365.fanout' // エクスチェンジ名
);
<?php 
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// rabbitmq接続を作成
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// チャンネルを作成
$channel = $connection->channel();

// エクスチェンジを宣言
$channel->exchange_declare(
    'tizi365.fanout', // エクスチェンジ名。一意でなければならず、繰り返し使用することはできません
    'fanout', // エクスチェンジの種類
    false,
    false, // 永続化するかどうか
    false
);

// キューを宣言
list($queue_name, ,) = $channel->queue_declare(
    "", // キュー名。繰り返し使用することができず、空の場合は一意のIDが生成され、匿名キューとなります
    false,
    false, // 永続化するかどうか
    true,
    false
);

// キューを指定したエクスチェンジにバインドします
$channel->queue_bind(
    $queue_name, // キュー名
    'tizi365.fanout' // エクスチェンジ名
);

echo " [*] メッセージを待っています。終了するにはCTRL+Cを押してください。\n";

// メッセージ処理関数を定義します(ここでは無名関数を使用しています)
$callback = function ($msg) {
    // メッセージ処理ロジック
    echo ' [x] ', $msg->body, "\n";
};

// コンシューマーを作成
$channel->basic_consume(
    $queue_name, // コンシューマーが消費するキュー
    '', // コンシュマータグ。無視し、一意のIDを生成します
    false,
    true, // メッセージを自動的に確認するかどうか。つまり、RabbitMQにメッセージが正常に処理されたことを伝えます。
    false,
    false,
    $callback // メッセージ処理関数
);

// チャンネルが閉じられていない場合は、プロセスが終了しないようにブロックし続けます
while ($channel->is_open()) {
    $channel->wait();
}

// リソースを解放します
$channel->close();
$connection->close();