الگوی انتشار / اشتراک (حالت پخش، حالت فن آوت) در RabbitMQ با استفاده از جاوا

نوع تبادل FanoutExchange برای الگوی انتشار / اشتراک در RabbitMQ استفاده می‌شود که در آن یک پیام ارسالی توسط یک تولید کننده توسط چند صف مصرف‌کننده پردازش می‌شود. معماری به شکل دیاگرام زیر نشان داده شده است:

حالت کار RabbitMQ

تبادل Fanout پیام‌ها را به تمام صف‌های متصل فرستاده می‌شود.

نکته: بدون توجه به حالت کار RabbitMQ استفاده شده، تفاوت در نوع تبادل و پارامترهای مسیریابی استفاده شده است.

1. آموزش‌های پیش‌نیاز

لطفاً ابتدا بخش‌های زیر را بخوانید تا دانش مرتبط را فهمید:

2. تعریف تبادل Fanout

در Spring AMQP، کلاس FanoutExchange متناظر با تبادل Fanout است. ما تبادل را از طریق یک کلاس پیکربندی Spring Boot تعریف می‌کنیم.

package com.tizi365.rabbitmq.config;

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
importorg.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
    // تعریف تبادل
    @Bean
    public FanoutExchange fanout() {
        // پارامتر نام تبادل است که باید یکتا باشد
        return new FanoutExchange("tizi365.fanout");
    }
}

نکته: هر دو تولید کننده و مصرف‌کننده پیام نیاز به یک تبادل دارند.

3. ارسال پیام‌ها

ما پیام‌ها را به تبادل ارسال می‌کنیم، که پیام‌ها را براساس قوانین مسیردهی به صف‌های متناظر ارسال می‌کند.

package com.tizi365.rabbitmq.service;

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class SendService {
    @Autowired
    private RabbitTemplate template;
    @Autowired
    private FanoutExchange fanout;

    // به منظور نمایش، یک وظیفه زمانبندی شده برای ارسال پیام هر ثانیه استفاده می‌شود
    @Scheduled(fixedDelay = 1000, initialDelay = 1000)
    public void send() {
        // محتوای پیام
        String message = "سلام دنیا!";
        // ارسال پیام
        // پارامتر اول نام تبادل است
        // پارامتر دوم کلید مسیریابی است؛ تبادل fanout کلید مسیریابی را نادیده می‌گیرد بنابراین لازم نیست تنظیم شود
        // پارامتر سوم محتوای پیام است که هر نوعی را پشتیبانی می‌کند تا زمانی که بتواند سریالی‌سازی شود
        template.convertAndSend(fanout.getName(), "", message);
        System.out.println("پیام ارسال شد: '" + message + "'");
    }
}

4. دریافت پیام‌ها

4.1 تعریف صف‌ها و اتصال تبادل

برای مصرف پیام‌های صف، ابتدا باید یک صف تعریف کنید و سپس صف را به تبادل مقصد متصل کنید. در زیر، ما دو صف تعریف کرده و آن‌ها را به همان تبادل متصل می‌کنیم.

package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
    @Bean
    public FanoutExchange fanout() {
        // تعریف تبادل
        // پارامتر نام تبادل که باید یکتا باشد
        return new FanoutExchange("tizi365.fanout");
    }

    @Bean
    public Queue queue1() {
        // تعریف صف ۱
        return new Queue("tizi365.fanout.queue1");
    }

    @Bean
    public Queue queue2() {
        // تعریف صف ۲
        return new Queue("tizi365.fanout.queue2");
    }

    @Bean
    public Binding binding1(FanoutExchange fanout, Queue queue1) {
        // تعریف ارتباط اتصال برای اتصال صف ۱ به تبادل fanout
        return BindingBuilder.bind(queue1).to(fanout);
    }

    @Bean
    public Binding binding2(FanoutExchange fanout, Queue queue2) {
        // تعریف ارتباط اتصال برای اتصال صف ۲ به تبادل fanout
        return BindingBuilder.bind(queue2).to(fanout);
    }
}

4.2 تعریف گوش‌کنندگان صف

با استفاده از انوتیشن RabbitListener، گوش‌کنندگان پیام را برای مصرف پیام‌ها از صف‌های خاص تعریف کنید.

package com.tizi365.rabbitmq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// کلاس فعلی را توسط اسپرینگ مدیریت کنید
@Component
public class DemoListener {
    // تعریف یک گوش‌کننده و مشخص کردن صف مورد نظر برای گوش‍کردن از طریق پارامترهای صفوف
    @RabbitListener(queues = "tizi365.fanout.queue1")
    public void receive1(String msg) {
        System.out.println("پیام از صف 1 دریافت شد = " + msg);
    }

    // تعریف یک گوش‌کننده و مشخص کردن صف مورد نظر برای گوش‍کردن از طریق پارامترهای صفوف
    @RabbitListener(queues = "tizi365.fanout.queue2")
    public void receive2(String msg) {
        System.out.println("پیام از صف 2 دریافت شد = " + msg);
    }
}

زیرا تبادل قبلی به‌صورت fanout تعریف شده است، هر پیام به تمام صفوفی که به تبادل فعلی بایند شده‌اند پخش می‌شود، و پیام‌ها به صورت جداگانه توسط دو متد بالا پردازش می‌شوند.

توجه: انوتیشن RabbitListener می‌تواند بر روی یک کلاس یا متد اعمال شود. اگر انوتیشن Rabbit‌Listener بر روی یک کلاس تعریف شود، به‌صورت همراه با انوتیشن RabbitHandler باید به متد کلاسی که پردازش پیام را انجام می‌دهد، نظیر داده شود.

۴.۳ تعریف گوش‌کنندگان صف با انوتیشن کامل

شما نیازی به تعریف کلاس پیکربندی spring boot برای تعریف تبادلات، صفوف و روابط بایند ندارید. شما می‌توانید به‌صورت مستقیم روابط بایندینگ، صفوف و تبادلات را از طریق پارامتر bindings انوتیشن RabbitListener تعریف کنید.

@RabbitListener(
            bindings = {
                    @QueueBinding(
                            value = @Queue(name = "tizi365.fanout.queue3", durable = "true"),
                            exchange = @Exchange(name = "tizi365.fanout", durable = "true", type = ExchangeTypes.FANOUT)
                    )
            }
    )
public void receive3(String msg) {
    System.out.println("پیام از صف 3 دریافت شد = " + msg);
}

توضیح:

  • انوتیشن QueueBinding: رابطه بایندینگ بین صف و تبادل را تعریف می‌کند. پارامتر value برای تعریف صف استفاده شده و exchange برای تعریف تبادل استفاده می‌شود.
  • انوتیشن Queue: یک صف را تعریف می‌کند. پارامتر name نام صف (که باید یکتا باشد) و پارامتر durable نشان‌دهنده این است که آیا نیاز به دوام است یا خیر.
  • انوتیشن Exchange: یک تبادل را تعریف می‌کند. پارامتر name نام تبادل را تعیین می‌کند و پارامتر type نوع تبادل را نشان می‌دهد.