Patrón Publicar/Suscribir de RabbitMQ en PHP (también conocido como Patrón de Distribución)

El modo de distribución en RabbitMQ es un patrón en el que los mensajes enviados por un productor son procesados por múltiples consumidores en diferentes colas, como se muestra en el diagrama de arquitectura a continuación.

Modo de trabajo de RabbitMQ

El intercambio de distribución puede enviar mensajes a todas las colas vinculadas.

Nota: Independientemente del modo de trabajo de RabbitMQ utilizado, la diferencia radica en el tipo de intercambio y los parámetros de enrutamiento utilizados.

1. Tutorial de Prerrequisitos

Por favor, lee los siguientes capítulos para entender el conocimiento relevante:

2. Definir un Intercambio de Distribución

Define el intercambio a través de la función exchange_declare del canal.

$channel->exchange_declare(
    'tizi365.fanout', // Nombre del intercambio, debe ser único y no puede duplicarse
    'fanout', // Tipo de intercambio
    false,
    false, // Si persistir
    false
);

Nota: Tanto los productores de mensajes como los consumidores requieren el intercambio.

3. Envío de Mensajes

Enviamos mensajes al intercambio, y el intercambio entrega los mensajes a las colas correspondientes según las reglas de enrutamiento.

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// Crear una conexión a RabbitMQ
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Crear un canal
$channel = $connection->channel();

// Declarar el intercambio
$channel->exchange_declare(
    'tizi365.fanout', // Nombre del intercambio, debe ser único y no puede duplicarse
    'fanout', // Tipo de intercambio
    false,
    false, // Si persistir
    false
);
// Objeto mensaje, con el contenido del mensaje como parámetro
$msg = new AMQPMessage("Hola tizi365.com");

$channel->basic_publish(
    $msg, // Objeto mensaje
    'tizi365.fanout' // Nombre del intercambio
);

echo ' [x] Enviado ', $msg->getBody(), "\n";

// Liberar recursos
$channel->close();
$connection->close();

4. Recepción de Mensajes

4.1. Definición de una Cola y Vinculación del Intercambio

Para consumir mensajes de la cola, primero necesitas definir una cola y luego vincularla al intercambio de destino. A continuación, se define una cola y se vincula a un intercambio específico.

// Declara una cola, si el nombre de la cola está vacío, se genera automáticamente un ID único y se devuelve el nombre de la cola
list($queue_name, ,) = $channel->queue_declare(
    "", // Nombre de la cola, no se permite duplicar, si está vacío, se genera automáticamente un ID único, convirtiéndola en una cola anónima
    false,
    false, // Si se debe persistir
    true,
    false
);

// Vincula la cola al intercambio especificado
$channel->queue_bind(
    $queue_name, // Nombre de la cola
    'tizi365.fanout' // Nombre del intercambio
);
<?php 
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// Crea una conexión a rabbitmq
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Crea un canal
$channel = $connection->channel();

// Declara un intercambio
$channel->exchange_declare(
    'tizi365.fanout', // Nombre del intercambio, debe ser único y no puede repetirse
    'fanout', // Tipo de intercambio
    false,
    false, // Si debe persistirse
    false
);

// Declara una cola
list($queue_name, ,) = $channel->queue_declare(
    "", // Nombre de la cola, no se permite repetir, si está vacío se genera un ID único, convirtiéndola en una cola anónima
    false,
    false, // Si debe persistirse
    true,
    false
);

// Vincula la cola al intercambio especificado
$channel->queue_bind(
    $queue_name, // Nombre de la cola
    'tizi365.fanout' // Nombre del intercambio
);

echo " [*] Esperando mensaje. Para salir, presiona CTRL+C\n";

// Define la función de procesamiento de mensajes (usando una función anónima aquí)
$callback = function ($msg) {
    // Lógica de procesamiento de mensajes
    echo ' [x] ', $msg->body, "\n";
};

// Crea un consumidor
$channel->basic_consume(
    $queue_name, // Nombre de la cola, la cola a consumir
    '', // Etiqueta del consumidor, ignora, luego genera un ID único
    false,
    true, // Si debe reconocer automáticamente el mensaje, es decir, indicar a rabbitmq que el mensaje se ha procesado correctamente.
    false,
    false,
    $callback // Función de procesamiento de mensajes
);

// Si el canal no está cerrado, permanece bloqueado el proceso para evitar que se salga
while ($channel->is_open()) {
    $channel->wait();
}

// Liberar recursos
$channel->close();
$connection->close();