PHP Wzorzec publikacji/subskrypcji RabbitMQ (znany również jako wzorzec rozprzestrzeniania)

W trybie rozprzestrzeniania w RabbitMQ wiadomości wysyłane przez producenta są przetwarzane przez wiele konsumentów w różnych kolejkach, jak pokazano na poniższym schemacie architektury.

Tryb pracy RabbitMQ

Wymiana rozprzestrzeniania może przesłać wiadomości do wszystkich powiązanych kolejek.

Uwaga: Bez względu na używany tryb pracy RabbitMQ, różnica polega na rodzaju wymiany i używanych parametrach routingu.

1. Samouczek wstępny

Proszę przeczytać poniższe rozdziały, aby zrozumieć związane zagadnienia:

2. Definiowanie wymiany rozprzestrzeniania

Zdefiniuj wymianę za pomocą funkcji exchange_declare kanału.

$channel->exchange_declare(
    'tizi365.fanout', // Nazwa wymiany, musi być unikalna i nie może się powtarzać
    'fanout', // Rodzaj wymiany
    false,
    false, // Czy ma być trwała
    false
);

Uwaga: Zarówno producenci wiadomości, jak i konsumenci wymagają wymiany.

3. Wysyłanie wiadomości

Wysyłamy wiadomości do wymiany, a wymiana przekazuje wiadomości do odpowiednich kolejek na podstawie reguł routingu.

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// Utwórz połączenie RabbitMQ
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Utwórz kanał
$channel = $connection->channel();

// Zadeklaruj wymianę
$channel->exchange_declare(
    'tizi365.fanout', // Nazwa wymiany, musi być unikalna i nie może się powtarzać
    'fanout', // Rodzaj wymiany
    false,
    false, // Czy ma być trwała
    false
);
// Obiekt wiadomości, z treścią wiadomości jako parametrem
$msg = new AMQPMessage("Cześć tizi365.com");

$channel->basic_publish(
    $msg, // Obiekt wiadomości
    'tizi365.fanout' // Nazwa wymiany
);

echo ' [x] Wysłano ', $msg->getBody(), "\n";

// Zwolnij zasoby
$channel->close();
$connection->close();

4. Odbieranie wiadomości

4.1. Definiowanie kolejki i wiązanie wymiany

Aby konsumować wiadomości z kolejki, należy najpierw zdefiniować kolejkę, a następnie związać ją z docelową wymianą. Poniżej zdefiniowano kolejkę i powiązano ją z określoną wymianą.

// Deklaracja kolejki, jeśli nazwa kolejki jest pusta, automatycznie generowany jest unikalny identyfikator, a nazwa kolejki jest zwracana
list($queue_name, ,) = $channel->queue_declare(
    "", // Nazwa kolejki, nie może być zduplikowana, jeśli jest pusta, automatycznie generowany jest unikalny identyfikator, co czyni ją anonimową kolejką
    false,
    false, // Czy należy zachować
    true,
    false
);

// Powiązanie kolejki z określoną wymianą
$channel->queue_bind(
    $queue_name, // Nazwa kolejki
    'tizi365.fanout' // Nazwa wymiany
);
<?php 
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// Utworzenie połączenia z rabbitmq
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Utworzenie kanału
$channel = $connection->channel();

// Deklaracja wymiany
$channel->exchange_declare(
    'tizi365.fanout', // Nazwa wymiany, musi być unikalna i nie może się powtarzać
    'fanout', // Typ wymiany
    false,
    false, // Czy należy zachować
    false
);

// Deklaracja kolejki
list($queue_name, ,) = $channel->queue_declare(
    "", // Nazwa kolejki, nie może się powtarzać, jeśli jest pusta, generowany jest unikalny identyfikator, wtedy jest to anonimowa kolejka
    false,
    false, // Czy należy zachować
    true,
    false
);

// Powiązanie kolejki z określoną wymianą
$channel->queue_bind(
    $queue_name, // Nazwa kolejki
    'tizi365.fanout' // Nazwa wymiany
);

echo " [*] Oczekiwanie na wiadomość. Aby zakończyć, naciśnij CTRL+C\n";

// Zdefiniowanie funkcji przetwarzania wiadomości (używając funkcji anonimowej tutaj)
$callback = function ($msg) {
    // Logika przetwarzania wiadomości
    echo ' [x] ', $msg->body, "\n";
};

// Utworzenie konsumenta
$channel->basic_consume(
    $queue_name, // Nazwa kolejki, z którą konsument ma pracować
    '', // Tag konsumenta, ignorowany, następnie generowany jest unikalny identyfikator
    false,
    true, // Czy automatycznie potwierdzać wiadomość, czyli informować rabbitmq, że wiadomość została pomyślnie przetworzona
    false,
    false,
    $callback // Funkcja przetwarzania wiadomości
);

// Jeśli kanał nie jest zamknięty, trzymaj proces zablokowany, aby zapobiec jego zakończeniu
while ($channel->is_open()) {
    $channel->wait();
}

// Zwolnienie zasobów
$channel->close();
$connection->close();