Java RabbitMQ Yayın/Abone Deseni (Yayın Modu, Fanout Modu)
FanoutExchange değişimi, RabbitMQ'da yayın/abone deseni için kullanılır. Bu desende bir üretici tarafından gönderilen bir mesaj, birden fazla tüketici kuyruğu tarafından işlenecektir. Mimaride aşağıdaki diyagramda gösterildiği gibi:
Fanout exchange, tüm bağlı kuyruklara mesajları iletebilir.
İpucu: Kullanılan RabbitMQ çalışma modundan bağımsız olarak, fark, değişim türü ve kullanılan yönlendirme parametrelerinde yatar.
1. Önkoşul Öğreticiler
İlgili bilgileri anlamak için lütfen önce aşağıdaki bölümleri okuyun:
- RabbitMQ Temel Kavramlar
- RabbitMQ Yayın/Abone Deseni
- RabbitMQ Java Hızlı Başlangıç Rehberi (zorunlu, çünkü sonraki bölümler kodu tekrarlamayacak, sadece temel kod parçacıklarını gösterecek)
2. Fanout Değişimini Tanımlama
Spring AMQP'de, FanoutExchange sınıfı Fanout değişimine karşılık gelir. Değişimi bir Spring Boot yapılandırma sınıfı aracılığıyla tanımlarız.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
// Değişimi tanımla
@Bean
public FanoutExchange fanout() {
// Parametre değişim adıdır ve benzersiz olmalıdır
return new FanoutExchange("tizi365.fanout");
}
}
İpucu: Hem mesaj üreticileri hem de tüketicileri bir değişime ihtiyaç duyar.
3. Mesaj Gönderme
Mesajları değişime göndeririz, değişim mesajları yönlendirme kurallarına göre ilgili kuyruklara iletecektir.
package com.tizi365.rabbitmq.service;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class SendService {
@Autowired
private RabbitTemplate template;
@Autowired
private FanoutExchange fanout;
// Demonstration amaçlı, her saniye bir mesaj göndermek için bir zamanlanmış görev kullanılır
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void send() {
// Mesaj içeriği
String message = "Merhaba Dünya!";
// Mesajı gönder
// İlk parametre değişim adıdır
// İkinci parametre yönlendirme anahtarıdır; fanout değişimi yönlendirme anahtarını göz ardı eder, bu yüzden ayarlanması gerekmez
// Üçüncü parametre mesaj içeriğidir, herhangi bir türü destekler, sadece seri hale getirilebilirse
template.convertAndSend(fanout.getName(), "", message);
System.out.println("Mesaj gönderildi: '" + message + "'");
}
}
4. Mesaj Alımı
4.1 Kuyrukları Tanımlama ve Değişime Bağlama
Kuyruk mesajlarını tüketmek için önce bir kuyruğu tanımlamalısınız ve ardından kuyruğu hedef değişime bağlamalısınız. Aşağıda, iki kuyruğu tanımlıyor ve bunları aynı değişime bağlıyoruz.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public FanoutExchange fanout() {
// Değişimi tanımla
// Parametre değişim adıdır ve benzersiz olmalıdır
return new FanoutExchange("tizi365.fanout");
}
@Bean
public Queue queue1() {
// Kuyruğu tanımla 1
return new Queue("tizi365.fanout.queue1");
}
@Bean
public Queue queue2() {
// Kuyruğu tanımla 2
return new Queue("tizi365.fanout.queue2");
}
@Bean
public Binding binding1(FanoutExchange fanout, Queue queue1) {
// Bağlantı ilişkisini tanımla, kuyruğu 1'i fanout değişimine bağlamak için
return BindingBuilder.bind(queue1).to(fanout);
}
@Bean
public Binding binding2(FanoutExchange fanout, Queue queue2) {
// Bağlantı ilişkisini tanımla, kuyruğu 2'yi fanout değişimine bağlamak için
return BindingBuilder.bind(queue2).to(fanout);
}
}
4.2 Kuyruk Dinleyicilerini Tanımlama
Belirli kuyruklardan mesajları tüketmek için RabbitListener açıklamasıyla mesaj dinleyicilerini tanımlayın.
package com.tizi365.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
// Mevcut sınıfın Spring tarafından yönetilmesini sağlar
@Component
public class DemoListener {
// Bir dinleyici tanımlayın ve hangi kuyruğu dinleyeceğinizi queues parametresini kullanarak belirtin
@RabbitListener(queues = "tizi365.fanout.queue1")
public void receive1(String msg) {
System.out.println("Kuyruk 1'den gelen mesaj alındı = " + msg);
}
// Bir dinleyici tanımlayın ve hangi kuyruğu dinleyeceğinizi queues parametresini kullanarak belirtin
@RabbitListener(queues = "tizi365.fanout.queue2")
public void receive2(String msg) {
System.out.println("Kuyruk 2'den gelen mesaj alındı = " + msg);
}
}
Daha önceki örnek, fanout türünde bir takas olarak tanımlanmıştı, bu nedenle her mesaj mevcut takasa bağlı tüm kuyruklara dağıtılacak ve mesajlar yukarıdaki iki yöntem tarafından ayrı ayrı işlenecektir.
Not: RabbitListener açıklaması bir sınıfa veya metoda uygulanabilir. Eğer RabbitListener açıklaması bir sınıfa uygulanmışsa, mesaj işleme hangi sınıf yönteminin yürütüleceğini belirtmek için RabbitHandler açıklaması ile birleştirilmelidir.
4.3 Tüm Açıklama İle Kuyruk Dinleyicileri Tanımlama
Takas, kuyruk ve bağlama ilişkilerini tanımlamak için spring boot yapılandırma sınıfına ihtiyacınız yoktur. RabbitListener açıklamasının bindings parametresi aracılığıyla doğrudan bağlama ilişkilerini, kuyrukları ve takasları tanımlayabilirsiniz.
@RabbitListener(
bindings = {
@QueueBinding(
value = @Queue(name = "tizi365.fanout.queue3", durable = "true"),
exchange = @Exchange(name = "tizi365.fanout", durable = "true", type = ExchangeTypes.FANOUT)
)
}
)
public void receive3(String msg) {
System.out.println("Kuyruk 3'ten gelen mesaj alındı = " + msg);
}
Açıklama:
- QueueBinding açıklaması: Kuyruk ile takas arasındaki bağlama ilişkisini tanımlar. value parametresi kuyruğu tanımlamak için kullanılır ve exchange takası tanımlamak için kullanılır.
- Queue açıklaması: Bir kuyruğu tanımlar. name parametresi kuyruk adını tanımlar (benzersiz olması gerekir) ve durable parametresi kuyruğun kalıcı olup olmayacağını belirtir.
- Exchange açıklaması: Bir takası tanımlar. name parametresi takas adını tanımlar, type parametresi takas türünü belirtir.