Java RabbitMQ Publish/Subscribe Pattern (Broadcast Mode, Fanout Mode)

Тип обмена FanoutExchange используется для шаблона издатель/подписчик в RabbitMQ, где сообщение, отправленное производителем, будет обработано несколькими очередями потребителей. Архитектура представлена на следующей диаграмме:

Рабочий режим RabbitMQ

Обмен Fanout может пересылать сообщения во все привязанные очереди.

Совет: Независимо от использованного рабочего режима RabbitMQ разница заключается в типе обмена и используемых параметрах маршрутизации.

1. Учебные пособия

Пожалуйста, сначала прочтите следующие разделы, чтобы понять соответствующие знания:

2. Определение обмена Fanout

В Spring AMQP класс FanoutExchange соответствует обмену Fanout. Мы определяем обмен через класс конфигурации Spring Boot.

package com.tizi365.rabbitmq.config;

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
	// Определение обмена
    @Bean
    public FanoutExchange fanout() {
        // Параметр - имя обмена, которое должно быть уникальным
        return new FanoutExchange("tizi365.fanout");
    }
}

Совет: Обмен необходим как производителям сообщений, так и потребителям.

3. Отправка сообщений

Мы отправляем сообщения в обмен, который доставит сообщения в соответствующие очереди на основе правил маршрутизации.

package com.tizi365.rabbitmq.service;

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class SendService {
    @Autowired
    private RabbitTemplate template;
    @Autowired
    private FanoutExchange fanout;

	// Для демонстрационных целей используется отложенная задача для отправки сообщения каждую секунду
    @Scheduled(fixedDelay = 1000, initialDelay = 1000)
    public void send() {
        // Содержимое сообщения
        String message = "Привет, мир!";
        // Отправка сообщения
        // Первый параметр - имя обмена
        // Второй параметр - ключ маршрутизации; обмен Fanout будет игнорировать ключ маршрутизации, поэтому его не нужно устанавливать
        // Третий параметр - содержимое сообщения, поддерживает любой тип при условии, что его можно сериализовать
        template.convertAndSend(fanout.getName(), "", message);
        System.out.println("Отправлено сообщение: '" + message + "'");
    }
}

4. Получение сообщений

4.1 Определение очередей и привязка обмена

Для получения сообщений из очереди необходимо сначала определить очередь, а затем привязать очередь к целевому обмену. Ниже мы определяем две очереди и привязываем их к одному обмену.

package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
    @Bean
    public FanoutExchange fanout() {
        // Определение обмена
        // Параметр - имя обмена, которое должно быть уникальным
        return new FanoutExchange("tizi365.fanout");
    }

    @Bean
    public Queue queue1() {
        // Определение очереди 1
        return new Queue("tizi365.fanout.queue1");
    }

    @Bean
    public Queue queue2() {
        // Определение очереди 2
        return new Queue("tizi365.fanout.queue2");
    }

    @Bean
    public Binding binding1(FanoutExchange fanout, Queue queue1) {
        // Определение отношения привязки для привязки очереди 1 к обмену Fanout
        return BindingBuilder.bind(queue1).to(fanout);
    }

    @Bean
    public Binding binding2(FanoutExchange fanout, Queue queue2) {
        // Определение отношения привязки для привязки очереди 2 к обмену Fanout
        return BindingBuilder.bind(queue2).to(fanout);
    }
}

4.2 Определение прослушивателей очереди

Определите прослушиватели сообщений с помощью аннотации RabbitListener для потребления сообщений из конкретных очередей.

package com.tizi365.rabbitmq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// Сделать текущий класс управляемым Spring
@Component
public class DemoListener {
    // Определение прослушивателя и указание, какую очередь прослушивать с помощью параметра очередей
    @RabbitListener(queues = "tizi365.fanout.queue1")
    public void receive1(String msg) {
        System.out.println("Получено сообщение из очереди 1 = " + msg);
    }

    // Определение прослушивателя и указание, какую очередь прослушивать с помощью параметра очередей
    @RabbitListener(queues = "tizi365.fanout.queue2")
    public void receive2(String msg) {
        System.out.println("Получено сообщение из очереди 2 = " + msg);
    }
}

Поскольку ранее был определен обмен типа fanout, каждое сообщение будет распределено на все очереди, привязанные к текущему обмену, и сообщения будут обрабатываться отдельно с помощью вышеуказанных двух методов.

Примечание: Аннотацию RabbitListener можно применять к классу или методу. Если аннотация RabbitListener определена в классе, она должна быть объединена с аннотацией RabbitHandler, чтобы указать, какой метод класса будет выполнять обработку сообщения.

4.3 Определение прослушивателей очереди с полной аннотацией

Вам не нужно создавать класс конфигурации Spring Boot для определения обменов, очередей и связей. Вы можете непосредственно определить связи, очереди и обмены через параметр bindings аннотации RabbitListener.

@RabbitListener(
            bindings = {
                    @QueueBinding(
                            value = @Queue(name = "tizi365.fanout.queue3", durable = "true"),
                            exchange = @Exchange(name = "tizi365.fanout", durable = "true", type = ExchangeTypes.FANOUT)
                    )
            }
    )
public void receive3(String msg) {
    System.out.println("Получено сообщение из очереди 3 = " + msg);
}

Пояснение:

  • Аннотация QueueBinding: Определяет связь между очередью и обменом. Параметр value используется для определения очереди, а exchange используется для определения обмена.
  • Аннотация Queue: Определяет очередь. Параметр name определяет имя очереди (которое должно быть уникальным), а параметр durable указывает, нужно ли ему быть долговечным.
  • Аннотация Exchange: Определяет обмен. Параметр name определяет имя обмена, а параметр type указывает тип обмена.