Mode de routage RabbitMQ (mode direct)
Dans le mode de routage de RabbitMQ, le type d'échange utilisé est "direct". La principale différence par rapport au mode publication-abonnement est que l'échange direct envoie les messages aux files d'attente dont les paramètres de routage correspondent totalement. L'architecture est illustrée dans le graphique suivant :
Remarque : Peu importe le mode de travail utilisé dans RabbitMQ, la différence réside dans le type d'échange utilisé et les paramètres de routage.
1. Tutoriel préparatoire
Veuillez lire les sections suivantes en premier pour comprendre les connaissances pertinentes :
- Concepts de base de RabbitMQ
- Principes du mode de routage RabbitMQ
- Démarrage rapide pour PHP avec RabbitMQ (requis, car les sections suivantes ne répéteront pas le code, mais afficheront uniquement le code clé)
- Section du modèle publication-abonnement RabbitMQ pour PHP (requis, car la structure du code est presque la même, à l'exception du type d'échange et des paramètres de routage)
2. Définir un échange direct
// Déclarer l'échange
$channel->exchange_declare(
'tizi365.direct', // Nom d'échange unique
'direct', // Type d'échange
false,
false, // Persistant ou non
false
);
Remarque : Les producteurs et les consommateurs de messages ont besoin d'un échange.
3. Envoyer des messages
Nous envoyons des messages à l'échange, et l'échange délivre les messages à la file d'attente correspondante en fonction des règles de routage.
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// Créer une connexion RabbitMQ
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Créer un canal
$channel = $connection->channel();
// Déclarer l'échange
$channel->exchange_declare(
'tizi365.direct', // Nom d'échange unique
'direct', // Type d'échange
false,
false, // Persistant ou non
false
);
// Objet message, le contenu du message est le paramètre
$msg = new AMQPMessage("bonjour tizi365.com");
// Faites attention au troisième paramètre, le paramètre de routage
$channel->basic_publish(
$msg, // Objet message
'tizi365.direct', // Nom de l'échange
"tizi365" // Paramètre de routage, pouvant être défini arbitrairement selon les besoins
);
echo ' [x] Envoyé ', $msg->getBody(), "\n";
// Libérer les ressources
$channel->close();
$connection->close();
Remarque : Le troisième paramètre dans la méthode
basic_publish
est un paramètre clé.
4. Recevoir des messages
4.1. Définir la file d'attente et lier l'échange
Pour consommer les messages de la file d'attente, vous devez d'abord définir une file d'attente, puis lier la file d'attente à l'échange cible.
// Déclarer une file d'attente anonyme
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// Lier la file d'attente à l'échange spécifié
$channel->queue_bind(
$queue_name, // Nom de la file d'attente
'tizi365.fanout', // Nom de l'échange
"tizi365" // Paramètre de routage de liaison, lier 'tizi365' ici
);
Remarque : Selon les règles de l'échange direct, si le paramètre de routage transmis lors de l'envoi d'un message correspond au paramètre de routage défini lors de la liaison de la file d'attente à l'échange, le message sera livré à cette file d'attente.
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
// Créer une connexion rabbitmq
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Créer un canal
$channel = $connection->channel();
// Déclarer un échange
$channel->exchange_declare(
'tizi365.direct', // Nom de l'échange, doit être unique et ne peut pas être répété
'direct', // Type d'échange
false,
false, // Si c'est durable
false
);
// Déclarer une file d'attente anonyme
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// Lier la file d'attente à un échange spécifique
$channel->queue_bind(
$queue_name, // Nom de la file d'attente
'tizi365.fanout', // Nom de l'échange
"tizi365" // Paramètre de routage de liaison, lie 'tizi365' ici
);
echo " [*] En attente du message. Appuyez sur CTRL+C pour sortir\n";
// Définir la fonction de gestion des messages (utilisation d'une fonction anonyme ici)
$callback = function ($msg) {
// Logique de traitement des messages
echo ' [x] ', $msg->body, "\n";
};
// Créer un consommateur
$channel->basic_consume(
$queue_name, // Nom de la file d'attente à consommer
'', // Tag du consommateur, ignoré, un ID unique est généré automatiquement
false,
true, // Reconnaître automatiquement le message, c'est-à-dire informer automatiquement rabbitmq que le message a été traité avec succès.
false,
false,
$callback // Fonction de traitement des messages
);
// Bloquer le processus jusqu'à ce que le canal ne soit pas fermé, pour éviter que le processus ne se ferme
while ($channel->is_open()) {
$channel->wait();
}
// Libérer les ressources
$channel->close();
$connection->close();