Padrão de Publicação/Assinatura RabbitMQ em Java (Modo de Transmissão, Modo de Transmissão Multi)
O tipo de troca FanoutExchange é usado para o padrão de publicação/assinatura no RabbitMQ, onde uma mensagem enviada por um produtor será processada por múltiplas filas de consumidores. A arquitetura é mostrada no seguinte diagrama:
A troca Fanout pode encaminhar mensagens para todas as filas vinculadas.
Dica: Independentemente do modo de trabalho do RabbitMQ usado, a diferença está no tipo de troca e nos parâmetros de roteamento utilizados.
1. Tutoriais Prévios
Por favor, leia as seções a seguir primeiro para entender o conhecimento relevante:
- Conceitos Básicos do RabbitMQ
- Padrão de Publicação/Assinatura RabbitMQ
- Guia de Início Rápido do RabbitMQ para Java (obrigatório, pois as seções subsequentes não duplicarão o código, apenas mostrarão trechos de código-chave)
2. Definindo a Troca Fanout
No Spring AMQP, a classe FanoutExchange corresponde à troca Fanout. Definimos a troca por meio de uma classe de configuração do Spring Boot.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
// Define a troca
@Bean
public FanoutExchange fanout() {
// O parâmetro é o nome da troca, que deve ser único
return new FanoutExchange("tizi365.fanout");
}
}
Dica: Tanto os produtores quanto os consumidores de mensagens requerem uma troca.
3. Enviando Mensagens
Enviamos mensagens para a troca, que entregará as mensagens para as filas correspondentes com base nas regras de roteamento.
package com.tizi365.rabbitmq.service;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class SendService {
@Autowired
private RabbitTemplate template;
@Autowired
private FanoutExchange fanout;
// Para fins de demonstração, é usado uma tarefa agendada para enviar uma mensagem a cada segundo
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void send() {
// Conteúdo da mensagem
String mensagem = "Olá Mundo!";
// Enviar mensagem
// O primeiro parâmetro é o nome da troca
// O segundo parâmetro é a chave de roteamento; a troca fanout irá ignorar a chave de roteamento, portanto, não precisa ser definida
// O terceiro parâmetro é o conteúdo da mensagem, que suporta qualquer tipo desde que possa ser serializado
template.convertAndSend(fanout.getName(), "", mensagem);
System.out.println("Mensagem enviada: '" + mensagem + "'");
}
}
4. Recebendo Mensagens
4.1 Definir Filas & Vincular Troca
Para consumir mensagens da fila, é necessário primeiro definir uma fila e, em seguida, vincular a fila à troca de destino. Abaixo, definimos duas filas e as vinculamos à mesma troca.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public FanoutExchange fanout() {
// Definir a troca
// O parâmetro é o nome da troca, que deve ser único
return new FanoutExchange("tizi365.fanout");
}
@Bean
public Queue queue1() {
// Definir fila 1
return new Queue("tizi365.fanout.queue1");
}
@Bean
public Queue queue2() {
// Definir fila 2
return new Queue("tizi365.fanout.queue2");
}
@Bean
public Binding binding1(FanoutExchange fanout, Queue queue1) {
// Definir um relacionamento de vinculação, para vincular a fila 1 à troca fanout
return BindingBuilder.bind(queue1).to(fanout);
}
@Bean
public Binding binding2(FanoutExchange fanout, Queue queue2) {
// Definir um relacionamento de vinculação, para vincular a fila 2 à troca fanout
return BindingBuilder.bind(queue2).to(fanout);
}
}
4.2 Definir Ouvintes de Fila
Defina os ouvintes de mensagem através da anotação RabbitListener para consumir mensagens de filas específicas.
package com.tizi365.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
// Faça com que a classe atual seja gerenciada pelo Spring
@Component
public class DemoListener {
// Defina um ouvinte e especifique a fila a ser ouvida usando o parâmetro queues
@RabbitListener(queues = "tizi365.fanout.queue1")
public void receive1(String msg) {
System.out.println("Mensagem recebida da fila 1 = " + msg);
}
// Defina um ouvinte e especifique a fila a ser ouvida usando o parâmetro queues
@RabbitListener(queues = "tizi365.fanout.queue2")
public void receive2(String msg) {
System.out.println("Mensagem recebida da fila 2 = " + msg);
}
}
Como o exchange anterior foi definido como tipo fanout, cada mensagem será distribuída para todas as filas vinculadas ao exchange atual, e as mensagens serão tratadas separadamente pelos métodos acima.
Observação: A anotação RabbitListener pode ser aplicada a uma classe ou método. Se a anotação RabbitListener for definida em uma classe, ela precisa ser combinada com a anotação RabbitHandler para marcar qual método da classe executará o tratamento da mensagem.
4.3 Definir Ouvintes de Fila com Anotação Completa
Você não precisa de uma classe de configuração spring boot para definir trocas, filas e relacionamentos de ligação. Você pode definir diretamente os relacionamentos de ligação, filas e trocas através do parâmetro bindings da anotação RabbitListener.
@RabbitListener(
bindings = {
@QueueBinding(
value = @Queue(name = "tizi365.fanout.queue3", durable = "true"),
exchange = @Exchange(name = "tizi365.fanout", durable = "true", type = ExchangeTypes.FANOUT)
)
}
)
public void receive3(String msg) {
System.out.println("Mensagem recebida da fila 3 = " + msg);
}
Explicação:
- Anotação QueueBinding: Define o relacionamento de ligação entre a fila e o exchange. O parâmetro value é usado para definir a fila e o exchange é usado para definir o exchange.
- Anotação Queue: Define uma fila. O parâmetro name define o nome da fila (que precisa ser único) e o parâmetro durable indica se ela precisa ser durável.
- Anotação Exchange: Define um exchange. O parâmetro name define o nome do exchange e o parâmetro type indica o tipo de exchange.