PHP Wzorzec publikacji/subskrypcji RabbitMQ (znany również jako wzorzec rozprzestrzeniania)
W trybie rozprzestrzeniania w RabbitMQ wiadomości wysyłane przez producenta są przetwarzane przez wiele konsumentów w różnych kolejkach, jak pokazano na poniższym schemacie architektury.
Wymiana rozprzestrzeniania może przesłać wiadomości do wszystkich powiązanych kolejek.
Uwaga: Bez względu na używany tryb pracy RabbitMQ, różnica polega na rodzaju wymiany i używanych parametrach routingu.
1. Samouczek wstępny
Proszę przeczytać poniższe rozdziały, aby zrozumieć związane zagadnienia:
- Podstawy RabbitMQ
- Wzorzec publikacji/subskrypcji RabbitMQ
- Szybki start z RabbitMQ dla PHP (Obowiązkowe, ponieważ kolejne rozdziały nie będą powtarzać kodu, a jedynie pokażą kluczowe fragmenty kodu)
2. Definiowanie wymiany rozprzestrzeniania
Zdefiniuj wymianę za pomocą funkcji exchange_declare kanału.
$channel->exchange_declare(
'tizi365.fanout', // Nazwa wymiany, musi być unikalna i nie może się powtarzać
'fanout', // Rodzaj wymiany
false,
false, // Czy ma być trwała
false
);
Uwaga: Zarówno producenci wiadomości, jak i konsumenci wymagają wymiany.
3. Wysyłanie wiadomości
Wysyłamy wiadomości do wymiany, a wymiana przekazuje wiadomości do odpowiednich kolejek na podstawie reguł routingu.
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// Utwórz połączenie RabbitMQ
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Utwórz kanał
$channel = $connection->channel();
// Zadeklaruj wymianę
$channel->exchange_declare(
'tizi365.fanout', // Nazwa wymiany, musi być unikalna i nie może się powtarzać
'fanout', // Rodzaj wymiany
false,
false, // Czy ma być trwała
false
);
// Obiekt wiadomości, z treścią wiadomości jako parametrem
$msg = new AMQPMessage("Cześć tizi365.com");
$channel->basic_publish(
$msg, // Obiekt wiadomości
'tizi365.fanout' // Nazwa wymiany
);
echo ' [x] Wysłano ', $msg->getBody(), "\n";
// Zwolnij zasoby
$channel->close();
$connection->close();
4. Odbieranie wiadomości
4.1. Definiowanie kolejki i wiązanie wymiany
Aby konsumować wiadomości z kolejki, należy najpierw zdefiniować kolejkę, a następnie związać ją z docelową wymianą. Poniżej zdefiniowano kolejkę i powiązano ją z określoną wymianą.
// Deklaracja kolejki, jeśli nazwa kolejki jest pusta, automatycznie generowany jest unikalny identyfikator, a nazwa kolejki jest zwracana
list($queue_name, ,) = $channel->queue_declare(
"", // Nazwa kolejki, nie może być zduplikowana, jeśli jest pusta, automatycznie generowany jest unikalny identyfikator, co czyni ją anonimową kolejką
false,
false, // Czy należy zachować
true,
false
);
// Powiązanie kolejki z określoną wymianą
$channel->queue_bind(
$queue_name, // Nazwa kolejki
'tizi365.fanout' // Nazwa wymiany
);
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
// Utworzenie połączenia z rabbitmq
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Utworzenie kanału
$channel = $connection->channel();
// Deklaracja wymiany
$channel->exchange_declare(
'tizi365.fanout', // Nazwa wymiany, musi być unikalna i nie może się powtarzać
'fanout', // Typ wymiany
false,
false, // Czy należy zachować
false
);
// Deklaracja kolejki
list($queue_name, ,) = $channel->queue_declare(
"", // Nazwa kolejki, nie może się powtarzać, jeśli jest pusta, generowany jest unikalny identyfikator, wtedy jest to anonimowa kolejka
false,
false, // Czy należy zachować
true,
false
);
// Powiązanie kolejki z określoną wymianą
$channel->queue_bind(
$queue_name, // Nazwa kolejki
'tizi365.fanout' // Nazwa wymiany
);
echo " [*] Oczekiwanie na wiadomość. Aby zakończyć, naciśnij CTRL+C\n";
// Zdefiniowanie funkcji przetwarzania wiadomości (używając funkcji anonimowej tutaj)
$callback = function ($msg) {
// Logika przetwarzania wiadomości
echo ' [x] ', $msg->body, "\n";
};
// Utworzenie konsumenta
$channel->basic_consume(
$queue_name, // Nazwa kolejki, z którą konsument ma pracować
'', // Tag konsumenta, ignorowany, następnie generowany jest unikalny identyfikator
false,
true, // Czy automatycznie potwierdzać wiadomość, czyli informować rabbitmq, że wiadomość została pomyślnie przetworzona
false,
false,
$callback // Funkcja przetwarzania wiadomości
);
// Jeśli kanał nie jest zamknięty, trzymaj proces zablokowany, aby zapobiec jego zakończeniu
while ($channel->is_open()) {
$channel->wait();
}
// Zwolnienie zasobów
$channel->close();
$connection->close();