Простейший режим очереди в Java RabbitMQ состоит из производителя и потребителя.
В настоящее время операции Java с RabbitMQ в основном используют пакет spring-boot-starter-amqp
в Spring Boot, по сути, используя Spring AMQP для работы с очередью.
1. Руководство по предварительным условиям
Пожалуйста, ознакомьтесь с следующими главами, чтобы понять соответствующие сведения:
2. Зависимость от пакета
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3. Настройка RabbitMQ
Измените конфигурацию application.yml
:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
4. Объявление очереди
Настройте очередь через класс конфигурации Spring Boot:
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public Queue helloQueue() {
// Объявите очередь, имя очереди должно быть уникальным
return new Queue("hello");
}
}
Совет: Вы можете определить несколько очередей в соответствии с бизнес-требованиями. Имя очереди и идентификатор бина Queue должны быть разными. Здесь имя метода является идентификатором бина.
5. Отправка сообщения
Отправка сообщений требует класса RabbitTemplate, который Spring Boot уже инициализировал для нас. Просто внедрите экземпляр:
package com.tizi365.rabbitmq.service;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class SendService {
// Внедрите экземпляр RabbitTemplate
@Autowired
private RabbitTemplate template;
// Внедрите ранее определенную очередь
@Autowired
@Qualifier("helloQueue")
private Queue helloQueue;
// Для демонстрации мы используем встроенную запланированную задачу Spring для регулярной отправки сообщений (одно сообщение в секунду)
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void send() {
// Содержание сообщения
String message = "Привет, мир!";
// Отправить сообщение
// Первый параметр - ключ маршрутизации, здесь мы используем имя очереди в качестве ключа маршрутизации
// Второй параметр - содержание сообщения, поддерживает любой тип, пока он поддерживает сериализацию
template.convertAndSend(helloQueue.getName(), message);
System.out.println("Отправка сообщения '" + message + "'");
}
}
Совет: Здесь мы не используем обмен напрямую. Будет использоваться базовый (прямой) обмен по умолчанию.
6. Получение сообщения
Для потребителя также просто получать сообщения:
package com.tizi365.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
// Объявите слушателя сообщений, укажите очередь для прослушивания через параметр `queues`, который должен соответствовать предыдущему имени очереди
@RabbitListener(queues = "hello")
public class HelloListener {
// Используйте RabbitHandler для маркировки обработчика сообщений, используется для выполнения логики обработки сообщений
@RabbitHandler
public void receive(String msg) {
System.out.println("Получено сообщение потребителем: '" + msg + "'");
}
}
7. Пользовательский тип сообщения
Ранее мы отправили сообщение строкового типа. В реальных бизнес-сценариях мы предпочли бы прямо отправлять различные пользовательские типы Java-объектов.
Определение объекта сущности
package com.tizi365.rabbitmq.domain;
import java.io.Serializable;
import lombok.Data;
// Содержание блога
@Data
public class Blog implements Serializable {
// id
private Integer id;
// title
private String title;
}
Отправка сообщений пользовательского типа
Blog blog = new Blog();
blog.setId(100);
blog.setTitle("Tizi365 Руководство по RabbitMQ");
// Отправить сообщение
template.convertAndSend(helloQueue.getName(), blog);
Получение сообщений пользовательского типа
@RabbitHandler
// Просто измените тип параметра метода на пользовательский тип сообщения
public void receive(Blog msg) {
System.out.println("Потребитель - Получено сообщение '" + msg.getTitle() + "'");
}
Использование JSON-сериализации для содержимого сообщения
Когда RabbitMQ отправляет данные объекта типа Java, по умолчанию используется инструмент сериализации объектов JDK. Мы можем изменить это, чтобы использовать JSON-формат для сериализации данных, что позволит другим языкам потреблять сообщения из Java и сделает формат сообщения более читаемым.
Измените ранее созданный класс конфигурации и добавьте следующую конфигурацию для использования парсера Jackson JSON для сериализации и десериализации данных сообщения.
@Bean
public Jackson2JsonMessageConverter messageConverter() {
// Установите конвертер сообщений по умолчанию
return new Jackson2JsonMessageConverter();
}