PHP RabbitMQ Routing-Modus (Direktmodus)

Im Routing-Modus von RabbitMQ wird der Typ des verwendeten Exchanges als "direct" verwendet. Der Hauptunterschied zum Publish-Subscribe-Modus besteht darin, dass der direkte Austausch Nachrichten an Queues liefert, deren Routing-Parameter vollständig übereinstimmen. Die Architektur sieht wie folgt aus:

RabbitMQ Direktmodus

Hinweis: Unabhängig vom Arbeitsmodus in RabbitMQ liegt der Unterschied im Typ des verwendeten Exchanges und den Routing-Parametern.

1. Vorbereitendes Tutorial

Bitte lesen Sie zuerst die folgenden Abschnitte, um das relevante Wissen zu verstehen:

2. Direkten Exchange definieren

// Exchange deklarieren
$channel->exchange_declare(
    'tizi365.direct', // Eindeutiger Exchange-Name
    'direct', // Exchange-Typ
    false,
    false, // Ob persistent
    false
);

Hinweis: Sowohl Nachrichtenproduzenten als auch -konsumenten benötigen einen Exchange.

3. Nachrichten senden

Wir senden Nachrichten an den Exchange, und der Exchange liefert Nachrichten basierend auf den Routing-Regeln an die entsprechende Queue.

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// Eine RabbitMQ-Verbindung erstellen
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Einen Kanal erstellen
$channel = $connection->channel();

// Exchange deklarieren
$channel->exchange_declare(
    'tizi365.direct', // Eindeutiger Exchange-Name
    'direct', // Exchange-Typ
    false,
    false, // Ob persistent
    false
);

// Nachrichtenobjekt, Nachrichteninhalt ist der Parameter
$msg = new AMQPMessage("Hallo tizi365.com");

// Achten Sie auf den dritten Parameter, den Routing-Parameter
$channel->basic_publish(
    $msg, // Nachrichtenobjekt
    'tizi365.direct', // Exchange-Name
    "tizi365" // Routing-Parameter, kann bei Bedarf beliebig definiert werden
);

echo ' [x] Gesendet ', $msg->getBody(), "\n";

// Ressourcen freigeben
$channel->close();
$connection->close();

Hinweis: Der dritte Parameter in der basic_publish-Methode ist ein Schlüsselparameter.

4. Nachrichten empfangen

4.1. Warteschlange definieren & Exchange binden

Um Warteschlangennachrichten zu verbrauchen, müssen Sie zuerst eine Warteschlange definieren und dann die Warteschlange an die Ziel-Exchange binden.

// Definiere eine anonyme Warteschlange
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

// Binde die Warteschlange an die angegebene Exchange
$channel->queue_bind(
    $queue_name, // Warteschlangenname
    'tizi365.fanout', // Exchange-Name
    "tizi365" // Bindungs-Routing-Parameter, binde hier 'tizi365'
);

Hinweis: Gemäß den Regeln der direkten Exchange, wenn der Routing-Parameter, der beim Senden einer Nachricht übermittelt wird, mit dem Routing-Parameter übereinstimmt, der beim Binden der Warteschlange an die Exchange festgelegt ist, wird die Nachricht an diese Warteschlange übermittelt.

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// Erstelle eine RabbitMQ-Verbindung
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Erstelle einen Channel
$channel = $connection->channel();

// Exchange deklarieren
$channel->exchange_declare(
    'tizi365.direct', // Exchange-Name, muss eindeutig sein und darf nicht wiederholt werden
    'direct', // Exchange-Typ
    false,
    false, // Ob es persistent ist
    false
);

// Definiere eine anonyme Warteschlange
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

// Binde die Warteschlange an eine bestimmte Exchange
$channel->queue_bind(
    $queue_name, // Warteschlangenname
    'tizi365.fanout', // Exchange-Name
    "tizi365" // Bindungs-Routing-Parameter, binde 'tizi365' hier
);

echo " [*] Warte auf Nachrichten. Drücken Sie STRG+C, um zu beenden\n";

// Definiere die Nachrichtenverarbeitungsfunktion (hier wird eine anonyme Funktion verwendet)
$callback = function ($msg) {
    // Nachrichtenverarbeitungslogik
    echo ' [x] ', $msg->body, "\n";
};

// Erstelle einen Consumer
$channel->basic_consume(
    $queue_name, // Warteschlangenname, der Name der zu verbrauchenden Warteschlange
    '', // Consumer-Tag, wird ignoriert, eine eindeutige ID wird automatisch generiert
    false,
    true, // Ob die Nachricht automatisch bestätigt werden soll, d.h. ob automatisch an RabbitMQ gemeldet werden soll, dass die Nachricht erfolgreich verarbeitet wurde.
    false,
    false,
    $callback // Nachrichtenverarbeitungsfunktion
);

// Blockiere den Prozess, bis der Channel nicht geschlossen ist, um zu verhindern, dass der Prozess beendet wird
while ($channel->is_open()) {
    $channel->wait();
}

// Ressourcen freigeben
$channel->close();
$connection->close();