Шаблон маршрутизации RabbitMQ (Прямой режим) на Java использует тип обмена DirectExchange. Отличие от шаблона издатель-подписчик заключается в том, что обмен Direct доставляет сообщения в очереди с параметрами маршрутизации, которые полностью соответствуют. Архитектура показана на изображении ниже:

RabbitMQ Прямой режим

Совет: Независимо от используемого режима работы RabbitMQ, разница заключается в типе используемого обмена и параметрах маршрутизации.

1. Обязательные учебные материалы

Пожалуйста, ознакомьтесь со следующими разделами, чтобы понять связанные сведения:

2. Определение Прямого обмена

В Spring AMQP классом, соответствующим обмену Direct, является DirectExchange. Мы определяем обмен через класс конфигурации Spring Boot.

package com.tizi365.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
    @Bean
    public DirectExchange direct() {
        // Определение обмена
        // Параметр - имя обмена, которое должно быть уникальным
        return new DirectExchange("tizi365.direct");
    }
}

Совет: И поставщику сообщений, и потребителю требуется обмен.

3. Отправка сообщений

Мы отправляем сообщения в обмен, который затем доставляет их в соответствующие очереди на основе правил маршрутизации.

package com.tizi365.rabbitmq.service;

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class SendService {
    @Autowired
    private RabbitTemplate template;
    @Autowired
    private DirectExchange direct;

    // Для демонстрации используем запланированную задачу для отправки сообщения каждую секунду
    @Scheduled(fixedDelay = 1000, initialDelay = 1000)
    public void send() {
        // Содержимое сообщения
        String message = "Привет, мир!";
        // Отправка сообщения
        // Первый параметр - имя обмена
        // Второй параметр - ключ маршрутизации. Для прямого обмена сообщение доставляется в очередь, ключ маршрутизации которой совпадает с "tizi365"
        // Третий параметр - содержимое сообщения, поддерживается любой тип, пока он может быть сериализован
        template.convertAndSend(direct.getName(), "tizi365", message);
        System.out.println("Отправлено сообщение '" + message + "'");
    }
}

Совет: Обратите внимание на второй параметр в методе convertAndSend, поскольку этот параметр критически важен.

4. Получение сообщений

4.1 Определение очередей и привязка к обменнику

Для потребления сообщений из очереди сначала необходимо определить саму очередь, а затем выполнить привязку очереди к целевому обменнику. В следующем коде определяются две очереди и привязываются к одному и тому же обменнику:

package com.tizi365.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
    @Bean
    public DirectExchange direct() {
        // Определяем обменник
        // Параметром является имя обменника, которое должно быть уникальным
        return new DirectExchange("tizi365.direct");
    }

    @Bean
    public Queue queue1() {
        // Определяем очередь 1
        return new Queue("tizi365.direct.queue1");
    }

    @Bean
    public Queue queue2() {
        // Определяем очередь 2
        return new Queue("tizi365.direct.queue2");
    }

    @Bean
    public Binding binding1(DirectExchange direct, Queue queue1) {
        // Определяем привязку для привязки очереди 1 к обменнику direct с ключом маршрутизации "tizi365"
		// При совпадении ключа маршрутизации "tizi365" обменник будет доставлять сообщения в очередь 1
        return BindingBuilder.bind(queue1).to(direct).with("tizi365");
    }

    @Bean
    public Binding binding2(DirectExchange direct, Queue queue2) {
        // Определяем привязку для привязки очереди 2 к обменнику direct с ключом маршрутизации "baidu"
		// При совпадении ключа маршрутизации "baidu" обменник будет доставлять сообщения в очередь 2
        return BindingBuilder.bind(queue2).to(direct).with("baidu");
    }
}

4.2 Определение слушателей очередей

Определите слушателей сообщений с использованием аннотации RabbitListener для потребления сообщений из конкретных очередей:

package com.tizi365.rabbitmq.listener;

import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

// Позволяет Spring управлять текущим классом
@Component
public class DemoListener {
    // Определяем слушателя для потребления сообщений из очереди 1
    @RabbitListener(queues = "tizi365.direct.queue1")
    public void receive1(String msg) {
        System.out.println("Получено сообщение из очереди 1: " + msg);
    }

    // Определяем слушателя для потребления сообщений из очереди 2
    @RabbitListener(queues = "tizi365.direct.queue2")
    public void receive2(String msg) {
        System.out.println("Получено сообщение из очереди 2: " + msg);
    }
}

Поскольку только у очереди 1 есть ключ маршрутизации "tizi365" при привязке к обменнику, только очередь 1 может получать сообщения. Очередь 2, потому что ключ маршрутизации не совпадает, не будет получать никаких сообщений.