PHP RabbitMQ-Publish/Subscribe-Muster (auch bekannt als Fanout-Muster)

Der Fanout-Modus in RabbitMQ ist ein Muster, bei dem von einem Produzenten gesendete Nachrichten von mehreren Verbrauchern in verschiedenen Warteschlangen verarbeitet werden, wie in der folgenden Architekturdiagramm dargestellt.

RabbitMQ Arbeitsmodus

Die Fanout-Börse kann Nachrichten an alle gebundenen Warteschlangen weiterleiten.

Hinweis: Unabhängig vom verwendeten RabbitMQ-Arbeitsmodus liegt der Unterschied in der Art des Austauschs und der verwendeten Routing-Parameter.

1. Voraussetzung Tutorial

Bitte lesen Sie die folgenden Kapitel, um das relevante Wissen zu verstehen:

2. Definition eines Fanout-Austauschs

Definieren Sie den Austausch über die Funktion exchange_declare des Kanals.

$channel->exchange_declare(
    'tizi365.fanout', // Austauschname, muss eindeutig und darf nicht dupliziert werden.
    'fanout', // Austauschtyp
    false,
    false, // Ob zu behalten
    false
);

Hinweis: Sowohl Nachrichtenproduzenten als auch Verbraucher benötigen den Austausch.

3. Senden von Nachrichten

Wir senden Nachrichten an den Austausch, und der Austausch liefert die Nachrichten basierend auf den Routingregeln an die entsprechenden Warteschlangen.

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// Erstellen einer RabbitMQ-Verbindung
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Kanal erstellen
$channel = $connection->channel();

// Austausch deklarieren
$channel->exchange_declare(
    'tizi365.fanout', // Austauschname, muss eindeutig und darf nicht dupliziert werden.
    'fanout', // Austauschtyp
    false,
    false, // Ob zu behalten
    false
);
// Nachrichtenobjekt mit dem Nachrichteninhalt als Parameter
$msg = new AMQPMessage("Hallo tizi365.com");

$channel->basic_publish(
    $msg, // Nachrichtenobjekt
    'tizi365.fanout' // Austauschname
);

echo ' [x] Gesendet ', $msg->getBody(), "\n";

// Ressourcen freigeben
$channel->close();
$connection->close();

4. Empfangen von Nachrichten

4.1. Definieren einer Warteschlange & Binden der Exchange

Um Warteschlangennachrichten zu verbrauchen, müssen Sie zuerst eine Warteschlange definieren und sie dann an die Ziel-Exchange binden. Nachfolgend wird eine Warteschlange definiert und an eine spezifische Exchange gebunden.

// Definiere eine Warteschlange. Wenn der Warteschlangenname leer ist, wird automatisch eine eindeutige ID generiert, und der Warteschlangenname wird zurückgegeben
list($queue_name, ,) = $channel->queue_declare(
    "", // Warteschlangenname, darf nicht dupliziert werden. Wenn leer, wird automatisch eine eindeutige ID generiert, und es entsteht eine anonyme Warteschlange
    false,
    false, // Ob persistieren
    true,
    false
);

// Binde die Warteschlange an die angegebene Exchange
$channel->queue_bind(
    $queue_name, // Warteschlangenname
    'tizi365.fanout' // Exchange-Name
);
<?php 
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// Erstelle eine RabbitMQ-Verbindung
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Erstelle einen Kanal
$channel = $connection->channel();

// Erkläre eine Exchange
$channel->exchange_declare(
    'tizi365.fanout', // Exchange-Name, muss eindeutig sein und darf nicht wiederholt werden
    'fanout', // Exchange-Typ
    false,
    false, // Ob persistieren
    false
);

// Erkläre eine Warteschlange
list($queue_name, ,) = $channel->queue_declare(
    "", // Warteschlangenname, darf nicht wiederholt werden. Wenn leer, dann generiere eine eindeutige ID, dann ist es eine anonyme Warteschlange
    false,
    false, // Ob persistieren
    true,
    false
);

// Binde die Warteschlange an die angegebene Exchange
$channel->queue_bind(
    $queue_name, // Warteschlangenname
    'tizi365.fanout' // Exchange-Name
);

echo " [*] Warte auf Nachrichten. Um zu beenden, drücke STRG+C\n";

// Definiere die Funktion zur Nachrichtenverarbeitung (hier wird eine anonyme Funktion verwendet)
$callback = function ($msg) {
    // Logik zur Nachrichtenverarbeitung
    echo ' [x] ', $msg->body, "\n";
};

// Erstelle einen Konsumenten
$channel->basic_consume(
    $queue_name, // Warteschlangenname, die zu verbrauchende Warteschlange
    '', // Konsumenten-Tag, ignorieren, dann eine eindeutige ID generieren
    false,
    true, // Ob die Nachricht automatisch bestätigt werden soll, d.h. RabbitMQ mitteilen, dass die Nachricht erfolgreich verarbeitet wurde
    false,
    false,
    $callback // Funktion zur Nachrichtenverarbeitung
);

// Wenn der Kanal nicht geschlossen ist, blockiere den Prozess, um ein Beenden zu verhindern
while ($channel->is_open()) {
    $channel->wait();
}

// Ressourcen freigeben
$channel->close();
$connection->close();