Pola topik RabbitMQ Java menggunakan tipe TopicExchange berbeda dengan pola routing (Direct) karena parameter routing mendukung pencocokan kabur (fuzzy matching). Karena pencocokan routing lebih fleksibel, ini adalah pola yang lebih umum digunakan. Arsitekturnya ditampilkan dalam diagram berikut:
Hint: Terlepas dari pola kerja RabbitMQ yang digunakan, perbedaannya terletak pada jenis pertukaran yang digunakan dan parameter routing.
1. Tutorial Prasyarat
Harap baca bagian-bagian berikut terlebih dahulu untuk memahami pengetahuan terkait:
- Konsep Dasar RabbitMQ
- Prinsip Pola Topik RabbitMQ
- Pemula Cepat untuk Java dengan RabbitMQ (Harus dibaca, karena bagian-bagian berikutnya tidak akan mengulang kode, hanya menampilkan kode kunci)
- Bagian Pola Publikasi-Langganan RabbitMQ untuk Java (Harus dibaca, karena sintaks kode hampir sama, hanya jenis pertukaran dan parameter rute yang berbeda)
2. Mendefinisikan Pertukaran Topik
Dalam Spring AMQP, kelas yang sesuai dengan pertukaran Direct adalah TopicExchange. Kita mendefinisikan pertukaran melalui kelas konfigurasi Spring Boot.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public TopicExchange topic() {
// Mendefinisikan pertukaran
// Parameter pertukaran adalah nama pertukaran, yang harus unik
return new TopicExchange("tizi365.topic");
}
}
Hint: Baik produsen pesan maupun konsumen memerlukan pertukaran.
3. Mengirim Pesan
Kita mengirim pesan ke pertukaran, dan pertukaran mengirimkan pesan ke antrean yang sesuai berdasarkan aturan routing.
package com.tizi365.rabbitmq.service;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class SendService {
@Autowired
private RabbitTemplate template;
@Autowired
private TopicExchange topic;
// Untuk tujuan demonstrasi, tugas terjadwal digunakan di sini untuk mengirimkan pesan setiap detik
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void send() {
// Isi pesan
String message = "Halo Dunia!";
// Mengirim pesan
// Parameter pertama adalah nama pertukaran
// Parameter kedua adalah kunci rute. Pertukaran topik mengirim pesan ke antrean yang cocok dengan kunci rute
// Parameter ketiga adalah isi pesan, mendukung jenis apa pun selama dapat diserialisasi
template.convertAndSend(topic.getName(), "www.tizi365.com", message);
System.out.println("Pesan terkirim: '" + message + "'");
}
}
Hint: Perhatikan parameter kedua "www.tizi365.com" dalam metode convertAndSend, karena itu adalah parameter penting.
4. Menerima Pesan
4.1 Mendefinisikan Antrian & Mengikat Exchange
Untuk mengonsumsi pesan dari antrian, Anda perlu terlebih dahulu mendefinisikan sebuah antrian dan kemudian mengikat antrian tersebut ke exchange target. Berikut, dua antrian didefinisikan dan diikat ke exchange yang sama.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public TopicExchange topic() {
// Mendefinisikan exchange
// Parameter adalah nama exchange, yang harus unik
return new TopicExchange("tizi365.topic");
}
@Bean
public Queue queue1() {
// Mendefinisikan antrian 1
return new Queue("tizi365.topic.queue1");
}
@Bean
public Queue queue2() {
// Mendefinisikan antrian 2
return new Queue("tizi365.topic.queue2");
}
@Bean
public Binding binding1(TopicExchange topic, Queue queue1) {
// Mendefinisikan hubungan ikatan, mengikat antrian 1 ke exchange topik dengan routing key: *.tizi365.com
return BindingBuilder.bind(queue1).to(topic).with("*.tizi365.com");
}
@Bean
public Binding binding2(TopicExchange topic, Queue queue2) {
// Mendefinisikan hubungan ikatan, mengikat antrian 2 ke exchange langsung dengan routing key: *.baidu.com
return BindingBuilder.bind(queue2).to(topic).with("*.baidu.com");
}
}
Tip: Saat mengikat antrian 1 dan antrian 2 ke exchange, routing key yang digunakan menggunakan wildcard * (bintang), yang cocok dengan satu kata. Jika diubah menjadi # (pagar), maka akan cocok dengan beberapa kata.
4.2 Mendefinisikan Pendengar Antrian
Mendefinisikan pendengar pesan menggunakan anotasi RabbitListener untuk mengonsumsi pesan dari antrian tertentu.
package com.tizi365.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
// Biarkan Spring mengelola kelas saat ini
@Component
public class DemoListener {
// Mendefinisikan pendengar, menentukan antrian mana yang didengarkan melalui parameter antrian
@RabbitListener(queues = "tizi365.topic.queue1")
public void receive1(String msg) {
System.out.println("Menerima pesan dari antrian 1: " + msg);
}
// Mendefinisikan pendengar, menentukan antrian mana yang didengarkan melalui parameter antrian
@RabbitListener(queues = "tizi365.topic.queue2")
public void receive2(String msg) {
System.out.println("Menerima pesan dari antrian 2: " + msg);
}
}
Karena hanya antrian 1 memiliki routing key *.tizi365.com saat diikat ke exchange, itu akan cocok dengan pesan dengan routing key (www.tizi365.com). Oleh karena itu, hanya antrian 1 yang akan menerima pesan, sedangkan antrian 2 tidak akan menerima pesan karena routing key tidak cocok.