Шаблон темы RabbitMQ для Java с использованием типа TopicExchange отличается от шаблона маршрутизации (Direct) тем, что параметры маршрутизации поддерживают нечеткое сопоставление. Благодаря более гибкому сопоставлению маршрутизации этот шаблон является более распространенным. Архитектура представлена на следующей диаграмме:
Подсказка: Независимо от используемого рабочего шаблона RabbitMQ различие заключается в типе используемой биржи и параметрах маршрутизации.
1. Обучающий учебник
Пожалуйста, сначала прочтите следующие разделы, чтобы понять соответствующие знания:
- Основные концепции RabbitMQ
- Принципы шаблона темы RabbitMQ
- Быстрый старт для Java с RabbitMQ (обязательно к прочтению, так как последующие разделы не будут повторять код, а покажут только ключевой код)
- Раздел о шаблоне издатель-подписчик для Java в RabbitMQ (обязательно к прочтению, так как синтаксис кода практически тот же, отличаются только тип биржи и параметры маршрутизации)
2. Определение тематической биржи
В Spring AMQP класс, соответствующий прямой бирже, - это TopicExchange. Мы определяем биржу через класс конфигурации Spring Boot.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public TopicExchange topic() {
// Определение биржи
// Параметр - имя биржи, которое должно быть уникальным
return new TopicExchange("tizi365.topic");
}
}
Подсказка: И поставщики сообщений, и потребители сообщений нуждаются в бирже.
3. Отправка сообщений
Мы отправляем сообщения на биржу, и биржа доставляет сообщения в соответствующие очереди на основе правил маршрутизации.
package com.tizi365.rabbitmq.service;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class SendService {
@Autowired
private RabbitTemplate template;
@Autowired
private TopicExchange topic;
// Для демонстрационных целей здесь используется запланированная задача для отправки сообщения каждую секунду
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void send() {
// Содержимое сообщения
String message = "Привет, мир!";
// Отправить сообщение
// Первый параметр - имя биржи
// Второй параметр - ключ маршрутизации. Шаблонная биржа доставляет сообщения в очереди, которые соответствуют ключу маршрутизации
// Третий параметр - содержимое сообщения, поддерживается любой тип, пока он может быть сериализован
template.convertAndSend(topic.getName(), "www.tizi365.com", message);
System.out.println("Отправлено сообщение: '" + message + "'");
}
}
Подсказка: Обратите внимание на второй параметр "www.tizi365.com" в методе convertAndSend, так как это ключевой параметр.
4. Получение сообщений
4.1 Определение очередей и привязка обменов
Для получения сообщений из очередей необходимо сначала определить очередь, а затем привязать очередь к целевому обмену. Ниже определены две очереди и привязаны к одному обмену.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public TopicExchange topic() {
// Определяем обмен
// Параметром является имя обмена, которое должно быть уникальным
return new TopicExchange("tizi365.topic");
}
@Bean
public Queue queue1() {
// Определяем очередь 1
return new Queue("tizi365.topic.queue1");
}
@Bean
public Queue queue2() {
// Определяем очередь 2
return new Queue("tizi365.topic.queue2");
}
@Bean
public Binding binding1(TopicExchange topic, Queue queue1) {
// Определяем отношение привязки, привязывая очередь 1 к обмену по ключу маршрутизации: *.tizi365.com
return BindingBuilder.bind(queue1).to(topic).with("*.tizi365.com");
}
@Bean
public Binding binding2(TopicExchange topic, Queue queue2) {
// Определяем отношение привязки, привязывая очередь 2 к обмену по ключу маршрутизации: *.baidu.com
return BindingBuilder.bind(queue2).to(topic).with("*.baidu.com");
}
}
Совет: При привязке очередей 1 и 2 к обмену используются маршрутизирующие ключи, оба использующие * (звездочку) в качестве шаблона, который сопоставляется с одним словом. Если изменить на # (решетку), это будет соответствовать нескольким словам.
4.2 Определение слушателей очередей
Определите слушателей сообщений с использованием аннотации RabbitListener для приема сообщений из определенных очередей.
package com.tizi365.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
// Позволяет Spring управлять текущим классом
@Component
public class DemoListener {
// Определяем слушателя, указывая очередь для прослушивания через параметр queues
@RabbitListener(queues = "tizi365.topic.queue1")
public void receive1(String msg) {
System.out.println("Получено сообщение из очереди 1: " + msg);
}
// Определяем слушателя, указывая очередь для прослушивания через параметр queues
@RabbitListener(queues = "tizi365.topic.queue2")
public void receive2(String msg) {
System.out.println("Получено сообщение из очереди 2: " + msg);
}
}
Поскольку только у очереди 1 есть ключ маршрутизации *.tizi365.com при привязке к обмену, она будет сопоставляться с сообщениями, у которых ключ маршрутизации (www.tizi365.com). Поэтому только очередь 1 сможет получать сообщения, в то время как очередь 2 не получит ни одного сообщения из-за несовпадения ключа маршрутизации.