Modo de enrutamiento de RabbitMQ con PHP (Modo Directo)
En el modo de enrutamiento de RabbitMQ, se utiliza el tipo de intercambio "directo". La diferencia principal del modo de publicación-suscripción es que el intercambio directo entrega mensajes a colas cuyos parámetros de enrutamiento coinciden completamente. La arquitectura se muestra en el siguiente gráfico:
Nota: Independientemente del modo de trabajo utilizado en RabbitMQ, la diferencia radica en el tipo de intercambio utilizado y los parámetros de enrutamiento.
1. Tutorial Preparatorio
Por favor, lea primero las siguientes secciones para entender el conocimiento relevante:
- Conceptos Básicos de RabbitMQ
- Principios del Modo de Enrutamiento de RabbitMQ
- Inicio Rápido para PHP con RabbitMQ (requerido, ya que las secciones posteriores no repetirán el código, solo mostrarán el código clave)
- Sección de Patrón de Publicación-Suscripción de RabbitMQ para PHP (requerido, ya que la estructura del código es casi la misma, excepto por el tipo de intercambio y los parámetros de enrutamiento)
2. Definir Intercambio Directo
// Declarar el intercambio
$channel->exchange_declare(
'tizi365.direct', // Nombre único del intercambio
'direct', // Tipo de intercambio
false,
false, // Si se debe persistir
false
);
Nota: Tanto los productores como los consumidores de mensajes necesitan un intercambio.
3. Envío de Mensajes
Enviamos mensajes al intercambio, y el intercambio entrega mensajes a la cola correspondiente según las reglas de enrutamiento.
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// Crear una conexión RabbitMQ
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Crear un canal
$channel = $connection->channel();
// Declarar el intercambio
$channel->exchange_declare(
'tizi365.direct', // Nombre único del intercambio
'direct', // Tipo de intercambio
false,
false, // Si se debe persistir
false
);
// Objeto de mensaje, el contenido del mensaje es el parámetro
$msg = new AMQPMessage("hola tizi365.com");
// Prestar atención al tercer parámetro, parámetro de enrutamiento
$channel->basic_publish(
$msg, // Objeto de mensaje
'tizi365.direct', // Nombre del intercambio
"tizi365" // Parámetro de enrutamiento, se puede definir arbitrariamente según sea necesario
);
echo ' [x] Enviado ', $msg->getBody(), "\n";
// Liberar recursos
$channel->close();
$connection->close();
Nota: El tercer parámetro en el método
basic_publish
es un parámetro clave.
4. Recepción de Mensajes
4.1. Definir Cola y Vincular Intercambio
Para consumir mensajes de la cola, primero necesitas definir una cola y luego vincularla al intercambio deseado.
// Declarar una cola anónima
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// Vincular la cola al intercambio especificado
$channel->queue_bind(
$queue_name, // Nombre de la cola
'tizi365.fanout', // Nombre del intercambio
"tizi365" // Parámetro de enrutamiento de vinculación, vincular 'tizi365' aquí
);
Nota: Según las reglas del intercambio directo, si el parámetro de enrutamiento incluido al enviar un mensaje coincide con el parámetro de enrutamiento establecido al vincular la cola al intercambio, el mensaje se entregará a esta cola.
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
// Crear una conexión rabbitmq
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Crear un canal
$channel = $connection->channel();
// Declarar un intercambio
$channel->exchange_declare(
'tizi365.direct', // Nombre del intercambio, debe ser único y no puede repetirse
'direct', // Tipo de intercambio
false,
false, // Si es duradero
false
);
// Declarar una cola anónima
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// Vincular la cola a un intercambio específico
$channel->queue_bind(
$queue_name, // Nombre de la cola
'tizi365.fanout', // Nombre del intercambio
"tizi365" // Parámetro de enrutamiento de vinculación, vincula 'tizi365' aquí
);
echo " [*] Esperando mensaje. Para salir, presiona CTRL+C\n";
// Definir la función de manejo de mensajes (usando una función anónima aquí)
$callback = function ($msg) {
// Lógica de manejo del mensaje
echo ' [x] ', $msg->body, "\n";
};
// Crear un consumidor
$channel->basic_consume(
$queue_name, // Nombre de la cola, el nombre de la cola a consumir
'', // Etiqueta del consumidor, ignorada, se genera automáticamente un ID único
false,
true, // Si auto-aceptar automáticamente el mensaje, es decir, informar automáticamente a rabbitmq que el mensaje ha sido procesado con éxito
false,
false,
$callback // Función de manejo del mensaje
);
// Bloquear el proceso hasta que el canal no esté cerrado, para evitar que el proceso se cierre
while ($channel->is_open()) {
$channel->wait();
}
// Liberar recursos
$channel->close();
$connection->close();