Простейший режим очереди в Java RabbitMQ состоит из производителя и потребителя.

RabbitMQ простая очередь

В настоящее время операции Java с RabbitMQ в основном используют пакет spring-boot-starter-amqp в Spring Boot, по сути, используя Spring AMQP для работы с очередью.

1. Руководство по предварительным условиям

Пожалуйста, ознакомьтесь с следующими главами, чтобы понять соответствующие сведения:

2. Зависимость от пакета

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3. Настройка RabbitMQ

Измените конфигурацию application.yml:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

4. Объявление очереди

Настройте очередь через класс конфигурации Spring Boot:

package com.tizi365.rabbitmq.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
    @Bean
    public Queue helloQueue() {
        // Объявите очередь, имя очереди должно быть уникальным
        return new Queue("hello");
    }
}

Совет: Вы можете определить несколько очередей в соответствии с бизнес-требованиями. Имя очереди и идентификатор бина Queue должны быть разными. Здесь имя метода является идентификатором бина.

5. Отправка сообщения

Отправка сообщений требует класса RabbitTemplate, который Spring Boot уже инициализировал для нас. Просто внедрите экземпляр:

package com.tizi365.rabbitmq.service;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class SendService {
    // Внедрите экземпляр RabbitTemplate
    @Autowired
    private RabbitTemplate template;
    
    // Внедрите ранее определенную очередь
    @Autowired
    @Qualifier("helloQueue")
    private Queue helloQueue;
    
    // Для демонстрации мы используем встроенную запланированную задачу Spring для регулярной отправки сообщений (одно сообщение в секунду)
    @Scheduled(fixedDelay = 1000, initialDelay = 1000)
    public void send() {
        // Содержание сообщения
        String message = "Привет, мир!";
        // Отправить сообщение
        // Первый параметр - ключ маршрутизации, здесь мы используем имя очереди в качестве ключа маршрутизации
        // Второй параметр - содержание сообщения, поддерживает любой тип, пока он поддерживает сериализацию
        template.convertAndSend(helloQueue.getName(), message);
        System.out.println("Отправка сообщения '" + message + "'");
    }
}

Совет: Здесь мы не используем обмен напрямую. Будет использоваться базовый (прямой) обмен по умолчанию.

6. Получение сообщения

Для потребителя также просто получать сообщения:

package com.tizi365.rabbitmq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
// Объявите слушателя сообщений, укажите очередь для прослушивания через параметр `queues`, который должен соответствовать предыдущему имени очереди
@RabbitListener(queues = "hello")
public class HelloListener {
	// Используйте RabbitHandler для маркировки обработчика сообщений, используется для выполнения логики обработки сообщений
    @RabbitHandler
    public void receive(String msg) {
        System.out.println("Получено сообщение потребителем: '" + msg + "'");
    }
}

7. Пользовательский тип сообщения

Ранее мы отправили сообщение строкового типа. В реальных бизнес-сценариях мы предпочли бы прямо отправлять различные пользовательские типы Java-объектов.

Определение объекта сущности

package com.tizi365.rabbitmq.domain;

import java.io.Serializable;
import lombok.Data;

// Содержание блога
@Data
public class Blog implements Serializable {
    // id
    private Integer id;
    // title
    private String title;
}

Отправка сообщений пользовательского типа

Blog blog = new Blog();
blog.setId(100);
blog.setTitle("Tizi365 Руководство по RabbitMQ");

// Отправить сообщение
template.convertAndSend(helloQueue.getName(), blog);

Получение сообщений пользовательского типа

@RabbitHandler
// Просто измените тип параметра метода на пользовательский тип сообщения
public void receive(Blog msg) {
    System.out.println("Потребитель - Получено сообщение '" + msg.getTitle() + "'");
}

Использование JSON-сериализации для содержимого сообщения

Когда RabbitMQ отправляет данные объекта типа Java, по умолчанию используется инструмент сериализации объектов JDK. Мы можем изменить это, чтобы использовать JSON-формат для сериализации данных, что позволит другим языкам потреблять сообщения из Java и сделает формат сообщения более читаемым.

Измените ранее созданный класс конфигурации и добавьте следующую конфигурацию для использования парсера Jackson JSON для сериализации и десериализации данных сообщения.

@Bean
public Jackson2JsonMessageConverter messageConverter() {
    // Установите конвертер сообщений по умолчанию
    return new Jackson2JsonMessageConverter();
}