PHP RabbitMQ Routing-Modus (Direktmodus)
Im Routing-Modus von RabbitMQ wird der Typ des verwendeten Exchanges als "direct" verwendet. Der Hauptunterschied zum Publish-Subscribe-Modus besteht darin, dass der direkte Austausch Nachrichten an Queues liefert, deren Routing-Parameter vollständig übereinstimmen. Die Architektur sieht wie folgt aus:
Hinweis: Unabhängig vom Arbeitsmodus in RabbitMQ liegt der Unterschied im Typ des verwendeten Exchanges und den Routing-Parametern.
1. Vorbereitendes Tutorial
Bitte lesen Sie zuerst die folgenden Abschnitte, um das relevante Wissen zu verstehen:
- Grundkonzepte von RabbitMQ
- Grundlagen des Routing-Modus von RabbitMQ
- Schnellstart für PHP mit RabbitMQ (erforderlich, da die nachfolgenden Abschnitte den Code nicht wiederholen, sondern nur den Schlüsselcode anzeigen)
- PHP RabbitMQ Publish-Subscribe Musterabschnitt (erforderlich, da die Code-Struktur fast identisch ist, abgesehen vom Typ des Exchanges und den Routing-Parametern)
2. Direkten Exchange definieren
// Exchange deklarieren
$channel->exchange_declare(
'tizi365.direct', // Eindeutiger Exchange-Name
'direct', // Exchange-Typ
false,
false, // Ob persistent
false
);
Hinweis: Sowohl Nachrichtenproduzenten als auch -konsumenten benötigen einen Exchange.
3. Nachrichten senden
Wir senden Nachrichten an den Exchange, und der Exchange liefert Nachrichten basierend auf den Routing-Regeln an die entsprechende Queue.
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// Eine RabbitMQ-Verbindung erstellen
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Einen Kanal erstellen
$channel = $connection->channel();
// Exchange deklarieren
$channel->exchange_declare(
'tizi365.direct', // Eindeutiger Exchange-Name
'direct', // Exchange-Typ
false,
false, // Ob persistent
false
);
// Nachrichtenobjekt, Nachrichteninhalt ist der Parameter
$msg = new AMQPMessage("Hallo tizi365.com");
// Achten Sie auf den dritten Parameter, den Routing-Parameter
$channel->basic_publish(
$msg, // Nachrichtenobjekt
'tizi365.direct', // Exchange-Name
"tizi365" // Routing-Parameter, kann bei Bedarf beliebig definiert werden
);
echo ' [x] Gesendet ', $msg->getBody(), "\n";
// Ressourcen freigeben
$channel->close();
$connection->close();
Hinweis: Der dritte Parameter in der
basic_publish
-Methode ist ein Schlüsselparameter.
4. Nachrichten empfangen
4.1. Warteschlange definieren & Exchange binden
Um Warteschlangennachrichten zu verbrauchen, müssen Sie zuerst eine Warteschlange definieren und dann die Warteschlange an die Ziel-Exchange binden.
// Definiere eine anonyme Warteschlange
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// Binde die Warteschlange an die angegebene Exchange
$channel->queue_bind(
$queue_name, // Warteschlangenname
'tizi365.fanout', // Exchange-Name
"tizi365" // Bindungs-Routing-Parameter, binde hier 'tizi365'
);
Hinweis: Gemäß den Regeln der direkten Exchange, wenn der Routing-Parameter, der beim Senden einer Nachricht übermittelt wird, mit dem Routing-Parameter übereinstimmt, der beim Binden der Warteschlange an die Exchange festgelegt ist, wird die Nachricht an diese Warteschlange übermittelt.
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
// Erstelle eine RabbitMQ-Verbindung
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Erstelle einen Channel
$channel = $connection->channel();
// Exchange deklarieren
$channel->exchange_declare(
'tizi365.direct', // Exchange-Name, muss eindeutig sein und darf nicht wiederholt werden
'direct', // Exchange-Typ
false,
false, // Ob es persistent ist
false
);
// Definiere eine anonyme Warteschlange
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// Binde die Warteschlange an eine bestimmte Exchange
$channel->queue_bind(
$queue_name, // Warteschlangenname
'tizi365.fanout', // Exchange-Name
"tizi365" // Bindungs-Routing-Parameter, binde 'tizi365' hier
);
echo " [*] Warte auf Nachrichten. Drücken Sie STRG+C, um zu beenden\n";
// Definiere die Nachrichtenverarbeitungsfunktion (hier wird eine anonyme Funktion verwendet)
$callback = function ($msg) {
// Nachrichtenverarbeitungslogik
echo ' [x] ', $msg->body, "\n";
};
// Erstelle einen Consumer
$channel->basic_consume(
$queue_name, // Warteschlangenname, der Name der zu verbrauchenden Warteschlange
'', // Consumer-Tag, wird ignoriert, eine eindeutige ID wird automatisch generiert
false,
true, // Ob die Nachricht automatisch bestätigt werden soll, d.h. ob automatisch an RabbitMQ gemeldet werden soll, dass die Nachricht erfolgreich verarbeitet wurde.
false,
false,
$callback // Nachrichtenverarbeitungsfunktion
);
// Blockiere den Prozess, bis der Channel nicht geschlossen ist, um zu verhindern, dass der Prozess beendet wird
while ($channel->is_open()) {
$channel->wait();
}
// Ressourcen freigeben
$channel->close();
$connection->close();