The RabbitMQ routing pattern (Direct mode) in Java uses the DirectExchange type of exchange. The difference from the publish-subscribe pattern is that the Direct exchange delivers messages to queues with routing parameters that fully match. The architecture is as shown in the image below:
Tip: Regardless of the RabbitMQ working mode used, the difference lies in the type of exchange used and the routing parameters.
1. Prerequisite Tutorials
Please read the following sections to understand the related knowledge:
- RabbitMQ Basic Concepts
- RabbitMQ Routing Mode Principle
- Quick Start for Java with RabbitMQ (A must-read, as subsequent sections will not duplicate code, only display key code)
- Publish-Subscribe Pattern for Java with RabbitMQ (A must-read, as the code structure is almost the same, only the exchange type and routing parameters differ)
2. Define the Direct Exchange
In Spring AMQP, the class corresponding to the Direct exchange is DirectExchange. We define the exchange through a Spring Boot configuration class.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public DirectExchange direct() {
// Define the exchange
// The parameter is the exchange name, which must be unique
return new DirectExchange("tizi365.direct");
}
}
Tip: Both the message producer and consumer require an exchange.
3. Send Messages
We send messages to the exchange, which then delivers the messages to the corresponding queues based on the routing rules.
package com.tizi365.rabbitmq.service;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class SendService {
@Autowired
private RabbitTemplate template;
@Autowired
private DirectExchange direct;
// For demonstration, we use a scheduled task to send a message every second
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void send() {
// Message content
String message = "Hello World!";
// Send the message
// The first parameter is the exchange name
// The second parameter is the routing key. With a direct exchange, the message is delivered to the queue whose routing key matches "tizi365"
// The third parameter is the message content, which supports any type as long as it can be serialized
template.convertAndSend(direct.getName(), "tizi365", message);
System.out.println("Sent message '" + message + "'");
}
}
Tip: Pay attention to the second parameter in the convertAndSend method, as this is a critical parameter.
4. Receive Messages
4.1 Define Queues and Bind Exchange
To consume messages from a queue, you need to first define a queue, and then bind the queue to the target exchange.
The following code defines two queues and binds them to the same exchange:
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public DirectExchange direct() {
// Define the exchange
// The parameter is the exchange name, which must be unique
return new DirectExchange("tizi365.direct");
}
@Bean
public Queue queue1() {
// Define queue 1
return new Queue("tizi365.direct.queue1");
}
@Bean
public Queue queue2() {
// Define queue 2
return new Queue("tizi365.direct.queue2");
}
@Bean
public Binding binding1(DirectExchange direct, Queue queue1) {
// Define a binding to bind queue 1 to the direct exchange with a routing key of "tizi365"
// When the routing key matches "tizi365", the exchange will deliver messages to queue 1
return BindingBuilder.bind(queue1).to(direct).with("tizi365");
}
@Bean
public Binding binding2(DirectExchange direct, Queue queue2) {
// Define a binding to bind queue 2 to the direct exchange with a routing key of "baidu"
// When the routing key matches "baidu", the exchange will deliver messages to queue 2
return BindingBuilder.bind(queue2).to(direct).with("baidu");
}
}
4.2 Define Queue Listeners
Define message listeners using the RabbitListener annotation to consume messages from specific queues:
package com.tizi365.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
// Let Spring manage the current class
@Component
public class DemoListener {
// Define a listener to consume messages from queue1
@RabbitListener(queues = "tizi365.direct.queue1")
public void receive1(String msg) {
System.out.println("Received message from queue1: " + msg);
}
// Define a listener to consume messages from queue2
@RabbitListener(queues = "tizi365.direct.queue2")
public void receive2(String msg) {
System.out.println("Received message from queue2: " + msg);
}
}
Since only queue 1 has a routing key of “tizi365” when binding to the exchange, only queue 1 can receive messages. Queue 2, because the routing key does not match, will not receive any messages.