Patrón de publicación/suscripción de RabbitMQ en Java (Modo de transmisión, Modo de difusión)
El tipo de intercambio FanoutExchange se utiliza para el patrón de publicación/suscripción en RabbitMQ, donde un mensaje enviado por un productor será procesado por múltiples colas de consumidores. La arquitectura se muestra en el siguiente diagrama:
El intercambio Fanout puede enviar mensajes a todas las colas vinculadas.
Consejo: Independientemente del modo de trabajo de RabbitMQ utilizado, la diferencia radica en el tipo de intercambio y los parámetros de enrutamiento utilizados.
1. Tutoriales previos
Por favor, lea primero las siguientes secciones para comprender el conocimiento relevante:
- Conceptos básicos de RabbitMQ
- Patrón de publicación/suscripción de RabbitMQ
- Guía de inicio rápido de RabbitMQ en Java (obligatorio, ya que las secciones posteriores no duplicarán el código, solo mostrarán fragmentos de código clave)
2. Definición del intercambio Fanout
En Spring AMQP, la clase FanoutExchange corresponde al intercambio Fanout. Definimos el intercambio a través de una clase de configuración de Spring Boot.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
// Definir el intercambio
@Bean
public FanoutExchange fanout() {
// El parámetro es el nombre del intercambio, que debe ser único
return new FanoutExchange("tizi365.fanout");
}
}
Consejo: Tanto los productores de mensajes como los consumidores requieren un intercambio.
3. Envío de mensajes
Enviamos mensajes al intercambio, que entregará los mensajes a las colas correspondientes según las reglas de enrutamiento.
package com.tizi365.rabbitmq.service;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class SendService {
@Autowired
private RabbitTemplate template;
@Autowired
private FanoutExchange fanout;
// Con fines de demostración, se utiliza una tarea programada para enviar un mensaje cada segundo
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void send() {
// Contenido del mensaje
String message = "¡Hola Mundo!";
// Enviar mensaje
// El primer parámetro es el nombre del intercambio
// El segundo parámetro es la clave de enrutamiento; el intercambio fanout ignorará la clave de enrutamiento, por lo que no es necesario establecerla
// El tercer parámetro es el contenido del mensaje, que admite cualquier tipo siempre que se pueda serializar
template.convertAndSend(fanout.getName(), "", message);
System.out.println("Mensaje enviado: '" + message + "'");
}
}
4. Recepción de mensajes
4.1 Definir colas y vincular intercambio
Para consumir mensajes de la cola, primero debes definir una cola y luego vincular la cola al intercambio de destino. A continuación, definimos dos colas y las vinculamos al mismo intercambio.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public FanoutExchange fanout() {
// Define el intercambio
// El parámetro es el nombre del intercambio, que debe ser único
return new FanoutExchange("tizi365.fanout");
}
@Bean
public Queue queue1() {
// Define la cola 1
return new Queue("tizi365.fanout.queue1");
}
@Bean
public Queue queue2() {
// Define la cola 2
return new Queue("tizi365.fanout.queue2");
}
@Bean
public Binding binding1(FanoutExchange fanout, Queue queue1) {
// Define una relación de vinculación, para vincular la cola 1 al intercambio fanout
return BindingBuilder.bind(queue1).to(fanout);
}
@Bean
public Binding binding2(FanoutExchange fanout, Queue queue2) {
// Define una relación de vinculación, para vincular la cola 2 al intercambio fanout
return BindingBuilder.bind(queue2).to(fanout);
}
}
4.2 Definir Oyentes de Cola
Define los oyentes de mensajes a través de la anotación RabbitListener para consumir mensajes de colas específicas.
package com.tizi365.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
// Hace que la clase actual sea gestionada por Spring
@Component
public class DemoListener {
// Define un oyente y especifica qué cola escuchar utilizando el parámetro queues
@RabbitListener(queues = "tizi365.fanout.queue1")
public void receive1(String msg) {
System.out.println("Mensaje recibido de la cola 1 = " + msg);
}
// Define un oyente y especifica qué cola escuchar utilizando el parámetro queues
@RabbitListener(queues = "tizi365.fanout.queue2")
public void receive2(String msg) {
System.out.println("Mensaje recibido de la cola 2 = " + msg);
}
}
Debido a que el intercambio anterior se definió como tipo fanout, cada mensaje se distribuirá a todas las colas vinculadas al intercambio actual, y los mensajes serán manejados por separado por los dos métodos anteriores.
Nota: La anotación RabbitListener se puede aplicar a una clase o método. Si la anotación RabbitListener está definida en una clase, debe combinarse con la anotación RabbitHandler para marcar qué método de clase ejecutará el manejo del mensaje.
4.3 Definir Oyentes de Cola con Anotación Completa
No necesitas la clase de configuración de spring boot para definir intercambios, colas y relaciones de enlace. Puedes definir directamente las relaciones de enlace, colas e intercambios a través del parámetro bindings de la anotación RabbitListener.
@RabbitListener(
bindings = {
@QueueBinding(
value = @Queue(name = "tizi365.fanout.queue3", durable = "true"),
exchange = @Exchange(name = "tizi365.fanout", durable = "true", type = ExchangeTypes.FANOUT)
)
}
)
public void receive3(String msg) {
System.out.println("Mensaje recibido de la cola 3 = " + msg);
}
Explicación:
- Anotación QueueBinding: Define la relación de enlace entre la cola y el intercambio. El parámetro value se utiliza para definir la cola y el intercambio se utiliza para definir el intercambio.
- Anotación Queue: Define una cola. El parámetro name define el nombre de la cola (que debe ser único) y el parámetro durable indica si debe ser duradera.
- Anotación Exchange: Define un intercambio. El parámetro name define el nombre del intercambio y el parámetro type indica el tipo de intercambio.