1. Предварительное чтение
Пожалуйста, сначала ознакомьтесь с следующими разделами, чтобы понять соответствующие знания
- Основные концепции RabbitMQ
- Принцип работы шаблона тем в RabbitMQ
- Глава быстрого старта PHP RabbitMQ (обязательно, потому что последующие главы не будут дублировать код, а покажут только ключевой код)
- Глава шаблона издательской/подписной модели PHP RabbitMQ (обязательно, потому что код почти одинаков, только тип обмена и параметры маршрутизации отличаются)
2. Определение тематического обмена
// Объявление обмена
$channel->exchange_declare(
'tizi365.topic', // Имя обмена, должно быть уникальным, не может повторяться
'topic', // Тип обмена
false,
false, // Является ли он долговечным
false
);
Примечание: Обмены необходимы как для производителей сообщений, так и для потребителей.
3. Отправка сообщения
Мы отправляем сообщения в обмен, и обмен доставляет сообщения в соответствующую очередь на основе правил маршрутизации.
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// Создаем соединение с RabbitMQ
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Создаем канал
$channel = $connection->channel();
// Объявление обмена
$channel->exchange_declare(
'tizi365.topic', // Имя обмена, должно быть уникальным, не может повторяться
'topic', // Тип обмена
false,
false, // Является ли он долговечным
false
);
// Объект сообщения, параметр - содержание сообщения
$msg = new AMQPMessage("hello tizi365.com");
// Отправка сообщения
// Обратите внимание на третий параметр, параметр маршрутизации
$channel->basic_publish(
$msg, // Объект сообщения
'tizi365.topic', // Имя обмена
"www.tizi365.com" // Параметр маршрутизации, может быть произвольно определен в соответствии с требованиями
);
echo ' [x] Sent ', $msg->getBody(), "\n";
// Освобождение ресурсов
$channel->close();
$connection->close();
4.1. Определение очереди и привязка обмена
Для потребления сообщений из очереди сначала необходимо определить саму очередь, а затем привязать её к целевому обмену.
// Объявление анонимной очереди
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// Привязка очереди к указанному обмену
$channel->queue_bind(
$queue_name, // Наименование очереди
'tizi365.topic', // Наименование обмена
"*.tizi365.com" // Параметр привязки маршрутизации, здесь используется метасимвол * (звёздочка), который может сопоставить одно слово
Примечание: Все заданные параметры маршрутизации используют метасимвол * (звёздочка), который может сопоставить одно слово. Если изменить на # (хеш), это позволит сопоставить несколько слов.
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
// Создание соединения с RabbitMQ
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Создание канала
$channel = $connection->channel();
// Объявление обмена
$channel->exchange_declare(
'tizi365.topic', // Наименование обмена, должно быть уникальным и не повторяться
'topic', // Тип обмена
false,
false, // Долговечность
false
);
// Объявление анонимной очереди
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// Привязка очереди к указанному обмену
$channel->queue_bind(
$queue_name, // Наименование очереди
'tizi365.topic', // Наименование обмена
"*.tizi365.com" // Ключ привязки, используется метасимвол * здесь, который может сопоставить одно слово
);
echo " [*] Ожидание сообщения. Для выхода нажмите CTRL+C\n";
// Определение функции обработки сообщения (используется анонимная функция)
$callback = function ($msg) {
// Логика обработки сообщения
echo ' [x] ', $msg->body, "\n";
};
// Создание потребителя
$channel->basic_consume(
$queue_name, // Наименование очереди для потребления
'', // Тег потребителя, если игнорируется, будет сгенерирован уникальный идентификатор автоматически
false,
true, // Автоматическое подтверждение сообщения, т.е. автоматическое уведомление RabbitMQ о успешной обработке сообщения
false,
false,
$callback // Функция обработки сообщения
);
// Если канал не закрыт, блокировать процесс, чтобы избежать преждевременного завершения программы
while ($channel->is_open()) {
$channel->wait();
}
// Освобождение ресурсов
$channel->close();
$connection->close();
Поскольку параметр маршрутизации, установленный при привязке обмена, равен *.tizi365.com, он соответствует параметру маршрутизации сообщения (www.tizi365.com), и сообщение может быть получено.