1. Przed samouczkiem
Proszę najpierw przeczytać następujące sekcje, aby zrozumieć związane z nimi informacje
- Podstawowe pojęcia RabbitMQ
- Zasada wzorca tematu RabbitMQ
- Rozdział Szybki start z PHP RabbitMQ (wymagany, ponieważ kolejne rozdziały nie powielają kodu, pokazują tylko kluczowy kod)
- Rozdział Wzorzec publikowania/subskrybowania dla PHP RabbitMQ (wymagany, ponieważ kod jest prawie identyczny, tylko typ wymiany i parametry trasowania się różnią)
2. Definiuj wymianę tematów
// Deklaruj wymianę
$channel->exchange_declare(
'tizi365.topic', // Nazwa wymiany, musi być unikalna, nie może się powtarzać
'topic', // Typ wymiany
false,
false, // Czy jest trwała
false
);
Uwaga: Zarówno producenci wiadomości, jak i konsumenci muszą mieć wymiany.
3. Wysyłanie wiadomości
Wysyłamy wiadomości do wymiany, a wymiana dostarcza wiadomości do odpowiedniej kolejki na podstawie reguł trasowania.
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// Utwórz połączenie do RabbitMQ
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Utwórz kanał
$channel = $connection->channel();
// Deklaruj wymianę
$channel->exchange_declare(
'tizi365.topic', // Nazwa wymiany, musi być unikalna, nie może się powtarzać
'topic', // Typ wymiany
false,
false, // Czy jest trwała
false
);
// Obiekt wiadomości, parametrem jest treść wiadomości
$msg = new AMQPMessage("Cześć tizi365.com");
// Wyślij wiadomość
// Zwróć uwagę na trzeci parametr, parametr trasowania
$channel->basic_publish(
$msg, // Obiekt wiadomości
'tizi365.topic', // Nazwa wymiany
"www.tizi365.com" // Parametr trasy, można dowolnie definiować zgodnie z wymaganiami
);
echo ' [x] Wysłano ', $msg->getBody(), "\n";
// Zwolnij zasoby
$channel->close();
$connection->close();
4. Odbieranie wiadomości
4.1. Definiowanie kolejki i powiązanie wymiany
Aby konsumować wiadomości z kolejki, najpierw musisz zdefiniować kolejkę, a następnie powiązać ją z docelową wymianą.
// Zadeklaruj anonimową kolejkę
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// Powiąż kolejkę z określoną wymianą
$channel->queue_bind(
$queue_name, // Nazwa kolejki
'tizi365.topic', // Nazwa wymiany
"*.tizi365.com" // Parametr powiązania trasowania, tutaj używany jest znak wieloznaczny * (gwiazdka), który pasuje do pojedynczego słowa
Notatka: Wszystkie ustawione parametry trasowania używają znaku wieloznacznego * (gwiazdka), który pasuje do pojedynczego słowa. Jeśli zmieni się na # (hash), będzie pasować do wielu słów.
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
// Utwórz połączenie rabbitmq
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Utwórz kanał
$channel = $connection->channel();
// Zadeklaruj wymianę
$channel->exchange_declare(
'tizi365.topic', // Nazwa wymiany, musi być unikalna i nie może się powtarzać
'topic', // Typ wymiany
false,
false, // Czy jest trwała
false
);
// Zadeklaruj anonimową kolejkę
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// Powiąż kolejkę z określoną wymianą
$channel->queue_bind(
$queue_name, // Nazwa kolejki
'tizi365.topic', // Nazwa wymiany
"*.tizi365.com" // Klucz powiązania trasowania, tutaj używany jest znak wieloznaczny *, który pasuje do pojedynczego słowa
);
echo " [*] Oczekiwanie na wiadomość. Aby zakończyć, naciśnij CTRL+C\n";
// Zdefiniuj funkcję obsługi wiadomości (tutaj używana jest funkcja anonimowa)
$callback = function ($msg) {
// Logika przetwarzania wiadomości
echo ' [x] ', $msg->body, "\n";
};
// Utwórz konsumenta
$channel->basic_consume(
$queue_name, // Nazwa kolejki do konsumowania
'', // Tag konsumenta, jeśli zostanie pominięty, zostanie wygenerowany automatycznie unikalny identyfikator
false,
true, // Czy automatycznie potwierdzać otrzymanie wiadomości, czyli automatycznie informuje rabbitmq o pomyślnym przetworzeniu wiadomości
false,
false,
$callback // Funkcja obsługi wiadomości
);
// Jeśli kanał nie jest zamknięty, blokuj proces, aby uniknąć zakończenia procesu
while ($channel->is_open()) {
$channel->wait();
}
// Zwolnij zasoby
$channel->close();
$connection->close();
Ponieważ parametr trasowania ustawiony podczas powiązywania wymiany to *.tizi365.com, pasuje do parametru trasowania wiadomości (www.tizi365.com), więc wiadomość może być odebrana.