The Java RabbitMQ topic pattern uses the TopicExchange type. It differs from the routing pattern (Direct) in that routing keys support wildcard (fuzzy) matching. Because this matching is more flexible, the topic pattern is used more often. The architecture is shown in the following diagram:
Hint: Whichever RabbitMQ working pattern you use, the differences come down to the exchange type and the routing parameters.
1. Prerequisite Tutorial
Please read the following sections first to understand the relevant knowledge:
- RabbitMQ Basic Concepts
- RabbitMQ Topic Pattern Principles
- Quick Start for Java with RabbitMQ (A must-read, as subsequent sections will not repeat the code, only show the key code)
- Java RabbitMQ Publish-Subscribe Pattern Section (A must-read, as the code syntax is almost the same, only the exchange type and routing parameters differ)
2. Defining Topic Exchange
In Spring AMQP, the class corresponding to the Topic exchange is TopicExchange. We define the exchange in a Spring Boot configuration class.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
    @Bean
    public TopicExchange topic() {
        // Define the exchange
        // The parameter is the exchange name, which must be unique
        return new TopicExchange("tizi365.topic");
    }
}
Hint: Both the message producer and the message consumer need this exchange definition.
3. Sending Messages
We send messages to the exchange, and the exchange delivers the messages to the corresponding queues based on the routing rules.
package com.tizi365.rabbitmq.service;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class SendService {
    @Autowired
    private RabbitTemplate template;
    @Autowired
    private TopicExchange topic;
    // For demonstration purposes, a scheduled task is used here to send a message every second
    @Scheduled(fixedDelay = 1000, initialDelay = 1000)
    public void send() {
        // Message content
        String message = "Hello World!";
        // Send message
        // The first parameter is the exchange name
        // The second parameter is the routing key. The topic exchange delivers the message to every queue whose binding pattern matches this key
        // The third parameter is the message content, supporting any type as long as it can be serialized
        template.convertAndSend(topic.getName(), "www.tizi365.com", message);
        System.out.println("Message sent: '" + message + "'");
    }
}
Hint: Pay attention to the second parameter of the convertAndSend method, "www.tizi365.com". It is the routing key, and the exchange uses it to decide which queues receive the message.
4. Receiving Messages
4.1 Define Queues & Bind Exchanges
To consume messages from queues, you need to first define a queue and then bind the queue to the target exchange. Below, two queues are defined and bound to the same exchange.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
    @Bean
    public TopicExchange topic() {
        // Define the exchange
        // The parameter is the exchange name, which must be unique
        return new TopicExchange("tizi365.topic");
    }
    @Bean
    public Queue queue1() {
        // Define queue 1
        return new Queue("tizi365.topic.queue1");
    }
    @Bean
    public Queue queue2() {
        // Define queue 2
        return new Queue("tizi365.topic.queue2");
    }
    @Bean
    public Binding binding1(TopicExchange topic, Queue queue1) {
        // Define a binding relationship, binding queue 1 to the topic exchange with a routing key of: *.tizi365.com
        return BindingBuilder.bind(queue1).to(topic).with("*.tizi365.com");
    }
    @Bean
    public Binding binding2(TopicExchange topic, Queue queue2) {
        // Define a binding relationship, binding queue 2 to the topic exchange with a routing key of: *.baidu.com
        return BindingBuilder.bind(queue2).to(topic).with("*.baidu.com");
    }
}
Hint: The routing keys used to bind queue 1 and queue 2 to the exchange both contain the * (star) wildcard, which matches exactly one word. Words are the dot-separated parts of a routing key. The # (hash) wildcard would instead match zero or more words.
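The wildcard rules can be sketched in plain Java. This is only an illustration: TopicMatcher and its matches method are hypothetical names, not part of Spring AMQP or RabbitMQ, and in a real setup the broker performs the matching itself. The sketch assumes the usual topic exchange semantics, where a routing key is split into words on dots, * stands for exactly one word, and # stands for zero or more words.

```java
// Hypothetical helper for illustration only; RabbitMQ does this matching inside the broker.
class TopicMatcher {

    // Returns true if the routing key satisfies the binding pattern.
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.split("\\.");
        String[] k = routingKey.split("\\.");
        return match(p, 0, k, 0);
    }

    // Compares pattern words from index i against key words from index j.
    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: the key must be exhausted as well
            return j == k.length;
        }
        String word = p[i];
        if (word.equals("#")) {
            // '#' may absorb zero or more words, so try every possible split point
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            // Pattern still expects a word but the key has none left
            return false;
        }
        if (word.equals("*") || word.equals(k[j])) {
            // '*' accepts any single word; a literal must be equal
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("*.tizi365.com", "www.tizi365.com")); // true
        System.out.println(matches("*.baidu.com", "www.tizi365.com"));   // false
        System.out.println(matches("*.tizi365.com", "a.b.tizi365.com")); // false
        System.out.println(matches("#.tizi365.com", "a.b.tizi365.com")); // true
    }
}
```

The third and fourth calls show the practical difference: a key with two words before tizi365.com is rejected by the * binding but accepted by the # binding.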
4.2 Define Queue Listeners
Define message listeners using the RabbitListener annotation to consume messages from specific queues.
package com.tizi365.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
// Let Spring manage the current class
@Component
public class DemoListener {
    // Define a listener, specifying which queue to listen to through the queues parameter
    @RabbitListener(queues = "tizi365.topic.queue1")
    public void receive1(String msg) {
        System.out.println("Received message from queue 1: " + msg);
    }
    // Define a listener, specifying which queue to listen to through the queues parameter
    @RabbitListener(queues = "tizi365.topic.queue2")
    public void receive2(String msg) {
        System.out.println("Received message from queue 2: " + msg);
    }
}
The messages are sent with the routing key www.tizi365.com. Only queue 1 is bound to the exchange with a matching key (*.tizi365.com), so only queue 1 receives the messages. Queue 2 is bound with *.baidu.com, which does not match, so it receives nothing.
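To make this outcome concrete, here is a small self-contained simulation of the two bindings from this section. It does not talk to RabbitMQ: TopicRoutingDemo, route, and toRegex are hypothetical names invented for the illustration, and the pattern translation deliberately handles only the * wildcard, since that is all the bindings here use.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical in-memory simulation; the real routing happens inside the RabbitMQ broker.
class TopicRoutingDemo {

    // Binding pattern -> queue name, mirroring the two bindings defined in QueueConfig
    static final Map<String, String> BINDINGS = new LinkedHashMap<>();
    static {
        BINDINGS.put("*.tizi365.com", "tizi365.topic.queue1");
        BINDINGS.put("*.baidu.com", "tizi365.topic.queue2");
    }

    // Translates a binding pattern into a regex. Assumption: only '*' is supported here.
    static Pattern toRegex(String binding) {
        StringBuilder sb = new StringBuilder();
        String[] words = binding.split("\\.");
        for (int i = 0; i < words.length; i++) {
            if (i > 0) {
                sb.append("\\.");
            }
            // '*' stands for exactly one dot-free word; anything else is taken literally
            sb.append(words[i].equals("*") ? "[^.]+" : Pattern.quote(words[i]));
        }
        return Pattern.compile(sb.toString());
    }

    // Returns the names of all queues whose binding pattern matches the routing key.
    static List<String> route(String routingKey) {
        List<String> queues = new ArrayList<>();
        for (Map.Entry<String, String> binding : BINDINGS.entrySet()) {
            if (toRegex(binding.getKey()).matcher(routingKey).matches()) {
                queues.add(binding.getValue());
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        System.out.println(route("www.tizi365.com")); // [tizi365.topic.queue1]
        System.out.println(route("www.baidu.com"));   // [tizi365.topic.queue2]
        System.out.println(route("www.example.com")); // []
    }
}
```

By the same logic, changing the routing key passed to convertAndSend to www.baidu.com would deliver the messages to queue 2 instead, and a key that matches neither binding would reach no queue at all.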