Шаблон темы RabbitMQ для Java с использованием типа TopicExchange отличается от шаблона маршрутизации (Direct) тем, что параметры маршрутизации поддерживают нечеткое сопоставление. Благодаря более гибкому сопоставлению маршрутизации этот шаблон является более распространенным. Архитектура представлена на следующей диаграмме:

RabbitMQ Topic Pattern

Подсказка: Независимо от используемого рабочего шаблона RabbitMQ различие заключается в типе используемой биржи и параметрах маршрутизации.

1. Обучающий учебник

Пожалуйста, сначала прочтите следующие разделы, чтобы понять соответствующие знания:

2. Определение тематической биржи

В Spring AMQP класс, соответствующий прямой бирже, - это TopicExchange. Мы определяем биржу через класс конфигурации Spring Boot.

package com.tizi365.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
    @Bean
    public TopicExchange topic() {
        // Определение биржи
        // Параметр - имя биржи, которое должно быть уникальным
        return new TopicExchange("tizi365.topic");
    }
}

Подсказка: И поставщики сообщений, и потребители сообщений нуждаются в бирже.

3. Отправка сообщений

Мы отправляем сообщения на биржу, и биржа доставляет сообщения в соответствующие очереди на основе правил маршрутизации.

package com.tizi365.rabbitmq.service;

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class SendService {
    @Autowired
    private RabbitTemplate template;
    @Autowired
    private TopicExchange topic;

    // Для демонстрационных целей здесь используется запланированная задача для отправки сообщения каждую секунду
    @Scheduled(fixedDelay = 1000, initialDelay = 1000)
    public void send() {
        // Содержимое сообщения
        String message = "Привет, мир!";
        // Отправить сообщение
        // Первый параметр - имя биржи
        // Второй параметр - ключ маршрутизации. Шаблонная биржа доставляет сообщения в очереди, которые соответствуют ключу маршрутизации
        // Третий параметр - содержимое сообщения, поддерживается любой тип, пока он может быть сериализован
        template.convertAndSend(topic.getName(), "www.tizi365.com", message);
        System.out.println("Отправлено сообщение: '" + message + "'");
    }
}

Подсказка: Обратите внимание на второй параметр "www.tizi365.com" в методе convertAndSend, так как это ключевой параметр.

4. Получение сообщений

4.1 Определение очередей и привязка обменов

Для получения сообщений из очередей необходимо сначала определить очередь, а затем привязать очередь к целевому обмену. Ниже определены две очереди и привязаны к одному обмену.

package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
    @Bean
    public TopicExchange topic() {
        // Определяем обмен
        // Параметром является имя обмена, которое должно быть уникальным
        return new TopicExchange("tizi365.topic");
    }

    @Bean
    public Queue queue1() {
        // Определяем очередь 1
        return new Queue("tizi365.topic.queue1");
    }

    @Bean
    public Queue queue2() {
        // Определяем очередь 2
        return new Queue("tizi365.topic.queue2");
    }

    @Bean
    public Binding binding1(TopicExchange topic, Queue queue1) {
        // Определяем отношение привязки, привязывая очередь 1 к обмену по ключу маршрутизации: *.tizi365.com
        return BindingBuilder.bind(queue1).to(topic).with("*.tizi365.com");
    }

    @Bean
    public Binding binding2(TopicExchange topic, Queue queue2) {
        // Определяем отношение привязки, привязывая очередь 2 к обмену по ключу маршрутизации: *.baidu.com
        return BindingBuilder.bind(queue2).to(topic).with("*.baidu.com");
    }
}

Совет: При привязке очередей 1 и 2 к обмену используются маршрутизирующие ключи, оба использующие * (звездочку) в качестве шаблона, который сопоставляется с одним словом. Если изменить на # (решетку), это будет соответствовать нескольким словам.

4.2 Определение слушателей очередей

Определите слушателей сообщений с использованием аннотации RabbitListener для приема сообщений из определенных очередей.

package com.tizi365.rabbitmq.listener;

import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

// Позволяет Spring управлять текущим классом
@Component
public class DemoListener {
    // Определяем слушателя, указывая очередь для прослушивания через параметр queues
    @RabbitListener(queues = "tizi365.topic.queue1")
    public void receive1(String msg) {
        System.out.println("Получено сообщение из очереди 1: " + msg);
    }

    // Определяем слушателя, указывая очередь для прослушивания через параметр queues
    @RabbitListener(queues = "tizi365.topic.queue2")
    public void receive2(String msg) {
        System.out.println("Получено сообщение из очереди 2: " + msg);
    }
}

Поскольку только у очереди 1 есть ключ маршрутизации *.tizi365.com при привязке к обмену, она будет сопоставляться с сообщениями, у которых ключ маршрутизации (www.tizi365.com). Поэтому только очередь 1 сможет получать сообщения, в то время как очередь 2 не получит ни одного сообщения из-за несовпадения ключа маршрутизации.