El patrón de enrutamiento de RabbitMQ (modo Direct) en Java utiliza el tipo de intercambio DirectExchange. La diferencia con el patrón de publicación-suscripción es que el intercambio directo entrega mensajes a colas con parámetros de enrutamiento que coinciden completamente. La arquitectura se muestra en la imagen siguiente:
Consejo: Independientemente del modo de funcionamiento de RabbitMQ utilizado, la diferencia radica en el tipo de intercambio utilizado y los parámetros de enrutamiento.
1. Tutoriales previos
Por favor, lee las siguientes secciones para comprender el conocimiento relacionado:
- Conceptos básicos de RabbitMQ
- Principio del modo de enrutamiento de RabbitMQ
- Inicio rápido para Java con RabbitMQ (Lectura obligatoria, ya que las secciones posteriores no duplicarán el código, solo mostrarán el código clave)
- Patrón de publicación-suscripción para Java con RabbitMQ (Lectura obligatoria, ya que la estructura del código es casi la misma, solo difieren el tipo de intercambio y los parámetros de enrutamiento)
2. Definir el intercambio directo
En Spring AMQP, la clase correspondiente al intercambio directo es DirectExchange. Definimos el intercambio a través de una clase de configuración de Spring Boot.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public DirectExchange direct() {
// Definir el intercambio
// El parámetro es el nombre del intercambio, que debe ser único
return new DirectExchange("tizi365.direct");
}
}
Consejo: Tanto el productor como el consumidor de mensajes requieren un intercambio.
3. Enviar mensajes
Enviamos mensajes al intercambio, que luego entrega los mensajes a las colas correspondientes según las reglas de enrutamiento.
package com.tizi365.rabbitmq.service;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class SendService {
@Autowired
private RabbitTemplate template;
@Autowired
private DirectExchange direct;
// Para la demostración, utilizamos una tarea programada para enviar un mensaje cada segundo
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void send() {
// Contenido del mensaje
String message = "¡Hola mundo!";
// Enviar el mensaje
// El primer parámetro es el nombre del intercambio
// El segundo parámetro es la clave de enrutamiento. Con un intercambio directo, el mensaje se entrega a la cola cuya clave de enrutamiento coincida con "tizi365"
// El tercer parámetro es el contenido del mensaje, que admite cualquier tipo siempre que pueda ser serializado
template.convertAndSend(direct.getName(), "tizi365", message);
System.out.println("Mensaje enviado '" + message + "'");
}
}
Consejo: Presta atención al segundo parámetro en el método convertAndSend, ya que este es un parámetro crítico.
4. Recibir mensajes
4.1 Definir Colas y Vincular Intercambio
Para consumir mensajes de una cola, primero necesitas definir una cola y luego vincularla al intercambio de destino. El siguiente código define dos colas y las vincula al mismo intercambio:
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public DirectExchange direct() {
// Definir el intercambio
// El parámetro es el nombre del intercambio, que debe ser único
return new DirectExchange("tizi365.direct");
}
@Bean
public Queue queue1() {
// Definir cola 1
return new Queue("tizi365.direct.queue1");
}
@Bean
public Queue queue2() {
// Definir cola 2
return new Queue("tizi365.direct.queue2");
}
@Bean
public Binding binding1(DirectExchange direct, Queue queue1) {
// Definir un enlace para vincular la cola 1 al intercambio direct con una clave de enrutamiento "tizi365"
// Cuando la clave de enrutamiento coincida con "tizi365", el intercambio enviará mensajes a la cola 1
return BindingBuilder.bind(queue1).to(direct).with("tizi365");
}
@Bean
public Binding binding2(DirectExchange direct, Queue queue2) {
// Definir un enlace para vincular la cola 2 al intercambio direct con una clave de enrutamiento "baidu"
// Cuando la clave de enrutamiento coincida con "baidu", el intercambio enviará mensajes a la cola 2
return BindingBuilder.bind(queue2).to(direct).with("baidu");
}
}
4.2 Definir Escuchas de Colas
Define escuchas de mensajes utilizando la anotación RabbitListener para consumir mensajes de colas específicas:
package com.tizi365.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
// Permitir que Spring gestione la clase actual
@Component
public class DemoListener {
// Definir un escucha para consumir mensajes de la cola1
@RabbitListener(queues = "tizi365.direct.queue1")
public void recibir1(String msg) {
System.out.println("Recibido mensaje de la cola1: " + msg);
}
// Definir un escucha para consumir mensajes de la cola2
@RabbitListener(queues = "tizi365.direct.queue2")
public void recibir2(String msg) {
System.out.println("Recibido mensaje de la cola2: " + msg);
}
}
Dado que solo la cola 1 tiene una clave de enrutamiento de "tizi365" al vincularse al intercambio, solo la cola 1 puede recibir mensajes. La cola 2, al no coincidir con la clave de enrutamiento, no recibirá ningún mensaje.