الگوی تاپیک RabbitMQ در جاوا، با استفاده از نوع TopicExchange، از الگوی مسیریابی (Direct) با این تفاوت است که پارامترهای مسیریابی از عملکرد تطابق فازی پشتیبانی میکنند. به دلیل این انعطاف پذیری بیشتر در تطابق مسیریابی، این الگو یک الگوی رایجتر است. معماری به صورت دیاگرام زیر نشان داده شده است:
نکته: بدون در نظر گرفتن الگوی کاری RabbitMQ که استفاده میشود، تفاوت در نوع تبادل استفاده شده و پارامترهای مسیریابی است.
1. آموزش پیشنیاز
لطفاً ابتدا این بخشها را برای درک دانش مربوطه مطالعه کنید:
- مفاهیم پایه RabbitMQ
- اصول الگوی موضوعی RabbitMQ
- شروع سریع برای جاوا با RabbitMQ (باید خوانده شود، زیرا بخشهای بعدی کد را تکرار نمیکنند، فقط کد کلیدی را نشان میدهند)
- بخش الگوی انتشار و اشتراکگذاری برای جاوا RabbitMQ (باید خوانده شود، زیرا نحوه نوشتار کد تقریباً یکسان است، فقط نوع تبادل و پارامترهای مسیردهی متفاوت است)
2. تعریف تبادل موضوعی
در Spring AMQP، کلاس متناظر با تبادل مستقیم، TopicExchange است. ما تبادل را از طریق کلاس تنظیمکننده Spring Boot تعریف میکنیم.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public TopicExchange topic() {
// تعریف تبادل
// پارامتر نام تبادل است که باید یکتا باشد
return new TopicExchange("tizi365.topic");
}
}
نکته: هم تولیدکنندگان پیام و هم مصرفکنندگان نیاز به تبادل دارند.
3. ارسال پیامها
ما پیامها را به تبادل ارسال میکنیم و تبادل پیامها را براساس قوانین مسیریابی به صفهای متناظر ارسال میکند.
package com.tizi365.rabbitmq.service;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class SendService {
@Autowired
private RabbitTemplate template;
@Autowired
private TopicExchange topic;
// برای اهداف دمو، یک وظیفه زمانبندی شده برای ارسال یک پیام در هر ثانیه استفاده شده است
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void send() {
// محتوای پیام
String message = "Hello World!";
// ارسال پیام
// پارامتر اول نام تبادل است
// پارامتر دوم کلید مسیریابی است. تبادل موضوعی پیامها را به صفهایی ارسال میکند که با کلید مسیریابی مطابقت داشته باشند
// پارامتر سوم محتوای پیام است، از هر نوعی پشتیبانی میکند تا زمانی که بتواند سریالیسازی شود
template.convertAndSend(topic.getName(), "www.tizi365.com", message);
System.out.println("پیام ارسال شد: '" + message + "'");
}
}
نکته: به پارامتر دوم "www.tizi365.com" در متد convertAndSend توجه کنید، زیرا این پارامتر بسیار حیاتی است.
4. دریافت پیامها
۴.۱ تعریف صفها و متصل کردن تبادلها
برای مصرف پیامها از صفوف، ابتدا باید یک صف تعریف کرده و سپس آن را به تبادل مورد نظر متصل کنید. در زیر، دو صف تعریف شده و به همان تبادل متصل میشوند.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public TopicExchange topic() {
// تعریف تبادل
// پارامتر نام تبادل است که باید یکتا باشد
return new TopicExchange("tizi365.topic");
}
@Bean
public Queue queue1() {
// تعریف صف ۱
return new Queue("tizi365.topic.queue1");
}
@Bean
public Queue queue2() {
// تعریف صف ۲
return new Queue("tizi365.topic.queue2");
}
@Bean
public Binding binding1(TopicExchange topic, Queue queue1) {
// تعریف یک رابطه اتصال، اتصال صف ۱ به تبادل تاپیک با یک کلید مسیریگذاری: *.tizi365.com
return BindingBuilder.bind(queue1).to(topic).with("*.tizi365.com");
}
@Bean
public Binding binding2(TopicExchange topic, Queue queue2) {
// Define a binding relationship, binding queue 2 to the direct exchange with a routing key of: *.baidu.com
return BindingBuilder.bind(queue2).to(topic).with("*.baidu.com");
}
}
نکته: هنگام متصل کردن صف ۱ و صف ۲ به تبادل، کلیدهای مسیریگذاری استفاده شده هر دو از * (ستاره) نماد وحید است که با یک کلمه مطابقت میکند. اگر به # (هش) تغییر یابد، مطابقت چند کلمهای انجام خواهد شد.
۴.۲ تعریف گوشکنندگان صف
با استفاده از اعلامیه RabbitListener برای مصرف پیامها از صفهای مشخص، گوشکنندگان پیام تعریف شدهاند.
package com.tizi365.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
// اجازه دهید اسپرینگ کلاس را مدیریت کند
@Component
public class DemoListener {
// تعریف یک گوشدهنده، با مشخص کردن صفی که باید به آن گوش داده شود از طریق پارامتر queues
@RabbitListener(queues = "tizi365.topic.queue1")
public void receive1(String msg) {
System.out.println("پیام از صف ۱ دریافت شد: " + msg);
}
@RabbitListener(queues = "tizi365.topic.queue2")
public void receive2(String msg) {
System.out.println("پیام از صف ۲ دریافت شد: " + msg);
}
}
زیرا تنها صف ۱ دارای کلید مسیریگذاری *.tizi365.com هنگام متصل شدن به تبادل است، پیامها با کلید مسیریگذاری (www.tizi365.com) با آن مطابقت دارند. بنابراین فقط صف ۱ قادر به دریافت پیامها خواهد بود، در حالی که صف ۲ به دلیل عدم مطابقت کلید مسیریگذاری هیچ پیامی دریافت نخواهد کرد.