Pola topik RabbitMQ Java menggunakan tipe TopicExchange berbeda dengan pola routing (Direct) karena parameter routing mendukung pencocokan kabur (fuzzy matching). Karena pencocokan routing lebih fleksibel, ini adalah pola yang lebih umum digunakan. Arsitekturnya ditampilkan dalam diagram berikut:

RabbitMQ Topic Pattern

Hint: Terlepas dari pola kerja RabbitMQ yang digunakan, perbedaannya terletak pada jenis pertukaran yang digunakan dan parameter routing.

1. Tutorial Prasyarat

Harap baca bagian-bagian berikut terlebih dahulu untuk memahami pengetahuan terkait:

2. Mendefinisikan Pertukaran Topik

Dalam Spring AMQP, kelas yang sesuai dengan pertukaran Direct adalah TopicExchange. Kita mendefinisikan pertukaran melalui kelas konfigurasi Spring Boot.

package com.tizi365.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
    @Bean
    public TopicExchange topic() {
        // Mendefinisikan pertukaran
        // Parameter pertukaran adalah nama pertukaran, yang harus unik
        return new TopicExchange("tizi365.topic");
    }
}

Hint: Baik produsen pesan maupun konsumen memerlukan pertukaran.

3. Mengirim Pesan

Kita mengirim pesan ke pertukaran, dan pertukaran mengirimkan pesan ke antrean yang sesuai berdasarkan aturan routing.

package com.tizi365.rabbitmq.service;

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class SendService {
    @Autowired
    private RabbitTemplate template;
    @Autowired
    private TopicExchange topic;

    // Untuk tujuan demonstrasi, tugas terjadwal digunakan di sini untuk mengirimkan pesan setiap detik
    @Scheduled(fixedDelay = 1000, initialDelay = 1000)
    public void send() {
        // Isi pesan
        String message = "Halo Dunia!";
        // Mengirim pesan
        // Parameter pertama adalah nama pertukaran
        // Parameter kedua adalah kunci rute. Pertukaran topik mengirim pesan ke antrean yang cocok dengan kunci rute
        // Parameter ketiga adalah isi pesan, mendukung jenis apa pun selama dapat diserialisasi
        template.convertAndSend(topic.getName(), "www.tizi365.com", message);
        System.out.println("Pesan terkirim: '" + message + "'");
    }
}

Hint: Perhatikan parameter kedua "www.tizi365.com" dalam metode convertAndSend, karena itu adalah parameter penting.

4. Menerima Pesan

4.1 Mendefinisikan Antrian & Mengikat Exchange

Untuk mengonsumsi pesan dari antrian, Anda perlu terlebih dahulu mendefinisikan sebuah antrian dan kemudian mengikat antrian tersebut ke exchange target. Berikut, dua antrian didefinisikan dan diikat ke exchange yang sama.

package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
    @Bean
    public TopicExchange topic() {
        // Mendefinisikan exchange
        // Parameter adalah nama exchange, yang harus unik
        return new TopicExchange("tizi365.topic");
    }

    @Bean
    public Queue queue1() {
        // Mendefinisikan antrian 1
        return new Queue("tizi365.topic.queue1");
    }

    @Bean
    public Queue queue2() {
        // Mendefinisikan antrian 2
        return new Queue("tizi365.topic.queue2");
    }

    @Bean
    public Binding binding1(TopicExchange topic, Queue queue1) {
        // Mendefinisikan hubungan ikatan, mengikat antrian 1 ke exchange topik dengan routing key: *.tizi365.com
        return BindingBuilder.bind(queue1).to(topic).with("*.tizi365.com");
    }

    @Bean
    public Binding binding2(TopicExchange topic, Queue queue2) {
        // Mendefinisikan hubungan ikatan, mengikat antrian 2 ke exchange langsung dengan routing key: *.baidu.com
        return BindingBuilder.bind(queue2).to(topic).with("*.baidu.com");
    }
}

Tip: Saat mengikat antrian 1 dan antrian 2 ke exchange, routing key yang digunakan menggunakan wildcard * (bintang), yang cocok dengan satu kata. Jika diubah menjadi # (pagar), maka akan cocok dengan beberapa kata.

4.2 Mendefinisikan Pendengar Antrian

Mendefinisikan pendengar pesan menggunakan anotasi RabbitListener untuk mengonsumsi pesan dari antrian tertentu.

package com.tizi365.rabbitmq.listener;

import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

// Biarkan Spring mengelola kelas saat ini
@Component
public class DemoListener {
    // Mendefinisikan pendengar, menentukan antrian mana yang didengarkan melalui parameter antrian
    @RabbitListener(queues = "tizi365.topic.queue1")
    public void receive1(String msg) {
        System.out.println("Menerima pesan dari antrian 1: " + msg);
    }

    // Mendefinisikan pendengar, menentukan antrian mana yang didengarkan melalui parameter antrian
    @RabbitListener(queues = "tizi365.topic.queue2")
    public void receive2(String msg) {
        System.out.println("Menerima pesan dari antrian 2: " + msg);
    }
}

Karena hanya antrian 1 memiliki routing key *.tizi365.com saat diikat ke exchange, itu akan cocok dengan pesan dengan routing key (www.tizi365.com). Oleh karena itu, hanya antrian 1 yang akan menerima pesan, sedangkan antrian 2 tidak akan menerima pesan karena routing key tidak cocok.