Patrón Publicar/Suscribir de RabbitMQ en PHP (también conocido como Patrón de Distribución)
El modo de distribución en RabbitMQ es un patrón en el que los mensajes enviados por un productor son procesados por múltiples consumidores en diferentes colas, como se muestra en el diagrama de arquitectura a continuación.
El intercambio de distribución puede enviar mensajes a todas las colas vinculadas.
Nota: Independientemente del modo de trabajo de RabbitMQ utilizado, la diferencia radica en el tipo de intercambio y los parámetros de enrutamiento utilizados.
1. Tutorial de Prerrequisitos
Por favor, lee los siguientes capítulos para entender el conocimiento relevante:
- Conceptos Básicos de RabbitMQ
- Patrón Publicar/Suscribir de RabbitMQ
- Inicio Rápido de RabbitMQ en PHP (Obligatorio, ya que los capítulos posteriores no repetirán el código, sino que mostrarán solo fragmentos clave)
2. Definir un Intercambio de Distribución
Define el intercambio a través de la función exchange_declare del canal.
$channel->exchange_declare(
'tizi365.fanout', // Nombre del intercambio, debe ser único y no puede duplicarse
'fanout', // Tipo de intercambio
false,
false, // Si persistir
false
);
Nota: Tanto los productores de mensajes como los consumidores requieren el intercambio.
3. Envío de Mensajes
Enviamos mensajes al intercambio, y el intercambio entrega los mensajes a las colas correspondientes según las reglas de enrutamiento.
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// Crear una conexión a RabbitMQ
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Crear un canal
$channel = $connection->channel();
// Declarar el intercambio
$channel->exchange_declare(
'tizi365.fanout', // Nombre del intercambio, debe ser único y no puede duplicarse
'fanout', // Tipo de intercambio
false,
false, // Si persistir
false
);
// Objeto mensaje, con el contenido del mensaje como parámetro
$msg = new AMQPMessage("Hola tizi365.com");
$channel->basic_publish(
$msg, // Objeto mensaje
'tizi365.fanout' // Nombre del intercambio
);
echo ' [x] Enviado ', $msg->getBody(), "\n";
// Liberar recursos
$channel->close();
$connection->close();
4. Recepción de Mensajes
4.1. Definición de una Cola y Vinculación del Intercambio
Para consumir mensajes de la cola, primero necesitas definir una cola y luego vincularla al intercambio de destino. A continuación, se define una cola y se vincula a un intercambio específico.
// Declara una cola, si el nombre de la cola está vacío, se genera automáticamente un ID único y se devuelve el nombre de la cola
list($queue_name, ,) = $channel->queue_declare(
"", // Nombre de la cola, no se permite duplicar, si está vacío, se genera automáticamente un ID único, convirtiéndola en una cola anónima
false,
false, // Si se debe persistir
true,
false
);
// Vincula la cola al intercambio especificado
$channel->queue_bind(
$queue_name, // Nombre de la cola
'tizi365.fanout' // Nombre del intercambio
);
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
// Crea una conexión a rabbitmq
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Crea un canal
$channel = $connection->channel();
// Declara un intercambio
$channel->exchange_declare(
'tizi365.fanout', // Nombre del intercambio, debe ser único y no puede repetirse
'fanout', // Tipo de intercambio
false,
false, // Si debe persistirse
false
);
// Declara una cola
list($queue_name, ,) = $channel->queue_declare(
"", // Nombre de la cola, no se permite repetir, si está vacío se genera un ID único, convirtiéndola en una cola anónima
false,
false, // Si debe persistirse
true,
false
);
// Vincula la cola al intercambio especificado
$channel->queue_bind(
$queue_name, // Nombre de la cola
'tizi365.fanout' // Nombre del intercambio
);
echo " [*] Esperando mensaje. Para salir, presiona CTRL+C\n";
// Define la función de procesamiento de mensajes (usando una función anónima aquí)
$callback = function ($msg) {
// Lógica de procesamiento de mensajes
echo ' [x] ', $msg->body, "\n";
};
// Crea un consumidor
$channel->basic_consume(
$queue_name, // Nombre de la cola, la cola a consumir
'', // Etiqueta del consumidor, ignora, luego genera un ID único
false,
true, // Si debe reconocer automáticamente el mensaje, es decir, indicar a rabbitmq que el mensaje se ha procesado correctamente.
false,
false,
$callback // Función de procesamiento de mensajes
);
// Si el canal no está cerrado, permanece bloqueado el proceso para evitar que se salga
while ($channel->is_open()) {
$channel->wait();
}
// Liberar recursos
$channel->close();
$connection->close();