1. プリチュートリアル

以下のセクションを最初に読んで、関連する知識を理解してください。

2. トピックエクスチェンジを定義

// エクスチェンジを宣言
$channel->exchange_declare(
    'tizi365.topic', // エクスチェンジ名、ユニークであり、繰り返すことはできません
    'topic', // エクスチェンジのタイプ
    false,
    false, // 耐久性
    false
);

注意:メッセージの生産者と消費者の両方がエクスチェンジが必要です。

3. メッセージを送信

エクスチェンジにメッセージを送信し、エクスチェンジはルーティングルールに基づいてメッセージを対応するキューに配信します。

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// RabbitMQ接続を作成
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// チャネルを作成
$channel = $connection->channel();

// エクスチェンジを宣言
$channel->exchange_declare(
    'tizi365.topic', // エクスチェンジ名、ユニークであり、繰り返すことはできません
    'topic', // エクスチェンジのタイプ
    false,
    false, // 耐久性
    false
);


// メッセージオブジェクト、パラメータはメッセージの内容です
$msg = new AMQPMessage("hello tizi365.com");

// メッセージを送信
// 第3パラメータに注意してください、ルーティングパラメータです
$channel->basic_publish(
    $msg, // メッセージオブジェクト
    'tizi365.topic', // エクスチェンジ名
    "www.tizi365.com" // ルーティングパラメータ、要件に応じて任意に定義できます
);

echo ' [x] 送信しました ', $msg->getBody(), "\n";

// リソースを解放
$channel->close();
$connection->close();

4. メッセージを受信

4.1. キューの定義とエクスチェンジのバインド

キューメッセージを消費するためには、まずキューを定義し、その後そのキューを対象のエクスチェンジにバインドする必要があります。

// 匿名キューを宣言する
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

// キューを指定のエクスチェンジにバインドする
$channel->queue_bind(
    $queue_name, // キュー名
    'tizi365.topic', // エクスチェンジ名
    "*.tizi365.com" // バインドルーティングパラメーター。ここではワイルドカード*(アスタリスク)を使用し、一つの単語に一致させることができます

注:設定されたルーティングパラメーターはすべて*(アスタリスク)ワイルドカードを使用しており、単語一つに一致します。もし#(ハッシュ)に変更すると、複数の単語に一致させることができます。

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// RabbitMQ接続を作成
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// チャンネルを作成
$channel = $connection->channel();

// エクスチェンジを宣言
$channel->exchange_declare(
    'tizi365.topic', // エクスチェンジ名。一意であり、繰り返しができない必要があります
    'topic', // エクスチェンジタイプ
    false,
    false, // 耐久性があるかどうか
    false
);

// 匿名キューを宣言する
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

// キューを指定のエクスチェンジにバインドする
$channel->queue_bind(
    $queue_name, // キュー名
    'tizi365.topic', // エクスチェンジ名
    "*.tizi365.com" // バインドルーティングキー。ここではワイルドカード*を使用し、一つの単語に一致させることができます
);

echo " [*] メッセージを待機中。終了するにはCTRL+Cを押してください\n";

// メッセージ処理関数を定義する(ここでは無名関数を使用しています)
$callback = function ($msg) {
    // メッセージ処理ロジック
    echo ' [x] ', $msg->body, "\n";
};

// コンシューマを作成
$channel->basic_consume(
    $queue_name, // コンシュームするキュー名
    '', // コンシュマータグ。無視すると自動的に一意のIDが生成されます
    false,
    true, // メッセージを自動的にアクローリッジメントするかどうか、つまり、RabbitMQに自動的にメッセージが正常に処理されたことを伝えます
    false,
    false,
    $callback // メッセージ処理関数
);

// チャンネルが閉じていない場合、プロセスをブロックし続けてプロセスが終了しないようにします
while ($channel->is_open()) {
    $channel->wait();
}

// リソースを解放する
$channel->close();
$connection->close();

エクスチェンジをバインドする際のルーティングパラメーターは*.tizi365.comとなっており、これはメッセージのルーティングパラメーター(www.tizi365.com)に一致するため、メッセージを受信できます。