Шаблон маршрутизации RabbitMQ (Прямой режим) на Java использует тип обмена DirectExchange. Отличие от шаблона издатель-подписчик заключается в том, что обмен Direct доставляет сообщения в очереди с параметрами маршрутизации, которые полностью соответствуют. Архитектура показана на изображении ниже:
Совет: Независимо от используемого режима работы RabbitMQ, разница заключается в типе используемого обмена и параметрах маршрутизации.
1. Обязательные учебные материалы
Пожалуйста, ознакомьтесь со следующими разделами, чтобы понять связанные сведения:
- Основные концепции RabbitMQ
- Принципы маршрутизации в режиме работы RabbitMQ
- Быстрый старт для Java с RabbitMQ (обязательно к прочтению, так как последующие разделы не будут дублировать код, а только отображать ключевой код)
- Шаблон издатель-подписчик для Java с RabbitMQ (обязательно к прочтению, поскольку структура кода практически идентична, отличаются только тип обмена и параметры маршрутизации)
2. Определение Прямого обмена
В Spring AMQP классом, соответствующим обмену Direct, является DirectExchange. Мы определяем обмен через класс конфигурации Spring Boot.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public DirectExchange direct() {
// Определение обмена
// Параметр - имя обмена, которое должно быть уникальным
return new DirectExchange("tizi365.direct");
}
}
Совет: И поставщику сообщений, и потребителю требуется обмен.
3. Отправка сообщений
Мы отправляем сообщения в обмен, который затем доставляет их в соответствующие очереди на основе правил маршрутизации.
package com.tizi365.rabbitmq.service;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class SendService {
@Autowired
private RabbitTemplate template;
@Autowired
private DirectExchange direct;
// Для демонстрации используем запланированную задачу для отправки сообщения каждую секунду
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void send() {
// Содержимое сообщения
String message = "Привет, мир!";
// Отправка сообщения
// Первый параметр - имя обмена
// Второй параметр - ключ маршрутизации. Для прямого обмена сообщение доставляется в очередь, ключ маршрутизации которой совпадает с "tizi365"
// Третий параметр - содержимое сообщения, поддерживается любой тип, пока он может быть сериализован
template.convertAndSend(direct.getName(), "tizi365", message);
System.out.println("Отправлено сообщение '" + message + "'");
}
}
Совет: Обратите внимание на второй параметр в методе convertAndSend, поскольку этот параметр критически важен.
4. Получение сообщений
4.1 Определение очередей и привязка к обменнику
Для потребления сообщений из очереди сначала необходимо определить саму очередь, а затем выполнить привязку очереди к целевому обменнику. В следующем коде определяются две очереди и привязываются к одному и тому же обменнику:
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public DirectExchange direct() {
// Определяем обменник
// Параметром является имя обменника, которое должно быть уникальным
return new DirectExchange("tizi365.direct");
}
@Bean
public Queue queue1() {
// Определяем очередь 1
return new Queue("tizi365.direct.queue1");
}
@Bean
public Queue queue2() {
// Определяем очередь 2
return new Queue("tizi365.direct.queue2");
}
@Bean
public Binding binding1(DirectExchange direct, Queue queue1) {
// Определяем привязку для привязки очереди 1 к обменнику direct с ключом маршрутизации "tizi365"
// При совпадении ключа маршрутизации "tizi365" обменник будет доставлять сообщения в очередь 1
return BindingBuilder.bind(queue1).to(direct).with("tizi365");
}
@Bean
public Binding binding2(DirectExchange direct, Queue queue2) {
// Определяем привязку для привязки очереди 2 к обменнику direct с ключом маршрутизации "baidu"
// При совпадении ключа маршрутизации "baidu" обменник будет доставлять сообщения в очередь 2
return BindingBuilder.bind(queue2).to(direct).with("baidu");
}
}
4.2 Определение слушателей очередей
Определите слушателей сообщений с использованием аннотации RabbitListener для потребления сообщений из конкретных очередей:
package com.tizi365.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
// Позволяет Spring управлять текущим классом
@Component
public class DemoListener {
// Определяем слушателя для потребления сообщений из очереди 1
@RabbitListener(queues = "tizi365.direct.queue1")
public void receive1(String msg) {
System.out.println("Получено сообщение из очереди 1: " + msg);
}
// Определяем слушателя для потребления сообщений из очереди 2
@RabbitListener(queues = "tizi365.direct.queue2")
public void receive2(String msg) {
System.out.println("Получено сообщение из очереди 2: " + msg);
}
}
Поскольку только у очереди 1 есть ключ маршрутизации "tizi365" при привязке к обменнику, только очередь 1 может получать сообщения. Очередь 2, потому что ключ маршрутизации не совпадает, не будет получать никаких сообщений.