El patrón de tema de RabbitMQ en Java, utilizando el tipo de intercambio TopicExchange, difiere del patrón de enrutamiento (Direct) en que los parámetros de enrutamiento admiten coincidencias aproximadas. Debido a que la coincidencia de enrutamiento es más flexible, es un patrón más comúnmente utilizado. La arquitectura se muestra en el siguiente diagrama:
Nota: Independientemente del patrón de funcionamiento de RabbitMQ que se utilice, la diferencia radica en el tipo de intercambio utilizado y los parámetros de enrutamiento.
1. Tutorial previo
Por favor, lea primero las siguientes secciones para comprender el conocimiento relevante:
- Conceptos básicos de RabbitMQ
- Principios del patrón de tema de RabbitMQ
- Inicio rápido para Java con RabbitMQ (lectura obligatoria, ya que las secciones posteriores no repetirán el código, solo mostrarán el código clave)
- Sección de patrón de publicación-suscripción para Java con RabbitMQ (lectura obligatoria, ya que la sintaxis del código es casi la misma, solo difieren el tipo de intercambio y los parámetros de enrutamiento)
2. Definición de intercambio de temas
En Spring AMQP, la clase correspondiente al intercambio directo es TopicExchange. Definimos el intercambio a través de una clase de configuración de Spring Boot.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public TopicExchange topic() {
// Define the exchange
// El parámetro es el nombre del intercambio, que debe ser único
return new TopicExchange("tizi365.topic");
}
}
Nota: Tanto los productores como los consumidores de mensajes necesitan el intercambio.
3. Envío de mensajes
Enviamos mensajes al intercambio, y el intercambio entrega los mensajes a las colas correspondientes según las reglas de enrutamiento.
package com.tizi365.rabbitmq.service;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class SendService {
@Autowired
private RabbitTemplate template;
@Autowired
private TopicExchange topic;
// Con fines de demostración, se utiliza una tarea programada aquí para enviar un mensaje cada segundo
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void send() {
// Contenido del mensaje
String message = "¡Hola mundo!";
// Enviar mensaje
// El primer parámetro es el nombre del intercambio
// El segundo parámetro es la clave de enrutamiento. El intercambio de temas entrega mensajes a las colas que coinciden con la clave de enrutamiento.
// El tercer parámetro es el contenido del mensaje, que admite cualquier tipo siempre que pueda serializarse
template.convertAndSend(topic.getName(), "www.tizi365.com", message);
System.out.println("Mensaje enviado: '" + mensaje + "'");
}
}
Nota: Presta atención al segundo parámetro "www.tizi365.com" en el método convertAndSend, ya que es un parámetro crucial.
4. Recepción de mensajes
4.1 Definir Colas y Vincular Intercambios
Para consumir mensajes de las colas, primero necesitas definir una cola y luego vincular la cola al intercambio de destino. A continuación, se definen dos colas que se vinculan al mismo intercambio.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public TopicExchange topic() {
// Define el intercambio
// El parámetro es el nombre del intercambio, que debe ser único
return new TopicExchange("tizi365.topic");
}
@Bean
public Queue queue1() {
// Define la cola 1
return new Queue("tizi365.topic.queue1");
}
@Bean
public Queue queue2() {
// Define la cola 2
return new Queue("tizi365.topic.queue2");
}
@Bean
public Binding binding1(TopicExchange topic, Queue queue1) {
// Define una relación de vinculación, vinculando la cola 1 al intercambio de temas con una clave de enrutamiento de: *.tizi365.com
return BindingBuilder.bind(queue1).to(topic).with("*.tizi365.com");
}
@Bean
public Binding binding2(TopicExchange topic, Queue queue2) {
// Define una relación de vinculación, vinculando la cola 2 al intercambio de temas con una clave de enrutamiento de: *.baidu.com
return BindingBuilder.bind(queue2).to(topic).with("*.baidu.com");
}
}
Consejo: Al vincular la cola 1 y la cola 2 al intercambio, las claves de enrutamiento establecidas utilizan el comodín * (estrella), que coincide con una sola palabra. Si se cambia a # (almohadilla), coincidiría con varias palabras.
4.2 Definir Escuchas de Cola
Define los escuchas de mensajes utilizando la anotación RabbitListener para consumir mensajes de colas específicas.
package com.tizi365.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
// Permite a Spring gestionar la clase actual
@Component
public class DemoListener {
// Define un escucha, especificando a qué cola escuchar a través del parámetro queues
@RabbitListener(queues = "tizi365.topic.queue1")
public void receive1(String msg) {
System.out.println("Mensaje recibido de la cola 1: " + msg);
}
// Define un escucha, especificando a qué cola escuchar a través del parámetro queues
@RabbitListener(queues = "tizi365.topic.queue2")
public void receive2(String msg) {
System.out.println("Mensaje recibido de la cola 2: " + msg);
}
}
Dado que solo la cola 1 tiene una clave de enrutamiento de *.tizi365.com al vincularse al intercambio, coincidirá con los mensajes con la clave de enrutamiento (www.tizi365.com). Por lo tanto, solo la cola 1 podrá recibir mensajes, mientras que la cola 2 no recibirá ninguno debido a la clave de enrutamiento no coincidente.