Java Pola Publish/Subscribe RabbitMQ (Mode Penyebaran, Mode Fanout)

Tipe pertukaran FanoutExchange digunakan untuk pola publish/subscribe dalam RabbitMQ, di mana pesan yang dikirim oleh produsen akan diproses oleh beberapa antrian konsumen. Arsitekturnya seperti yang ditunjukkan dalam diagram berikut:

Mode Kerja RabbitMQ

Pertukaran Fanout dapat meneruskan pesan ke semua antrian yang terikat.

Tip: Terlepas dari mode kerja RabbitMQ yang digunakan, perbedaannya terletak pada tipe pertukaran dan parameter routing yang digunakan.

1. Tutorial Prasyarat

Harap baca bagian berikut terlebih dahulu untuk memahami pengetahuan terkait:

2. Mendefinisikan Pertukaran Fanout

Dalam Spring AMQP, kelas FanoutExchange sesuai dengan pertukaran Fanout. Kita mendefinisikan pertukaran melalui kelas konfigurasi Spring Boot.

package com.tizi365.rabbitmq.config;

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
	// Mendefinisikan pertukaran
    @Bean
    public FanoutExchange fanout() {
        // Parameter adalah nama pertukaran, yang harus unik
        return new FanoutExchange("tizi365.fanout");
    }
}

Tip: Baik produsen pesan maupun konsumen memerlukan pertukaran.

3. Mengirim Pesan

Kita mengirim pesan ke pertukaran, yang akan mengirim pesan ke antrian yang sesuai berdasarkan aturan routing.

package com.tizi365.rabbitmq.service;

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class SendService {
    @Autowired
    private RabbitTemplate template;
    @Autowired
    private FanoutExchange fanout;

	// Untuk tujuan demonstrasi, tugas yang dijadwalkan digunakan untuk mengirim pesan setiap detik
    @Scheduled(fixedDelay = 1000, initialDelay = 1000)
    public void send() {
        // Konten pesan
        String message = "Halo Dunia!";
        // Mengirim pesan
        // Parameter pertama adalah nama pertukaran
        // Parameter kedua adalah kunci routing; pertukaran fanout akan mengabaikan kunci routing, jadi tidak perlu diatur
        // Parameter ketiga adalah konten pesan, yang mendukung jenis apa pun selama dapat diserialisasikan
        template.convertAndSend(fanout.getName(), "", message);
        System.out.println("Pesan terkirim: '" + message + "'");
    }
}

4. Menerima Pesan

4.1 Mendefinisikan Antrian & Mengikat Pertukaran

Untuk mengonsumsi pesan antrian, Anda perlu terlebih dahulu mendefinisikan antrian dan kemudian mengikat antrian ke pertukaran target. Di bawah ini, kita mendefinisikan dua antrian dan mengikatnya ke pertukaran yang sama.

package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
    @Bean
    public FanoutExchange fanout() {
        // Mendefinisikan pertukaran
        // Parameter adalah nama pertukaran, yang harus unik
        return new FanoutExchange("tizi365.fanout");
    }

    @Bean
    public Queue queue1() {
        // Mendefinisikan antrian 1
        return new Queue("tizi365.fanout.queue1");
    }

    @Bean
    public Queue queue2() {
        // Mendefinisikan antrian 2
        return new Queue("tizi365.fanout.queue2");
    }

    @Bean
    public Binding binding1(FanoutExchange fanout, Queue queue1) {
        // Mendefinisikan hubungan pengikatan, untuk mengikat antrian 1 ke pertukaran fanout
        return BindingBuilder.bind(queue1).to(fanout);
    }

    @Bean
    public Binding binding2(FanoutExchange fanout, Queue queue2) {
        // Mendefinisikan hubungan pengikatan, untuk mengikat antrian 2 ke pertukaran fanout
        return BindingBuilder.bind(queue2).to(fanout);
    }
}

4.2 Tentukan Pendengar Antrian

Menentukan pendengar pesan melalui anotasi RabbitListener untuk mengonsumsi pesan dari antrian tertentu.

package com.tizi365.rabbitmq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// Jadikan kelas saat ini dikelola oleh Spring
@Component
public class DemoListener {
    // Tentukan pendengar, dan tentukan antrian mana yang akan didengarkan menggunakan parameter antrian
    @RabbitListener(queues = "tizi365.fanout.queue1")
    public void receive1(String msg) {
        System.out.println("Menerima pesan dari antrian 1 = " + msg);
    }

    // Tentukan pendengar, dan tentukan antrian mana yang akan didengarkan menggunakan parameter antrian
    @RabbitListener(queues = "tizi365.fanout.queue2")
    public void receive2(String msg) {
        System.out.println("Menerima pesan dari antrian 2 = " + msg);
    }
}

Karena pertukaran sebelumnya didefinisikan sebagai tipe fanout, setiap pesan akan didistribusikan ke semua antrian yang terikat ke pertukaran saat ini, dan pesan akan ditangani secara terpisah oleh kedua metode di atas.

Catatan: Anotasi RabbitListener dapat diterapkan pada sebuah kelas atau metode. Jika anotasi RabbitListener didefinisikan pada sebuah kelas, perlu digabungkan dengan anotasi RabbitHandler untuk menandai metode kelas mana yang akan mengeksekusi penanganan pesan.

4.3 Tentukan Pendengar Antrian dengan Anotasi Lengkap

Anda tidak perlu class konfigurasi spring boot untuk mendefinisikan pertukaran, antrian, dan hubungan ikatan. Anda dapat langsung menentukan hubungan ikatan, antrian, dan pertukaran melalui parameter bindings dari anotasi RabbitListener.

@RabbitListener(
            bindings = {
                    @QueueBinding(
                            value = @Queue(name = "tizi365.fanout.queue3", durable = "true"),
                            exchange = @Exchange(name = "tizi365.fanout", durable = "true", type = ExchangeTypes.FANOUT)
                    )
            }
    )
public void receive3(String msg) {
    System.out.println("Menerima pesan dari antrian 3 = " + msg);
}

Penjelasan:

  • Anotasi QueueBinding: Mendefinisikan hubungan ikatan antara antrian dan pertukaran. Parameter value digunakan untuk mendefinisikan antrian, dan exchange digunakan untuk mendefinisikan pertukaran.
  • Anotasi Antrian: Mendefinisikan sebuah antrian. Parameter name mendefinisikan nama antrian (yang perlu unik), dan parameter durable menunjukkan apakah perlu tahan lama.
  • Anotasi Pertukaran: Mendefinisikan sebuah pertukaran. Parameter name mendefinisikan nama pertukaran, dan parameter type menunjukkan tipe pertukaran.