PHP RabbitMQ-Publish/Subscribe-Muster (auch bekannt als Fanout-Muster)
Der Fanout-Modus in RabbitMQ ist ein Muster, bei dem von einem Produzenten gesendete Nachrichten von mehreren Verbrauchern in verschiedenen Warteschlangen verarbeitet werden, wie in der folgenden Architekturdiagramm dargestellt.
Die Fanout-Börse kann Nachrichten an alle gebundenen Warteschlangen weiterleiten.
Hinweis: Unabhängig vom verwendeten RabbitMQ-Arbeitsmodus liegt der Unterschied in der Art des Austauschs und der verwendeten Routing-Parameter.
1. Voraussetzung Tutorial
Bitte lesen Sie die folgenden Kapitel, um das relevante Wissen zu verstehen:
- RabbitMQ Basics
- RabbitMQ Publish/Subscribe-Muster
- RabbitMQ PHP Schnellstart (Obligatorisch, da spätere Kapitel den Code nicht wiederholen, sondern nur wichtige Codeausschnitte zeigen)
2. Definition eines Fanout-Austauschs
Definieren Sie den Austausch über die Funktion exchange_declare des Kanals.
$channel->exchange_declare(
'tizi365.fanout', // Austauschname, muss eindeutig und darf nicht dupliziert werden.
'fanout', // Austauschtyp
false,
false, // Ob zu behalten
false
);
Hinweis: Sowohl Nachrichtenproduzenten als auch Verbraucher benötigen den Austausch.
3. Senden von Nachrichten
Wir senden Nachrichten an den Austausch, und der Austausch liefert die Nachrichten basierend auf den Routingregeln an die entsprechenden Warteschlangen.
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// Erstellen einer RabbitMQ-Verbindung
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Kanal erstellen
$channel = $connection->channel();
// Austausch deklarieren
$channel->exchange_declare(
'tizi365.fanout', // Austauschname, muss eindeutig und darf nicht dupliziert werden.
'fanout', // Austauschtyp
false,
false, // Ob zu behalten
false
);
// Nachrichtenobjekt mit dem Nachrichteninhalt als Parameter
$msg = new AMQPMessage("Hallo tizi365.com");
$channel->basic_publish(
$msg, // Nachrichtenobjekt
'tizi365.fanout' // Austauschname
);
echo ' [x] Gesendet ', $msg->getBody(), "\n";
// Ressourcen freigeben
$channel->close();
$connection->close();
4. Empfangen von Nachrichten
4.1. Definieren einer Warteschlange & Binden der Exchange
Um Warteschlangennachrichten zu verbrauchen, müssen Sie zuerst eine Warteschlange definieren und sie dann an die Ziel-Exchange binden. Nachfolgend wird eine Warteschlange definiert und an eine spezifische Exchange gebunden.
// Definiere eine Warteschlange. Wenn der Warteschlangenname leer ist, wird automatisch eine eindeutige ID generiert, und der Warteschlangenname wird zurückgegeben
list($queue_name, ,) = $channel->queue_declare(
"", // Warteschlangenname, darf nicht dupliziert werden. Wenn leer, wird automatisch eine eindeutige ID generiert, und es entsteht eine anonyme Warteschlange
false,
false, // Ob persistieren
true,
false
);
// Binde die Warteschlange an die angegebene Exchange
$channel->queue_bind(
$queue_name, // Warteschlangenname
'tizi365.fanout' // Exchange-Name
);
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
// Erstelle eine RabbitMQ-Verbindung
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Erstelle einen Kanal
$channel = $connection->channel();
// Erkläre eine Exchange
$channel->exchange_declare(
'tizi365.fanout', // Exchange-Name, muss eindeutig sein und darf nicht wiederholt werden
'fanout', // Exchange-Typ
false,
false, // Ob persistieren
false
);
// Erkläre eine Warteschlange
list($queue_name, ,) = $channel->queue_declare(
"", // Warteschlangenname, darf nicht wiederholt werden. Wenn leer, dann generiere eine eindeutige ID, dann ist es eine anonyme Warteschlange
false,
false, // Ob persistieren
true,
false
);
// Binde die Warteschlange an die angegebene Exchange
$channel->queue_bind(
$queue_name, // Warteschlangenname
'tizi365.fanout' // Exchange-Name
);
echo " [*] Warte auf Nachrichten. Um zu beenden, drücke STRG+C\n";
// Definiere die Funktion zur Nachrichtenverarbeitung (hier wird eine anonyme Funktion verwendet)
$callback = function ($msg) {
// Logik zur Nachrichtenverarbeitung
echo ' [x] ', $msg->body, "\n";
};
// Erstelle einen Konsumenten
$channel->basic_consume(
$queue_name, // Warteschlangenname, die zu verbrauchende Warteschlange
'', // Konsumenten-Tag, ignorieren, dann eine eindeutige ID generieren
false,
true, // Ob die Nachricht automatisch bestätigt werden soll, d.h. RabbitMQ mitteilen, dass die Nachricht erfolgreich verarbeitet wurde
false,
false,
$callback // Funktion zur Nachrichtenverarbeitung
);
// Wenn der Kanal nicht geschlossen ist, blockiere den Prozess, um ein Beenden zu verhindern
while ($channel->is_open()) {
$channel->wait();
}
// Ressourcen freigeben
$channel->close();
$connection->close();