PHP RabbitMQ Publish/Subscribe Pattern (также известный как шаблон рассылки)

Режим Fanout в RabbitMQ - это шаблон, при котором сообщения, отправленные производителем, обрабатываются несколькими потребителями в различных очередях, как показано на архитектурной диаграмме ниже.

RabbitMQ Режим работы

Fanout exchange может направлять сообщения во все привязанные очереди.

Примечание: Независимо от использованного режима работы RabbitMQ, разница заключается в типе обмена и используемых параметрах маршрутизации.

1. Учебное пособие перед началом

Пожалуйста, прочтите следующие разделы, чтобы понять соответствующие знания:

2. Определение Fanout Exchange

Определяем обмен через функцию exchange_declare канала.

$channel->exchange_declare(
    'tizi365.fanout', // Название обмена, должно быть уникальным и не может дублироваться
    'fanout', // Тип обмена
    false,
    false, // Сохранять ли
    false
);

Примечание: Обмен требуется как производителю сообщений, так и потребителям.

3. Отправка сообщений

Отправляем сообщения в обмен, и обмен доставляет сообщения в соответствующие очереди на основе правил маршрутизации.

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// Создаем соединение с RabbitMQ
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Создаем канал
$channel = $connection->channel();

// Объявляем обмен
$channel->exchange_declare(
    'tizi365.fanout', // Название обмена, должно быть уникальным и не может дублироваться
    'fanout', // Тип обмена
    false,
    false, // Сохранять ли
    false
);
// Объект сообщения с содержимым сообщения в качестве параметра
$msg = new AMQPMessage("привет tizi365.com");

$channel->basic_publish(
    $msg, // Объект сообщения
    'tizi365.fanout' // Название обмена
);

echo ' [x] Отправлено ', $msg->getBody(), "\n";

// Освобождаем ресурсы
$channel->close();
$connection->close();

4. Получение сообщений

4.1. Определение очереди и привязка к обмену

Для получения сообщений из очереди необходимо сначала определить очередь, а затем привязать очередь к целевому обмену. Ниже приведено определение очереди и привязка ее к определенному обмену.

// Объявление очереди, если имя очереди пусто, автоматически генерируется уникальный ID, и возвращается имя очереди
list($queue_name, ,) = $channel->queue_declare(
    "", // Имя очереди, не допускается повторение, если пусто, автоматически генерируется уникальный ID, создавая анонимную очередь
    false,
    false, // Нужно ли сохранять сообщения по окончании соединения
    true,
    false
);

// Привязка очереди к указанному обмену
$channel->queue_bind(
    $queue_name, // Имя очереди
    'tizi365.fanout' // Имя обмена
);
<?php 
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// Создание соединения с RabbitMQ
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Создание канала
$channel = $connection->channel();

// Объявление обмена
$channel->exchange_declare(
    'tizi365.fanout', // Имя обмена, должно быть уникальным и не повторяться
    'fanout', // Тип обмена
    false,
    false, // Нужно ли сохранять обмен по окончании соединения
    false
);

// Объявление очереди
list($queue_name, ,) = $channel->queue_declare(
    "", // Имя очереди, не допускается повторение, если пусто, то генерируется уникальный ID и создается анонимная очередь
    false,
    false, // Нужно ли сохранять сообщения по окончании соединения
    true,
    false
);

// Привязка очереди к указанному обмену
$channel->queue_bind(
    $queue_name, // Имя очереди
    'tizi365.fanout' // Имя обмена
);

echo " [*] Ожидание сообщений. Чтобы выйти, нажмите CTRL+C\n";

// Определение функции обработки сообщения (здесь используется анонимная функция)
$callback = function ($msg) {
    // Логика обработки сообщения
    echo ' [x] ', $msg->body, "\n";
};

// Создание потребителя
$channel->basic_consume(
    $queue_name, // Имя очереди, из которой нужно потреблять
    '', // Метка потребителя, не учитывается, генерируется уникальный ID
    false,
    true, // Автоматическое подтверждение сообщения, то есть информирование RabbitMQ о том, что сообщение успешно обработано
    false,
    false,
    $callback // Функция обработки сообщения
);

// Если канал не закрыт, продолжаем блокировать процесс, чтобы предотвратить его завершение
while ($channel->is_open()) {
    $channel->wait();
}

// Освобождение ресурсов
$channel->close();
$connection->close();