نمط النشر/الاشتراك لـ RabbitMQ بنمط البث (وضع البث، وضع المروحة)

نوع تبادل FanoutExchange يُستخدم لنمط النشر/الاشتراك في RabbitMQ، حيث سيتم معالجة الرسالة المرسلة بواسطة المُنتِج من قبل عدة طوابير مُستهلكة. يُظهر الهندسة المعمارية كما هو موضح في الرسم البياني التالي:

وضع عمل RabbitMQ

يمكن أن يُعيد تبادل Fanout الرسائل إلى جميع الطوابير المُرتبطة به.

نصيحة: بغض النظر عن وضع عمل RabbitMQ المستخدم، الفارق يكمن في نوع التبادل والمعلمات الاستدعاء.

1. البرامج التعليمية الأولية

يرجى قراءة الأقسام التالية أولاً لفهم المعرفة ذات الصلة:

2. تعريف تبادل المروحة

في Spring AMQP، تُوافق فئة FanoutExchange على تبادل المروحة. نقوم بتعريف التبادل من خلال فئة تكوين Spring Boot.

package com.tizi365.rabbitmq.config;

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
	// يُعرف التبادل
    @Bean
    public FanoutExchange fanout() {
        // المعلمة هي اسم التبادل، والذي يجب أن يكون فريدًا
        return new FanoutExchange("tizi365.fanout");
    }
}

نصيحة: تتطلب كلاً من مُنتج ومُستهلك الرسائل تبادلًا.

3. إرسال الرسائل

نُرسل الرسائل إلى التبادل، الذي سيقوم بتسليم الرسائل إلى الطوابير المقابلة بناءً على قواعد التوجيه.

package com.tizi365.rabbitmq.service;

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class SendService {
    @Autowired
    private RabbitTemplate template;
    @Autowired
    private FanoutExchange fanout;

	// لأغراض العرض، يُستخدم مهمة مجدولة لإرسال رسالة كل ثانية
    @Scheduled(fixedDelay = 1000, initialDelay = 1000)
    public void send() {
        // محتوى الرسالة
        String message = "مرحبًا بالعالم!";
        // إرسال الرسالة
        // المُعلمة الأولى هي اسم التبادل
        // المُعلمة الثانية هي مفتاح التوجيه؛ يتجاهل تبادل المروحة مفتاح التوجيه، لذا لا يحتاج إلى الإعداد
        // المُعلمة الثالثة هي محتوى الرسالة، والذي يدعم أي نوع طالما يمكن تسلسله
        template.convertAndSend(fanout.getName(), "", message);
        System.out.println("تم إرسال الرسالة: '" + message + "'");
    }
}

4. استقبال الرسائل

4.1 تعريف الطوابير وربط التبادل

لاستهلاك رسائل الطابور، تحتاج أولاً إلى تعريف طابور ثم ربط الطابور بتبادل الهدف. فيما يلي، نُعرف طابورين ونربطهما بنفس التبادل.

package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
    @Bean
    public FanoutExchange fanout() {
        // تعريف التبادل
        // المعلمة هي اسم التبادل، والذي يجب أن يكون فريدًا
        return new FanoutExchange("tizi365.fanout");
    }

    @Bean
    public Queue queue1() {
        // تعريف طابور 1
        return new Queue("tizi365.fanout.queue1");
    }

    @Bean
    public Queue queue2() {
        // تعريف طابور 2
        return new Queue("tizi365.fanout.queue2");
    }

    @Bean
    public Binding binding1(FanoutExchange fanout, Queue queue1) {
        // تعريف علاقة ربط، لربط طابور 1 بتبادل المروحة
        return BindingBuilder.bind(queue1).to(fanout);
    }

    @Bean
    public Binding binding2(FanoutExchange fanout, Queue queue2) {
        // تعريف علاقة ربط، لربط طابور 2 بتبادل المروحة
        return BindingBuilder.bind(queue2).to(fanout);
    }
}

4.2 تعريف مستمعي قائمة الانتظار

يُعرف مستمعو الرسالة من خلال تعليق RabbitListener لاستهلاك الرسائل من قوائم انتظار محددة.

package com.tizi365.rabbitmq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// اجعل الصف الحالي يتم إدارته بواسطة Spring
@Component
public class DemoListener {
    // قم بتعريف مستمع ، وحدد أي قائمة انتظار يتم الاستماع إليها باستخدام معلمة القوائم الانتظارية
    @RabbitListener(queues = "tizi365.fanout.queue1")
    public void receive1(String msg) {
        System.out.println("تم استقبال الرسالة من قائمة الانتظار 1 = " + msg);
    }

    // قم بتعريف مستمع ، وحدد أي قائمة انتظار يتم الاستماع إليها باستخدام معلمة القوائم الانتظارية
    @RabbitListener(queues = "tizi365.fanout.queue2")
    public void receive2(String msg) {
        System.out.println("تم استقبال الرسالة من قائمة الانتظار 2 = " + msg);
    }
}

نظرًا لأن التبادل السابق تم تعريفه كنوع النشر العام ، سيتم توزيع كل رسالة على جميع قوائم الانتظار المرتبطة بالتبادل الحالي ، وسيتم التعامل مع الرسائل بشكل منفصل عن طريق الأساليب المذكورة أعلاه.

ملحوظة: يمكن تطبيق تعليق RabbitListener على صف أو طريقة. إذا تم تحديد تعليق RabbitListener على صف ، فيجب الجمع بين تعليق RabbitHandler لتحديد أي طريقة صف ستنفذ معالجة الرسالة.

4.3 تعريف مستمعي قائمة الانتظار بالتعليقات الكاملة

لا حاجة لصف التكوين في Spring Boot لتعريف التبادلات وقوائم الانتظار وعلاقات الربط. يمكنك تحديد العلاقات الربط، وقوائم الانتظار والتبادلات مباشرةً من خلال معلمات الربط في تعليق RabbitListener.

@RabbitListener(
            bindings = {
                    @QueueBinding(
                            value = @Queue(name = "tizi365.fanout.queue3", durable = "true"),
                            exchange = @Exchange(name = "tizi365.fanout", durable = "true", type = ExchangeTypes.FANOUT)
                    )
            }
    )
public void receive3(String msg) {
    System.out.println("تم استقبال الرسالة من قائمة الانتظار 3 = " + msg);
}

الشرح:

  • تعليق QueueBinding: يحدد العلاقة الربط بين القائمة الانتظار والتبادل. يتم استخدام معلمة القيمة لتعريف القائمة الانتظارية، ويتم استخدام التبادل لتعريف التبادل.
  • تعليق Queue: يحدد قائمة انتظار. معلمة الاسم تحدد اسم القائمة الانتظارية (التي يجب أن تكون فريدة)، ويشير معلمة الدائم إلى ما إذا كانت تحتاج إلى الدوام.
  • تبادل التعليق: يحدد التبادل. معلمة الاسم تحدد اسم التبادل، ويشير معلمة النوع إلى نوع التبادل.