Java RabbitMQ Yayın/Abone Deseni (Yayın Modu, Fanout Modu)

FanoutExchange değişimi, RabbitMQ'da yayın/abone deseni için kullanılır. Bu desende bir üretici tarafından gönderilen bir mesaj, birden fazla tüketici kuyruğu tarafından işlenecektir. Mimaride aşağıdaki diyagramda gösterildiği gibi:

RabbitMQ Çalışma Modu

Fanout exchange, tüm bağlı kuyruklara mesajları iletebilir.

İpucu: Kullanılan RabbitMQ çalışma modundan bağımsız olarak, fark, değişim türü ve kullanılan yönlendirme parametrelerinde yatar.

1. Önkoşul Öğreticiler

İlgili bilgileri anlamak için lütfen önce aşağıdaki bölümleri okuyun:

2. Fanout Değişimini Tanımlama

Spring AMQP'de, FanoutExchange sınıfı Fanout değişimine karşılık gelir. Değişimi bir Spring Boot yapılandırma sınıfı aracılığıyla tanımlarız.

package com.tizi365.rabbitmq.config;

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
	// Değişimi tanımla
    @Bean
    public FanoutExchange fanout() {
        // Parametre değişim adıdır ve benzersiz olmalıdır
        return new FanoutExchange("tizi365.fanout");
    }
}

İpucu: Hem mesaj üreticileri hem de tüketicileri bir değişime ihtiyaç duyar.

3. Mesaj Gönderme

Mesajları değişime göndeririz, değişim mesajları yönlendirme kurallarına göre ilgili kuyruklara iletecektir.

package com.tizi365.rabbitmq.service;

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class SendService {
    @Autowired
    private RabbitTemplate template;
    @Autowired
    private FanoutExchange fanout;

	// Demonstration amaçlı, her saniye bir mesaj göndermek için bir zamanlanmış görev kullanılır
    @Scheduled(fixedDelay = 1000, initialDelay = 1000)
    public void send() {
        // Mesaj içeriği
        String message = "Merhaba Dünya!";
        // Mesajı gönder
        // İlk parametre değişim adıdır
        // İkinci parametre yönlendirme anahtarıdır; fanout değişimi yönlendirme anahtarını göz ardı eder, bu yüzden ayarlanması gerekmez
        // Üçüncü parametre mesaj içeriğidir, herhangi bir türü destekler, sadece seri hale getirilebilirse
        template.convertAndSend(fanout.getName(), "", message);
        System.out.println("Mesaj gönderildi: '" + message + "'");
    }
}

4. Mesaj Alımı

4.1 Kuyrukları Tanımlama ve Değişime Bağlama

Kuyruk mesajlarını tüketmek için önce bir kuyruğu tanımlamalısınız ve ardından kuyruğu hedef değişime bağlamalısınız. Aşağıda, iki kuyruğu tanımlıyor ve bunları aynı değişime bağlıyoruz.

package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
    @Bean
    public FanoutExchange fanout() {
        // Değişimi tanımla
        // Parametre değişim adıdır ve benzersiz olmalıdır
        return new FanoutExchange("tizi365.fanout");
    }

    @Bean
    public Queue queue1() {
        // Kuyruğu tanımla 1
        return new Queue("tizi365.fanout.queue1");
    }

    @Bean
    public Queue queue2() {
        // Kuyruğu tanımla 2
        return new Queue("tizi365.fanout.queue2");
    }

    @Bean
    public Binding binding1(FanoutExchange fanout, Queue queue1) {
        // Bağlantı ilişkisini tanımla, kuyruğu 1'i fanout değişimine bağlamak için
        return BindingBuilder.bind(queue1).to(fanout);
    }

    @Bean
    public Binding binding2(FanoutExchange fanout, Queue queue2) {
        // Bağlantı ilişkisini tanımla, kuyruğu 2'yi fanout değişimine bağlamak için
        return BindingBuilder.bind(queue2).to(fanout);
    }
}

4.2 Kuyruk Dinleyicilerini Tanımlama

Belirli kuyruklardan mesajları tüketmek için RabbitListener açıklamasıyla mesaj dinleyicilerini tanımlayın.

package com.tizi365.rabbitmq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// Mevcut sınıfın Spring tarafından yönetilmesini sağlar
@Component
public class DemoListener {
    // Bir dinleyici tanımlayın ve hangi kuyruğu dinleyeceğinizi queues parametresini kullanarak belirtin
    @RabbitListener(queues = "tizi365.fanout.queue1")
    public void receive1(String msg) {
        System.out.println("Kuyruk 1'den gelen mesaj alındı = " + msg);
    }

    // Bir dinleyici tanımlayın ve hangi kuyruğu dinleyeceğinizi queues parametresini kullanarak belirtin
    @RabbitListener(queues = "tizi365.fanout.queue2")
    public void receive2(String msg) {
        System.out.println("Kuyruk 2'den gelen mesaj alındı = " + msg);
    }
}

Daha önceki örnek, fanout türünde bir takas olarak tanımlanmıştı, bu nedenle her mesaj mevcut takasa bağlı tüm kuyruklara dağıtılacak ve mesajlar yukarıdaki iki yöntem tarafından ayrı ayrı işlenecektir.

Not: RabbitListener açıklaması bir sınıfa veya metoda uygulanabilir. Eğer RabbitListener açıklaması bir sınıfa uygulanmışsa, mesaj işleme hangi sınıf yönteminin yürütüleceğini belirtmek için RabbitHandler açıklaması ile birleştirilmelidir.

4.3 Tüm Açıklama İle Kuyruk Dinleyicileri Tanımlama

Takas, kuyruk ve bağlama ilişkilerini tanımlamak için spring boot yapılandırma sınıfına ihtiyacınız yoktur. RabbitListener açıklamasının bindings parametresi aracılığıyla doğrudan bağlama ilişkilerini, kuyrukları ve takasları tanımlayabilirsiniz.

@RabbitListener(
            bindings = {
                    @QueueBinding(
                            value = @Queue(name = "tizi365.fanout.queue3", durable = "true"),
                            exchange = @Exchange(name = "tizi365.fanout", durable = "true", type = ExchangeTypes.FANOUT)
                    )
            }
    )
public void receive3(String msg) {
    System.out.println("Kuyruk 3'ten gelen mesaj alındı = " + msg);
}

Açıklama:

  • QueueBinding açıklaması: Kuyruk ile takas arasındaki bağlama ilişkisini tanımlar. value parametresi kuyruğu tanımlamak için kullanılır ve exchange takası tanımlamak için kullanılır.
  • Queue açıklaması: Bir kuyruğu tanımlar. name parametresi kuyruk adını tanımlar (benzersiz olması gerekir) ve durable parametresi kuyruğun kalıcı olup olmayacağını belirtir.
  • Exchange açıklaması: Bir takası tanımlar. name parametresi takas adını tanımlar, type parametresi takas türünü belirtir.