Mode de routage RabbitMQ (mode direct)

Dans le mode de routage de RabbitMQ, le type d'échange utilisé est "direct". La principale différence par rapport au mode publication-abonnement est que l'échange direct envoie les messages aux files d'attente dont les paramètres de routage correspondent totalement. L'architecture est illustrée dans le graphique suivant :

Mode direct RabbitMQ

Remarque : Peu importe le mode de travail utilisé dans RabbitMQ, la différence réside dans le type d'échange utilisé et les paramètres de routage.

1. Tutoriel préparatoire

Veuillez lire les sections suivantes en premier pour comprendre les connaissances pertinentes :

2. Définir un échange direct

// Déclarer l'échange
$channel->exchange_declare(
    'tizi365.direct', // Nom d'échange unique
    'direct', // Type d'échange
    false,
    false, // Persistant ou non
    false
);

Remarque : Les producteurs et les consommateurs de messages ont besoin d'un échange.

3. Envoyer des messages

Nous envoyons des messages à l'échange, et l'échange délivre les messages à la file d'attente correspondante en fonction des règles de routage.

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// Créer une connexion RabbitMQ
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Créer un canal
$channel = $connection->channel();

// Déclarer l'échange
$channel->exchange_declare(
    'tizi365.direct', // Nom d'échange unique
    'direct', // Type d'échange
    false,
    false, // Persistant ou non
    false
);

// Objet message, le contenu du message est le paramètre
$msg = new AMQPMessage("bonjour tizi365.com");

// Faites attention au troisième paramètre, le paramètre de routage
$channel->basic_publish(
    $msg, // Objet message
    'tizi365.direct', // Nom de l'échange
    "tizi365" // Paramètre de routage, pouvant être défini arbitrairement selon les besoins
);

echo ' [x] Envoyé ', $msg->getBody(), "\n";

// Libérer les ressources
$channel->close();
$connection->close();

Remarque : Le troisième paramètre dans la méthode basic_publish est un paramètre clé.

4. Recevoir des messages

4.1. Définir la file d'attente et lier l'échange

Pour consommer les messages de la file d'attente, vous devez d'abord définir une file d'attente, puis lier la file d'attente à l'échange cible.

// Déclarer une file d'attente anonyme
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

// Lier la file d'attente à l'échange spécifié
$channel->queue_bind(
    $queue_name, // Nom de la file d'attente
    'tizi365.fanout', // Nom de l'échange
    "tizi365" // Paramètre de routage de liaison, lier 'tizi365' ici
);

Remarque : Selon les règles de l'échange direct, si le paramètre de routage transmis lors de l'envoi d'un message correspond au paramètre de routage défini lors de la liaison de la file d'attente à l'échange, le message sera livré à cette file d'attente.

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// Créer une connexion rabbitmq
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Créer un canal
$channel = $connection->channel();

// Déclarer un échange
$channel->exchange_declare(
    'tizi365.direct', // Nom de l'échange, doit être unique et ne peut pas être répété
    'direct', // Type d'échange
    false,
    false, // Si c'est durable
    false
);

// Déclarer une file d'attente anonyme
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

// Lier la file d'attente à un échange spécifique
$channel->queue_bind(
    $queue_name, // Nom de la file d'attente
    'tizi365.fanout', // Nom de l'échange
    "tizi365" // Paramètre de routage de liaison, lie 'tizi365' ici
);

echo " [*] En attente du message. Appuyez sur CTRL+C pour sortir\n";

// Définir la fonction de gestion des messages (utilisation d'une fonction anonyme ici)
$callback = function ($msg) {
    // Logique de traitement des messages
    echo ' [x] ', $msg->body, "\n";
};

// Créer un consommateur
$channel->basic_consume(
    $queue_name, // Nom de la file d'attente à consommer
    '', // Tag du consommateur, ignoré, un ID unique est généré automatiquement
    false,
    true, // Reconnaître automatiquement le message, c'est-à-dire informer automatiquement rabbitmq que le message a été traité avec succès.
    false,
    false,
    $callback // Fonction de traitement des messages
);

// Bloquer le processus jusqu'à ce que le canal ne soit pas fermé, pour éviter que le processus ne se ferme
while ($channel->is_open()) {
    $channel->wait();
}

// Libérer les ressources
$channel->close();
$connection->close();