Padrão de Publicação/Assinatura do RabbitMQ em PHP (também conhecido como Padrão de Fanout)
O modo Fanout no RabbitMQ é um padrão no qual mensagens enviadas por um produtor são processadas por vários consumidores em filas diferentes, como mostrado no diagrama de arquitetura abaixo.
A exchange de Fanout pode encaminhar mensagens para todas as filas vinculadas.
Nota: Independentemente do modo de trabalho do RabbitMQ usado, a diferença está no tipo de exchange e nos parâmetros de roteamento usados.
1. Tutorial Prévio
Por favor, leia os seguintes capítulos para entender o conhecimento relevante:
- Conceitos Básicos do RabbitMQ
- Padrão de Publicação/Assinatura do RabbitMQ
- Início Rápido do RabbitMQ em PHP (Obrigatório, pois os capítulos subsequentes não repetirão o código, apenas mostrarão trechos de código-chave)
2. Definindo uma Exchange de Fanout
Defina a exchange através da função exchange_declare do canal.
$channel->exchange_declare(
'tizi365.fanout', // Nome da exchange, deve ser único e não pode ser duplicado
'fanout', // Tipo de exchange
false,
false, // Se deve persistir
false
);
Nota: Produtores e consumidores de mensagens requerem a exchange.
3. Enviando Mensagens
Enviamos mensagens para a exchange, e a exchange entrega as mensagens para as filas correspondentes com base nas regras de roteamento.
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// Criar uma conexão RabbitMQ
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Criar um canal
$channel = $connection->channel();
// Declarar a exchange
$channel->exchange_declare(
'tizi365.fanout', // Nome da exchange, deve ser único e não pode ser duplicado
'fanout', // Tipo de exchange
false,
false, // Se deve persistir
false
);
// Objeto de mensagem, com o conteúdo da mensagem como parâmetro
$msg = new AMQPMessage("Olá tizi365.com");
$channel->basic_publish(
$msg, // Objeto de mensagem
'tizi365.fanout' // Nome da exchange
);
echo ' [x] Enviado ', $msg->getBody(), "\n";
// Liberar recursos
$channel->close();
$connection->close();
4. Recebendo Mensagens
4.1. Definindo uma Fila e Vinculando a Troca
Para consumir mensagens da fila, você precisa primeiro definir uma fila e, em seguida, vincular a fila à troca de destino. Abaixo, uma fila é definida e vinculada a uma troca específica.
// Declara uma fila, se o nome da fila estiver vazio, um ID único é gerado automaticamente, e o nome da fila é retornado
list($queue_name, ,) = $channel->queue_declare(
"", // Nome da fila, não pode ser duplicado, se estiver vazio, um ID único é gerado automaticamente, tornando-o uma fila anônima
false,
false, // Se deve persistir
true,
false
);
// Vincula a fila à troca especificada
$channel->queue_bind(
$queue_name, // Nome da fila
'tizi365.fanout' // Nome da troca
);
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
// Cria uma conexão rabbitmq
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Cria um Canal
$channel = $connection->channel();
// Declara uma troca
$channel->exchange_declare(
'tizi365.fanout', // Nome da troca, precisa ser único e não pode se repetir
'fanout', // Tipo da troca
false,
false, // Se deve persistir
false
);
// Declara uma fila
list($queue_name, ,) = $channel->queue_declare(
"", // Nome da fila, não pode ser repetido, se vazio gera um ID único, então é uma fila anônima
false,
false, // Se deve persistir
true,
false
);
// Vincula a fila à troca especificada
$channel->queue_bind(
$queue_name, // Nome da fila
'tizi365.fanout' // Nome da troca
);
echo " [*] Aguardando mensagem. Pressione CTRL+C para sair\n";
// Define a função de processamento de mensagens (usando uma função anônima aqui)
$callback = function ($msg) {
// Lógica de processamento de mensagens
echo ' [x] ', $msg->body, "\n";
};
// Cria um consumidor
$channel->basic_consume(
$queue_name, // Nome da fila, a fila a ser consumida
'', // Tag do consumidor, ignorada, então gera um ID único
false,
true, // Se deve reconhecer automaticamente a mensagem, ou seja, dizer ao rabbitmq que a mensagem foi processada com sucesso.
false,
false,
$callback // Função de processamento de mensagens
);
// Se o canal não estiver fechado, continue bloqueando o processo para evitar que ele saia
while ($channel->is_open()) {
$channel->wait();
}
// Libera os recursos
$channel->close();
$connection->close();