الگوی انتشار / اشتراک (حالت پخش، حالت فن آوت) در RabbitMQ با استفاده از جاوا
نوع تبادل FanoutExchange برای الگوی انتشار / اشتراک در RabbitMQ استفاده میشود که در آن یک پیام ارسالی توسط یک تولید کننده توسط چند صف مصرفکننده پردازش میشود. معماری به شکل دیاگرام زیر نشان داده شده است:
تبادل Fanout پیامها را به تمام صفهای متصل فرستاده میشود.
نکته: بدون توجه به حالت کار RabbitMQ استفاده شده، تفاوت در نوع تبادل و پارامترهای مسیریابی استفاده شده است.
1. آموزشهای پیشنیاز
لطفاً ابتدا بخشهای زیر را بخوانید تا دانش مرتبط را فهمید:
- مفاهیم اصلی RabbitMQ
- الگوی انتشار / اشتراک RabbitMQ
- راهنمای شروع سریع جاوا RabbitMQ (اجباری است؛ زیرا بخشهای بعدی کد را تکرار نمیکنند، فقط بخشهای کلیدی کد را نشان میدهند)
2. تعریف تبادل Fanout
در Spring AMQP، کلاس FanoutExchange متناظر با تبادل Fanout است. ما تبادل را از طریق یک کلاس پیکربندی Spring Boot تعریف میکنیم.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
importorg.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
// تعریف تبادل
@Bean
public FanoutExchange fanout() {
// پارامتر نام تبادل است که باید یکتا باشد
return new FanoutExchange("tizi365.fanout");
}
}
نکته: هر دو تولید کننده و مصرفکننده پیام نیاز به یک تبادل دارند.
3. ارسال پیامها
ما پیامها را به تبادل ارسال میکنیم، که پیامها را براساس قوانین مسیردهی به صفهای متناظر ارسال میکند.
package com.tizi365.rabbitmq.service;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class SendService {
@Autowired
private RabbitTemplate template;
@Autowired
private FanoutExchange fanout;
// به منظور نمایش، یک وظیفه زمانبندی شده برای ارسال پیام هر ثانیه استفاده میشود
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void send() {
// محتوای پیام
String message = "سلام دنیا!";
// ارسال پیام
// پارامتر اول نام تبادل است
// پارامتر دوم کلید مسیریابی است؛ تبادل fanout کلید مسیریابی را نادیده میگیرد بنابراین لازم نیست تنظیم شود
// پارامتر سوم محتوای پیام است که هر نوعی را پشتیبانی میکند تا زمانی که بتواند سریالیسازی شود
template.convertAndSend(fanout.getName(), "", message);
System.out.println("پیام ارسال شد: '" + message + "'");
}
}
4. دریافت پیامها
4.1 تعریف صفها و اتصال تبادل
برای مصرف پیامهای صف، ابتدا باید یک صف تعریف کنید و سپس صف را به تبادل مقصد متصل کنید. در زیر، ما دو صف تعریف کرده و آنها را به همان تبادل متصل میکنیم.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public FanoutExchange fanout() {
// تعریف تبادل
// پارامتر نام تبادل که باید یکتا باشد
return new FanoutExchange("tizi365.fanout");
}
@Bean
public Queue queue1() {
// تعریف صف ۱
return new Queue("tizi365.fanout.queue1");
}
@Bean
public Queue queue2() {
// تعریف صف ۲
return new Queue("tizi365.fanout.queue2");
}
@Bean
public Binding binding1(FanoutExchange fanout, Queue queue1) {
// تعریف ارتباط اتصال برای اتصال صف ۱ به تبادل fanout
return BindingBuilder.bind(queue1).to(fanout);
}
@Bean
public Binding binding2(FanoutExchange fanout, Queue queue2) {
// تعریف ارتباط اتصال برای اتصال صف ۲ به تبادل fanout
return BindingBuilder.bind(queue2).to(fanout);
}
}
4.2 تعریف گوشکنندگان صف
با استفاده از انوتیشن RabbitListener، گوشکنندگان پیام را برای مصرف پیامها از صفهای خاص تعریف کنید.
package com.tizi365.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
// کلاس فعلی را توسط اسپرینگ مدیریت کنید
@Component
public class DemoListener {
// تعریف یک گوشکننده و مشخص کردن صف مورد نظر برای گوشکردن از طریق پارامترهای صفوف
@RabbitListener(queues = "tizi365.fanout.queue1")
public void receive1(String msg) {
System.out.println("پیام از صف 1 دریافت شد = " + msg);
}
// تعریف یک گوشکننده و مشخص کردن صف مورد نظر برای گوشکردن از طریق پارامترهای صفوف
@RabbitListener(queues = "tizi365.fanout.queue2")
public void receive2(String msg) {
System.out.println("پیام از صف 2 دریافت شد = " + msg);
}
}
زیرا تبادل قبلی بهصورت fanout تعریف شده است، هر پیام به تمام صفوفی که به تبادل فعلی بایند شدهاند پخش میشود، و پیامها به صورت جداگانه توسط دو متد بالا پردازش میشوند.
توجه: انوتیشن RabbitListener میتواند بر روی یک کلاس یا متد اعمال شود. اگر انوتیشن RabbitListener بر روی یک کلاس تعریف شود، بهصورت همراه با انوتیشن RabbitHandler باید به متد کلاسی که پردازش پیام را انجام میدهد، نظیر داده شود.
۴.۳ تعریف گوشکنندگان صف با انوتیشن کامل
شما نیازی به تعریف کلاس پیکربندی spring boot برای تعریف تبادلات، صفوف و روابط بایند ندارید. شما میتوانید بهصورت مستقیم روابط بایندینگ، صفوف و تبادلات را از طریق پارامتر bindings انوتیشن RabbitListener تعریف کنید.
@RabbitListener(
bindings = {
@QueueBinding(
value = @Queue(name = "tizi365.fanout.queue3", durable = "true"),
exchange = @Exchange(name = "tizi365.fanout", durable = "true", type = ExchangeTypes.FANOUT)
)
}
)
public void receive3(String msg) {
System.out.println("پیام از صف 3 دریافت شد = " + msg);
}
توضیح:
- انوتیشن QueueBinding: رابطه بایندینگ بین صف و تبادل را تعریف میکند. پارامتر value برای تعریف صف استفاده شده و exchange برای تعریف تبادل استفاده میشود.
- انوتیشن Queue: یک صف را تعریف میکند. پارامتر name نام صف (که باید یکتا باشد) و پارامتر durable نشاندهنده این است که آیا نیاز به دوام است یا خیر.
- انوتیشن Exchange: یک تبادل را تعریف میکند. پارامتر name نام تبادل را تعیین میکند و پارامتر type نوع تبادل را نشان میدهد.