Pola routing RabbitMQ (mode Langsung) dalam Java menggunakan tipe pertukaran DirectExchange. Perbedannya dengan pola publish-subscribe adalah bahwa pertukaran Langsung mengirim pesan ke antrian dengan parameter routing yang sepenuhnya cocok. Arsitekturnya seperti yang ditunjukkan pada gambar di bawah ini:

RabbitMQ Direct Mode

Tip: Terlepas dari mode kerja RabbitMQ yang digunakan, perbedaannya terletak pada tipe pertukaran yang digunakan dan parameter routing.

1. Tutorial Prasyarat

Silakan baca bagian berikut untuk memahami pengetahuan terkait:

2. Mendefinisikan Pertukaran Langsung

Pada Spring AMQP, kelas yang sesuai dengan pertukaran Langsung adalah DirectExchange. Kami mendefinisikan pertukaran melalui kelas konfigurasi Spring Boot.

package com.tizi365.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
    @Bean
    public DirectExchange direct() {
        // Definisikan pertukaran
        // Parameter adalah nama pertukaran, yang harus unik
        return new DirectExchange("tizi365.direct");
    }
}

Tip: Baik produsen pesan maupun konsumen memerlukan pertukaran.

3. Mengirim Pesan

Kami mengirim pesan ke pertukaran, yang kemudian mengirim pesan ke antrian yang sesuai berdasarkan aturan routing.

package com.tizi365.rabbitmq.service;

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class SendService {
    @Autowired
    private RabbitTemplate template;
    @Autowired
    private DirectExchange direct;

    // Untuk demonstrasi, kami menggunakan tugas terjadwal untuk mengirim pesan setiap detik
    @Scheduled(fixedDelay = 1000, initialDelay = 1000)
    public void send() {
        // Isi pesan
        String message = "Halo Dunia!";
        // Kirim pesan
        // Parameter pertama adalah nama pertukaran
        // Parameter kedua adalah kunci routing. Dengan pertukaran langsung, pesan dikirim ke antrian yang kunci routing-nya cocok dengan "tizi365"
        // Parameter ketiga adalah konten pesan, yang mendukung jenis apa pun selama dapat diserialkan
        template.convertAndSend(direct.getName(), "tizi365", message);
        System.out.println("Mengirim pesan '" + message + "'");
    }
}

Tip: Perhatikan parameter kedua dalam metode convertAndSend, karena ini adalah parameter penting.

4. Menerima Pesan

4.1 Mendefinisikan Antrian dan Mengikat Exchange

Untuk mengonsumsi pesan dari sebuah antrian, Anda perlu terlebih dahulu mendefinisikan antrian tersebut, dan kemudian mengikat antrian ke exchange target. Berikut adalah kode yang mendefinisikan dua antrian dan mengikat keduanya ke exchange yang sama:

package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
    @Bean
    public DirectExchange direct() {
        // Mendefinisikan exchange
        // Parameter yang dibutuhkan adalah nama exchange, yang harus unik
        return new DirectExchange("tizi365.direct");
    }

    @Bean
    public Queue queue1() {
        // Mendefinisikan antrian 1
        return new Queue("tizi365.direct.queue1");
    }

    @Bean
    public Queue queue2() {
        // Mendefinisikan antrian 2
        return new Queue("tizi365.direct.queue2");
    }

    @Bean
    public Binding binding1(DirectExchange direct, Queue queue1) {
        // Mendefinisikan ikatan untuk mengikat antrian 1 ke exchange langsung dengan routing key "tizi365"
		// Ketika routing key cocok dengan "tizi365", exchange akan mengirimkan pesan ke antrian 1
        return BindingBuilder.bind(queue1).to(direct).with("tizi365");
    }

    @Bean
    public Binding binding2(DirectExchange direct, Queue queue2) {
        // Mendefinisikan ikatan untuk mengikat antrian 2 ke exchange langsung dengan routing key "baidu"
		// Ketika routing key cocok dengan "baidu", exchange akan mengirimkan pesan ke antrian 2
        return BindingBuilder.bind(queue2).to(direct).with("baidu");
    }
}

4.2 Mendefinisikan Pemantau Antrian

Mendefinisikan pemantau pesan menggunakan anotasi RabbitListener untuk mengonsumsi pesan dari antrian tertentu:

package com.tizi365.rabbitmq.listener;

import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

// Biarkan Spring mengelola kelas saat ini
@Component
public class DemoListener {
    // Mendefinisikan pemantau untuk mengonsumsi pesan dari antrian1
    @RabbitListener(queues = "tizi365.direct.queue1")
    public void receive1(String msg) {
        System.out.println("Menerima pesan dari antrian1: " + msg);
    }

    // Mendefinisikan pemantau untuk mengonsumsi pesan dari antrian2
    @RabbitListener(queues = "tizi365.direct.queue2")
    public void receive2(String msg) {
        System.out.println("Menerima pesan dari antrian2: " + msg);
    }
}

Karena hanya antrian 1 memiliki routing key "tizi365" saat diikat ke exchange, hanya antrian 1 yang dapat menerima pesan. Antrian 2, karena routing key tidak cocok, tidak akan menerima pesan apa pun.