Java RabbitMQ Publish/Subscribe Pattern (Broadcast Mode, Fanout Mode)
Тип обмена FanoutExchange используется для шаблона издатель/подписчик в RabbitMQ, где сообщение, отправленное производителем, будет обработано несколькими очередями потребителей. Архитектура представлена на следующей диаграмме:
Обмен Fanout может пересылать сообщения во все привязанные очереди.
Совет: Независимо от использованного рабочего режима RabbitMQ разница заключается в типе обмена и используемых параметрах маршрутизации.
1. Учебные пособия
Пожалуйста, сначала прочтите следующие разделы, чтобы понять соответствующие знания:
- Основные концепции RabbitMQ
- Шаблон издатель/подписчик RabbitMQ
- Краткое руководство по Java в RabbitMQ (обязательно, поскольку последующие разделы не будут дублировать код, а покажут только основные фрагменты кода)
2. Определение обмена Fanout
В Spring AMQP класс FanoutExchange соответствует обмену Fanout. Мы определяем обмен через класс конфигурации Spring Boot.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
// Определение обмена
@Bean
public FanoutExchange fanout() {
// Параметр - имя обмена, которое должно быть уникальным
return new FanoutExchange("tizi365.fanout");
}
}
Совет: Обмен необходим как производителям сообщений, так и потребителям.
3. Отправка сообщений
Мы отправляем сообщения в обмен, который доставит сообщения в соответствующие очереди на основе правил маршрутизации.
package com.tizi365.rabbitmq.service;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class SendService {
@Autowired
private RabbitTemplate template;
@Autowired
private FanoutExchange fanout;
// Для демонстрационных целей используется отложенная задача для отправки сообщения каждую секунду
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void send() {
// Содержимое сообщения
String message = "Привет, мир!";
// Отправка сообщения
// Первый параметр - имя обмена
// Второй параметр - ключ маршрутизации; обмен Fanout будет игнорировать ключ маршрутизации, поэтому его не нужно устанавливать
// Третий параметр - содержимое сообщения, поддерживает любой тип при условии, что его можно сериализовать
template.convertAndSend(fanout.getName(), "", message);
System.out.println("Отправлено сообщение: '" + message + "'");
}
}
4. Получение сообщений
4.1 Определение очередей и привязка обмена
Для получения сообщений из очереди необходимо сначала определить очередь, а затем привязать очередь к целевому обмену. Ниже мы определяем две очереди и привязываем их к одному обмену.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public FanoutExchange fanout() {
// Определение обмена
// Параметр - имя обмена, которое должно быть уникальным
return new FanoutExchange("tizi365.fanout");
}
@Bean
public Queue queue1() {
// Определение очереди 1
return new Queue("tizi365.fanout.queue1");
}
@Bean
public Queue queue2() {
// Определение очереди 2
return new Queue("tizi365.fanout.queue2");
}
@Bean
public Binding binding1(FanoutExchange fanout, Queue queue1) {
// Определение отношения привязки для привязки очереди 1 к обмену Fanout
return BindingBuilder.bind(queue1).to(fanout);
}
@Bean
public Binding binding2(FanoutExchange fanout, Queue queue2) {
// Определение отношения привязки для привязки очереди 2 к обмену Fanout
return BindingBuilder.bind(queue2).to(fanout);
}
}
4.2 Определение прослушивателей очереди
Определите прослушиватели сообщений с помощью аннотации RabbitListener для потребления сообщений из конкретных очередей.
package com.tizi365.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
// Сделать текущий класс управляемым Spring
@Component
public class DemoListener {
// Определение прослушивателя и указание, какую очередь прослушивать с помощью параметра очередей
@RabbitListener(queues = "tizi365.fanout.queue1")
public void receive1(String msg) {
System.out.println("Получено сообщение из очереди 1 = " + msg);
}
// Определение прослушивателя и указание, какую очередь прослушивать с помощью параметра очередей
@RabbitListener(queues = "tizi365.fanout.queue2")
public void receive2(String msg) {
System.out.println("Получено сообщение из очереди 2 = " + msg);
}
}
Поскольку ранее был определен обмен типа fanout, каждое сообщение будет распределено на все очереди, привязанные к текущему обмену, и сообщения будут обрабатываться отдельно с помощью вышеуказанных двух методов.
Примечание: Аннотацию RabbitListener можно применять к классу или методу. Если аннотация RabbitListener определена в классе, она должна быть объединена с аннотацией RabbitHandler, чтобы указать, какой метод класса будет выполнять обработку сообщения.
4.3 Определение прослушивателей очереди с полной аннотацией
Вам не нужно создавать класс конфигурации Spring Boot для определения обменов, очередей и связей. Вы можете непосредственно определить связи, очереди и обмены через параметр bindings аннотации RabbitListener.
@RabbitListener(
bindings = {
@QueueBinding(
value = @Queue(name = "tizi365.fanout.queue3", durable = "true"),
exchange = @Exchange(name = "tizi365.fanout", durable = "true", type = ExchangeTypes.FANOUT)
)
}
)
public void receive3(String msg) {
System.out.println("Получено сообщение из очереди 3 = " + msg);
}
Пояснение:
- Аннотация QueueBinding: Определяет связь между очередью и обменом. Параметр value используется для определения очереди, а exchange используется для определения обмена.
- Аннотация Queue: Определяет очередь. Параметр name определяет имя очереди (которое должно быть уникальным), а параметр durable указывает, нужно ли ему быть долговечным.
- Аннотация Exchange: Определяет обмен. Параметр name определяет имя обмена, а параметр type указывает тип обмена.