PHP RabbitMQ Publish/Subscribe Pattern (также известный как шаблон рассылки)
Режим Fanout в RabbitMQ - это шаблон, при котором сообщения, отправленные производителем, обрабатываются несколькими потребителями в различных очередях, как показано на архитектурной диаграмме ниже.
Fanout exchange может направлять сообщения во все привязанные очереди.
Примечание: Независимо от использованного режима работы RabbitMQ, разница заключается в типе обмена и используемых параметрах маршрутизации.
1. Учебное пособие перед началом
Пожалуйста, прочтите следующие разделы, чтобы понять соответствующие знания:
- Основы RabbitMQ
- Шаблон Рассылки и Подписки в RabbitMQ
- RabbitMQ Быстрый старт для PHP (Обязательно, так как последующие разделы не будут повторять код, а будут показывать только ключевые фрагменты кода)
2. Определение Fanout Exchange
Определяем обмен через функцию exchange_declare канала.
$channel->exchange_declare(
'tizi365.fanout', // Название обмена, должно быть уникальным и не может дублироваться
'fanout', // Тип обмена
false,
false, // Сохранять ли
false
);
Примечание: Обмен требуется как производителю сообщений, так и потребителям.
3. Отправка сообщений
Отправляем сообщения в обмен, и обмен доставляет сообщения в соответствующие очереди на основе правил маршрутизации.
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// Создаем соединение с RabbitMQ
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Создаем канал
$channel = $connection->channel();
// Объявляем обмен
$channel->exchange_declare(
'tizi365.fanout', // Название обмена, должно быть уникальным и не может дублироваться
'fanout', // Тип обмена
false,
false, // Сохранять ли
false
);
// Объект сообщения с содержимым сообщения в качестве параметра
$msg = new AMQPMessage("привет tizi365.com");
$channel->basic_publish(
$msg, // Объект сообщения
'tizi365.fanout' // Название обмена
);
echo ' [x] Отправлено ', $msg->getBody(), "\n";
// Освобождаем ресурсы
$channel->close();
$connection->close();
4. Получение сообщений
4.1. Определение очереди и привязка к обмену
Для получения сообщений из очереди необходимо сначала определить очередь, а затем привязать очередь к целевому обмену. Ниже приведено определение очереди и привязка ее к определенному обмену.
// Объявление очереди, если имя очереди пусто, автоматически генерируется уникальный ID, и возвращается имя очереди
list($queue_name, ,) = $channel->queue_declare(
"", // Имя очереди, не допускается повторение, если пусто, автоматически генерируется уникальный ID, создавая анонимную очередь
false,
false, // Нужно ли сохранять сообщения по окончании соединения
true,
false
);
// Привязка очереди к указанному обмену
$channel->queue_bind(
$queue_name, // Имя очереди
'tizi365.fanout' // Имя обмена
);
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
// Создание соединения с RabbitMQ
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Создание канала
$channel = $connection->channel();
// Объявление обмена
$channel->exchange_declare(
'tizi365.fanout', // Имя обмена, должно быть уникальным и не повторяться
'fanout', // Тип обмена
false,
false, // Нужно ли сохранять обмен по окончании соединения
false
);
// Объявление очереди
list($queue_name, ,) = $channel->queue_declare(
"", // Имя очереди, не допускается повторение, если пусто, то генерируется уникальный ID и создается анонимная очередь
false,
false, // Нужно ли сохранять сообщения по окончании соединения
true,
false
);
// Привязка очереди к указанному обмену
$channel->queue_bind(
$queue_name, // Имя очереди
'tizi365.fanout' // Имя обмена
);
echo " [*] Ожидание сообщений. Чтобы выйти, нажмите CTRL+C\n";
// Определение функции обработки сообщения (здесь используется анонимная функция)
$callback = function ($msg) {
// Логика обработки сообщения
echo ' [x] ', $msg->body, "\n";
};
// Создание потребителя
$channel->basic_consume(
$queue_name, // Имя очереди, из которой нужно потреблять
'', // Метка потребителя, не учитывается, генерируется уникальный ID
false,
true, // Автоматическое подтверждение сообщения, то есть информирование RabbitMQ о том, что сообщение успешно обработано
false,
false,
$callback // Функция обработки сообщения
);
// Если канал не закрыт, продолжаем блокировать процесс, чтобы предотвратить его завершение
while ($channel->is_open()) {
$channel->wait();
}
// Освобождение ресурсов
$channel->close();
$connection->close();