Mô hình định tuyến RabbitMQ (chế độ Trực tiếp) trong Java sử dụng loại trao đổi DirectExchange. Sự khác biệt so với mô hình publish-subscribe là trao đổi trực tiếp chuyển các thông điệp tới hàng đợi với các tham số định tuyến hoàn toàn phù hợp. Kiến ​​trúc được hiển thị trong hình ảnh dưới đây:

RabbitMQ Direct Mode

Mẹo: Bất kể chế độ làm việc nào của RabbitMQ được sử dụng, sự khác biệt nằm ở loại trao đổi được sử dụng và các tham số định tuyến.

1. Hướng dẫn tiên quyết

Vui lòng đọc các phần sau để hiểu về kiến thức liên quan:

2. Xác định Loại Trực Tiếp

Trong Spring AMQP, lớp tương ứng với trao đổi trực tiếp là DirectExchange. Chúng ta xác định trao đổi thông qua một lớp cấu hình Spring Boot.

package com.tizi365.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
    @Bean
    public DirectExchange direct() {
        // Xác định trao đổi
        // Tham số là tên trao đổi, phải là duy nhất
        return new DirectExchange("tizi365.direct");
    }
}

Mẹo: Cả người sản xuất và người tiêu dùng thông điệp đều cần một trao đổi.

3. Gửi Thông điệp

Chúng ta gửi thông điệp tới trao đổi, sau đó trao đổi sẽ chuyển thông điệp tới các hàng đợi tương ứng dựa trên quy tắc định tuyến.

package com.tizi365.rabbitmq.service;

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class SendService {
    @Autowired
    private RabbitTemplate template;
    @Autowired
    private DirectExchange direct;

    // Để tạo mẫu, chúng ta sử dụng một nhiệm vụ lập lịch để gửi một thông điệp mỗi giây
    @Scheduled(fixedDelay = 1000, initialDelay = 1000)
    public void send() {
        // Nội dung thông điệp
        String message = "Xin chào Thế Giới!";
        // Gửi thông điệp
        // Tham số đầu tiên là tên trao đổi
        // Tham số thứ hai là khóa định tuyến. Với trao đổi trực tiếp, thông điệp được chuyển đến hàng đợi có khóa định tuyến phù hợp với "tizi365"
        // Tham số thứ ba là nội dung thông điệp, hỗ trợ bất kỳ loại nào miễn là có thể được tuần tự hóa
        template.convertAndSend(direct.getName(), "tizi365", message);
        System.out.println("Đã gửi thông điệp '" + message + "'");
    }
}

Mẹo: Chú ý đến tham số thứ hai trong phương thức convertAndSend, vì đây là một tham số quan trọng.

4. Nhận Thông điệp

4.1 Định nghĩa Hàng đợi và Ràng buộc Trao đổi

Để tiêu thụ các tin nhắn từ một hàng đợi, bạn cần định nghĩa hàng đợi trước và sau đó ràng buộc hàng đợi đó với sàn giao dịch mục tiêu. Đoạn code dưới đây định nghĩa hai hàng đợi và ràng buộc chúng với cùng một sàn giao dịch:

package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
    @Bean
    public DirectExchange direct() {
        // Định nghĩa sàn giao dịch
        // Tham số là tên sàn giao dịch, mà phải là duy nhất
        return new DirectExchange("tizi365.direct");
    }

    @Bean
    public Queue queue1() {
        // Định nghĩa hàng đợi 1
        return new Queue("tizi365.direct.queue1");
    }

    @Bean
    public Queue queue2() {
        // Định nghĩa hàng đợi 2
        return new Queue("tizi365.direct.queue2");
    }

    @Bean
    public Binding binding1(DirectExchange direct, Queue queue1) {
        // Định nghĩa một ràng buộc để ràng buộc hàng đợi 1 với sàn giao dịch trực tiếp với một khóa định tuyến là "tizi365"
		// Khi khóa định tuyến khớp với "tizi365", sàn giao dịch sẽ gửi các tin nhắn đến hàng đợi 1
        return BindingBuilder.bind(queue1).to(direct).with("tizi365");
    }

    @Bean
    public Binding binding2(DirectExchange direct, Queue queue2) {
        // Định nghĩa một ràng buộc để ràng buộc hàng đợi 2 với sàn giao dịch trực tiếp với một khóa định tuyến là "baidu"
		// Khi khóa định tuyến khớp với "baidu", sàn giao dịch sẽ gửi các tin nhắn đến hàng đợi 2
        return BindingBuilder.bind(queue2).to(direct).with("baidu");
    }
}

4.2 Định nghĩa Người nghe Hàng đợi

Định nghĩa người nghe tin nhắn bằng cách sử dụng chú thích RabbitListener để tiêu thụ tin nhắn từ các hàng đợi cụ thể:

package com.tizi365.rabbitmq.listener;

import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

// Để Spring quản lý lớp hiện tại
@Component
public class DemoListener {
    // Định nghĩa một người nghe để tiêu thụ tin nhắn từ hàng đợi 1
    @RabbitListener(queues = "tizi365.direct.queue1")
    public void receive1(String msg) {
        System.out.println("Nhận thông điệp từ hàng đợi 1: " + msg);
    }

    // Định nghĩa một người nghe để tiêu thụ tin nhắn từ hàng đợi 2
    @RabbitListener(queues = "tizi365.direct.queue2")
    public void receive2(String msg) {
        System.out.println("Nhận thông điệp từ hàng đợi 2: " + msg);
    }
}

Vì chỉ hàng đợi 1 có một khóa định tuyến là "tizi365" khi ràng buộc với sàn giao dịch, chỉ hàng đợi 1 mới có thể nhận các tin nhắn. Hàng đợi 2, vì khóa định tuyến không khớp, sẽ không nhận bất kỳ tin nhắn nào.