Mô hình định tuyến RabbitMQ (chế độ Trực tiếp) trong Java sử dụng loại trao đổi DirectExchange. Sự khác biệt so với mô hình publish-subscribe là trao đổi trực tiếp chuyển các thông điệp tới hàng đợi với các tham số định tuyến hoàn toàn phù hợp. Kiến trúc được hiển thị trong hình ảnh dưới đây:
Mẹo: Bất kể chế độ làm việc nào của RabbitMQ được sử dụng, sự khác biệt nằm ở loại trao đổi được sử dụng và các tham số định tuyến.
1. Hướng dẫn tiên quyết
Vui lòng đọc các phần sau để hiểu về kiến thức liên quan:
- Các Khái Niệm Cơ Bản về RabbitMQ
- Nguyên Lý Chế Độ Định Tuyến của RabbitMQ
- Bắt Đầu Nhanh với RabbitMQ trong Java (Phải đọc, vì các phần sau sẽ không nhân đôi mã, chỉ hiển thị mã chính)
- Mô Hình Publish-Subscribe cho Java với RabbitMQ (Phải đọc, vì cấu trúc mã gần như giống nhau, chỉ khác loại trao đổi và tham số định tuyến)
2. Xác định Loại Trực Tiếp
Trong Spring AMQP, lớp tương ứng với trao đổi trực tiếp là DirectExchange. Chúng ta xác định trao đổi thông qua một lớp cấu hình Spring Boot.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public DirectExchange direct() {
// Xác định trao đổi
// Tham số là tên trao đổi, phải là duy nhất
return new DirectExchange("tizi365.direct");
}
}
Mẹo: Cả người sản xuất và người tiêu dùng thông điệp đều cần một trao đổi.
3. Gửi Thông điệp
Chúng ta gửi thông điệp tới trao đổi, sau đó trao đổi sẽ chuyển thông điệp tới các hàng đợi tương ứng dựa trên quy tắc định tuyến.
package com.tizi365.rabbitmq.service;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class SendService {
@Autowired
private RabbitTemplate template;
@Autowired
private DirectExchange direct;
// Để tạo mẫu, chúng ta sử dụng một nhiệm vụ lập lịch để gửi một thông điệp mỗi giây
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void send() {
// Nội dung thông điệp
String message = "Xin chào Thế Giới!";
// Gửi thông điệp
// Tham số đầu tiên là tên trao đổi
// Tham số thứ hai là khóa định tuyến. Với trao đổi trực tiếp, thông điệp được chuyển đến hàng đợi có khóa định tuyến phù hợp với "tizi365"
// Tham số thứ ba là nội dung thông điệp, hỗ trợ bất kỳ loại nào miễn là có thể được tuần tự hóa
template.convertAndSend(direct.getName(), "tizi365", message);
System.out.println("Đã gửi thông điệp '" + message + "'");
}
}
Mẹo: Chú ý đến tham số thứ hai trong phương thức convertAndSend, vì đây là một tham số quan trọng.
4. Nhận Thông điệp
4.1 Định nghĩa Hàng đợi và Ràng buộc Trao đổi
Để tiêu thụ các tin nhắn từ một hàng đợi, bạn cần định nghĩa hàng đợi trước và sau đó ràng buộc hàng đợi đó với sàn giao dịch mục tiêu. Đoạn code dưới đây định nghĩa hai hàng đợi và ràng buộc chúng với cùng một sàn giao dịch:
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public DirectExchange direct() {
// Định nghĩa sàn giao dịch
// Tham số là tên sàn giao dịch, mà phải là duy nhất
return new DirectExchange("tizi365.direct");
}
@Bean
public Queue queue1() {
// Định nghĩa hàng đợi 1
return new Queue("tizi365.direct.queue1");
}
@Bean
public Queue queue2() {
// Định nghĩa hàng đợi 2
return new Queue("tizi365.direct.queue2");
}
@Bean
public Binding binding1(DirectExchange direct, Queue queue1) {
// Định nghĩa một ràng buộc để ràng buộc hàng đợi 1 với sàn giao dịch trực tiếp với một khóa định tuyến là "tizi365"
// Khi khóa định tuyến khớp với "tizi365", sàn giao dịch sẽ gửi các tin nhắn đến hàng đợi 1
return BindingBuilder.bind(queue1).to(direct).with("tizi365");
}
@Bean
public Binding binding2(DirectExchange direct, Queue queue2) {
// Định nghĩa một ràng buộc để ràng buộc hàng đợi 2 với sàn giao dịch trực tiếp với một khóa định tuyến là "baidu"
// Khi khóa định tuyến khớp với "baidu", sàn giao dịch sẽ gửi các tin nhắn đến hàng đợi 2
return BindingBuilder.bind(queue2).to(direct).with("baidu");
}
}
4.2 Định nghĩa Người nghe Hàng đợi
Định nghĩa người nghe tin nhắn bằng cách sử dụng chú thích RabbitListener để tiêu thụ tin nhắn từ các hàng đợi cụ thể:
package com.tizi365.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
// Để Spring quản lý lớp hiện tại
@Component
public class DemoListener {
// Định nghĩa một người nghe để tiêu thụ tin nhắn từ hàng đợi 1
@RabbitListener(queues = "tizi365.direct.queue1")
public void receive1(String msg) {
System.out.println("Nhận thông điệp từ hàng đợi 1: " + msg);
}
// Định nghĩa một người nghe để tiêu thụ tin nhắn từ hàng đợi 2
@RabbitListener(queues = "tizi365.direct.queue2")
public void receive2(String msg) {
System.out.println("Nhận thông điệp từ hàng đợi 2: " + msg);
}
}
Vì chỉ hàng đợi 1 có một khóa định tuyến là "tizi365" khi ràng buộc với sàn giao dịch, chỉ hàng đợi 1 mới có thể nhận các tin nhắn. Hàng đợi 2, vì khóa định tuyến không khớp, sẽ không nhận bất kỳ tin nhắn nào.