Java Pola Publish/Subscribe RabbitMQ (Mode Penyebaran, Mode Fanout)
Tipe pertukaran FanoutExchange digunakan untuk pola publish/subscribe dalam RabbitMQ, di mana pesan yang dikirim oleh produsen akan diproses oleh beberapa antrian konsumen. Arsitekturnya seperti yang ditunjukkan dalam diagram berikut:
Pertukaran Fanout dapat meneruskan pesan ke semua antrian yang terikat.
Tip: Terlepas dari mode kerja RabbitMQ yang digunakan, perbedaannya terletak pada tipe pertukaran dan parameter routing yang digunakan.
1. Tutorial Prasyarat
Harap baca bagian berikut terlebih dahulu untuk memahami pengetahuan terkait:
- Konsep Dasar RabbitMQ
- Pola Publish/Subscribe RabbitMQ
- Panduan Cepat RabbitMQ Java (wajib, karena bagian selanjutnya tidak akan menduplikasi kode, hanya menampilkan potongan kode kunci)
2. Mendefinisikan Pertukaran Fanout
Dalam Spring AMQP, kelas FanoutExchange sesuai dengan pertukaran Fanout. Kita mendefinisikan pertukaran melalui kelas konfigurasi Spring Boot.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
// Mendefinisikan pertukaran
@Bean
public FanoutExchange fanout() {
// Parameter adalah nama pertukaran, yang harus unik
return new FanoutExchange("tizi365.fanout");
}
}
Tip: Baik produsen pesan maupun konsumen memerlukan pertukaran.
3. Mengirim Pesan
Kita mengirim pesan ke pertukaran, yang akan mengirim pesan ke antrian yang sesuai berdasarkan aturan routing.
package com.tizi365.rabbitmq.service;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class SendService {
@Autowired
private RabbitTemplate template;
@Autowired
private FanoutExchange fanout;
// Untuk tujuan demonstrasi, tugas yang dijadwalkan digunakan untuk mengirim pesan setiap detik
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void send() {
// Konten pesan
String message = "Halo Dunia!";
// Mengirim pesan
// Parameter pertama adalah nama pertukaran
// Parameter kedua adalah kunci routing; pertukaran fanout akan mengabaikan kunci routing, jadi tidak perlu diatur
// Parameter ketiga adalah konten pesan, yang mendukung jenis apa pun selama dapat diserialisasikan
template.convertAndSend(fanout.getName(), "", message);
System.out.println("Pesan terkirim: '" + message + "'");
}
}
4. Menerima Pesan
4.1 Mendefinisikan Antrian & Mengikat Pertukaran
Untuk mengonsumsi pesan antrian, Anda perlu terlebih dahulu mendefinisikan antrian dan kemudian mengikat antrian ke pertukaran target. Di bawah ini, kita mendefinisikan dua antrian dan mengikatnya ke pertukaran yang sama.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public FanoutExchange fanout() {
// Mendefinisikan pertukaran
// Parameter adalah nama pertukaran, yang harus unik
return new FanoutExchange("tizi365.fanout");
}
@Bean
public Queue queue1() {
// Mendefinisikan antrian 1
return new Queue("tizi365.fanout.queue1");
}
@Bean
public Queue queue2() {
// Mendefinisikan antrian 2
return new Queue("tizi365.fanout.queue2");
}
@Bean
public Binding binding1(FanoutExchange fanout, Queue queue1) {
// Mendefinisikan hubungan pengikatan, untuk mengikat antrian 1 ke pertukaran fanout
return BindingBuilder.bind(queue1).to(fanout);
}
@Bean
public Binding binding2(FanoutExchange fanout, Queue queue2) {
// Mendefinisikan hubungan pengikatan, untuk mengikat antrian 2 ke pertukaran fanout
return BindingBuilder.bind(queue2).to(fanout);
}
}
4.2 Tentukan Pendengar Antrian
Menentukan pendengar pesan melalui anotasi RabbitListener untuk mengonsumsi pesan dari antrian tertentu.
package com.tizi365.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
// Jadikan kelas saat ini dikelola oleh Spring
@Component
public class DemoListener {
// Tentukan pendengar, dan tentukan antrian mana yang akan didengarkan menggunakan parameter antrian
@RabbitListener(queues = "tizi365.fanout.queue1")
public void receive1(String msg) {
System.out.println("Menerima pesan dari antrian 1 = " + msg);
}
// Tentukan pendengar, dan tentukan antrian mana yang akan didengarkan menggunakan parameter antrian
@RabbitListener(queues = "tizi365.fanout.queue2")
public void receive2(String msg) {
System.out.println("Menerima pesan dari antrian 2 = " + msg);
}
}
Karena pertukaran sebelumnya didefinisikan sebagai tipe fanout, setiap pesan akan didistribusikan ke semua antrian yang terikat ke pertukaran saat ini, dan pesan akan ditangani secara terpisah oleh kedua metode di atas.
Catatan: Anotasi RabbitListener dapat diterapkan pada sebuah kelas atau metode. Jika anotasi RabbitListener didefinisikan pada sebuah kelas, perlu digabungkan dengan anotasi RabbitHandler untuk menandai metode kelas mana yang akan mengeksekusi penanganan pesan.
4.3 Tentukan Pendengar Antrian dengan Anotasi Lengkap
Anda tidak perlu class konfigurasi spring boot untuk mendefinisikan pertukaran, antrian, dan hubungan ikatan. Anda dapat langsung menentukan hubungan ikatan, antrian, dan pertukaran melalui parameter bindings dari anotasi RabbitListener.
@RabbitListener(
bindings = {
@QueueBinding(
value = @Queue(name = "tizi365.fanout.queue3", durable = "true"),
exchange = @Exchange(name = "tizi365.fanout", durable = "true", type = ExchangeTypes.FANOUT)
)
}
)
public void receive3(String msg) {
System.out.println("Menerima pesan dari antrian 3 = " + msg);
}
Penjelasan:
- Anotasi QueueBinding: Mendefinisikan hubungan ikatan antara antrian dan pertukaran. Parameter value digunakan untuk mendefinisikan antrian, dan exchange digunakan untuk mendefinisikan pertukaran.
- Anotasi Antrian: Mendefinisikan sebuah antrian. Parameter name mendefinisikan nama antrian (yang perlu unik), dan parameter durable menunjukkan apakah perlu tahan lama.
- Anotasi Pertukaran: Mendefinisikan sebuah pertukaran. Parameter name mendefinisikan nama pertukaran, dan parameter type menunjukkan tipe pertukaran.