การเผยแพร่และการสับเสริมรูปแบบ (โหมดการกระจายข่าวแบบยอดนิยม, โหมดแฟนเอาท์)

ประเภทแลกเปลี่ยน FanoutExchange ใช้สำหรับรูปแบบการเผยแพร่และสับเสริมใน RabbitMQ โดยที่ข้อความที่ส่งโดยโปรดิวเซอร์จะถูกประมวลผลโดยคิวคอนซูมเมอร์หลายรายการ โครงสร้างที่แสดงในไดอะแกรมต่อไปนี้:

โหมดการทำงานของ RabbitMQ

Fanout exchange สามารถส่งข้อความไปยังคิวทั้งหมดที่ผูกเข้ากับมัน

คำแนะนำ: ไม่ว่าจะใช้โหมดการทำงานของ RabbitMQ ใด ๆ การต่างกันอยู่ที่ประเภทแลกเปลี่ยนและพารามิเตอร์การเดินทางที่ใช้

1. บทชี้สูตรขั้นต้น

โปรดอ่านส่วนต่อไปนี้ก่อนเพื่อเข้าใจความรู้ที่เกี่ยวข้อง:

2. กำหนดแลกเปลี่ยนแบบแฟนเอาท์

ใน Spring AMQP, คลาส FanoutExchange สอดคล้องกับแลกเปลี่ยนแบบแฟนเอาท์ และเรากำหนดแลกเปลี่ยนผ่านคลาสการกำหนด Spring Boot

package com.tizi365.rabbitmq.config;

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
	// กำหนดแลกเปลี่ยน
    @Bean
    public FanoutExchange fanout() {
        // พารามิเตอร์คือชื่อแลกเปลี่ยนซึ่งต้องไม่ซ้ำกัน
        return new FanoutExchange("tizi365.fanout");
    }
}

คำแนะนำ: ผลิตและผู้บริโภคข้อความต้องใช้แลกเปลี่ยนเท่านั้น

3. ส่งข้อความ

เราส่งข้อความไปยังแลกเปลี่ยนซึ่งจะส่งข้อความไปที่คิวที่เกี่ยวข้องโดยฐานกฎเส้นทางการเดินทาง

package com.tizi365.rabbitmq.service;

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class SendService {
    @Autowired
    private RabbitTemplate template;
    @Autowired
    private FanoutExchange fanout;

	// เพื่อเป็นการสาธิต เราใช้งานงานที่ตารางได้เพื่อส่งข้อความทุกวินาที
    @Scheduled(fixedDelay = 1000, initialDelay = 1000)
    public void send() {
        // เนื้อหาข้อความ
        String message = "สวัสดี โลก!";
        // ส่งข้อความ
        // พารามิเตอร์แรกคือชื่อแลกเปลี่ยน
        // พารามิเตอร์ที่สองคือเส้นทางการเดินทาง; แลกเปลี่ยนแบบแฟนเอาท์จะละเว้นเส้นทางการเดินทาง ดังนั้นไม่จำเป็นต้องตั้งค่า
        // พารามิเตอร์ที่สามคือเนื้อหาข้อความ ซึ่งสนับสนุนประเภทใดก็ได้เมื่อสามารถครุศาสตร์ได้
        template.convertAndSend(fanout.getName(), "", message);
        System.out.println("ส่งข้อความ: '" + message + "'");
    }
}

4. รับข้อความ

4.1 กำหนดคิวและผูกแลกเปลี่ยน

เพื่อรับข้อความในคิว คุณจำเป็นต้องกำหนดคิวก่อนแล้วผูกคิวไปที่แลกเปลี่ยนเป้าหมาย ด้านล่างนี้เรากำหนดคิวสองคิวและผูกทั้งสองคิวไปที่แลกเปลี่ยนเดียวกัน

package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
    @Bean
    public FanoutExchange fanout() {
        // กำหนดแลกเปลี่ยน
        // พารามิเตอร์คือชื่อแลกเปลี่ยนซึ่งต้องไม่ซ้ำกัน
        return new FanoutExchange("tizi365.fanout");
    }

    @Bean
    public Queue queue1() {
        // กำหนดคิว 1
        return new Queue("tizi365.fanout.queue1");
    }

    @Bean
    public Queue queue2() {
        // กำหนดคิว 2
        return new Queue("tizi365.fanout.queue2");
    }

    @Bean
    public Binding binding1(FanoutExchange fanout, Queue queue1) {
        // กำหนดความสัมพันธ์ผูกเข้าดำเนินการ ที่ผูกคิว 1 ไปที่แลกเปลี่ยนแบบแฟนเอาท์
        return BindingBuilder.bind(queue1).to(fanout);
    }

    @Bean
    public Binding binding2(FanoutExchange fanout, Queue queue2) {
        // กำหนดความสัมพันธ์ผูกเข้าดำเนินการ ที่ผูกคิว 2 ไปที่แลกเปลี่ยนแบบแฟนเอาท์
        return BindingBuilder.bind(queue2).to(fanout);
    }
}

4.2 กำหนดผู้ฟังคิว

ใช้การประกาศ RabbitListener annotation เพื่อกำหนดผู้ฟังข้อความผ่านคิวที่เฉพาะเจาะจง

package com.tizi365.rabbitmq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// ทำให้คลาสปัจจุบันถูกจัดการโดย Spring
@Component
public class DemoListener {
    // กำหนดผู้ฟัง และระบุว่าจะฟังคิวไหนโดยใช้พารามิเตอร์ queues
    @RabbitListener(queues = "tizi365.fanout.queue1")
    public void receive1(String msg) {
        System.out.println("ได้รับข้อความจากคิว 1 = " + msg);
    }

    // กำหนดผู้ฟัง และระบุว่าจะฟังคิวไหนโดยใช้พารามิเตอร์ queues
    @RabbitListener(queues = "tizi365.fanout.queue2")
    public void receive2(String msg) {
        System.out.println("ได้รับข้อความจากคิว 2 = " + msg);
    }
}

เนื่องจากการแลกเปลี่ยนก่อนหน้าได้ถูกกำหนดให้เป็นประเภท fanout แต่ละข้อความจะถูกกระจายไปยังคิวทั้งหมดที่ผูกกับแลกเชนปัจจุบัน และข้อความจะถูกจัดการโดยวิธีดังกล่าวข้างต้นอิงหลักโดยต่างหาก

หมายเหตุ: ตัวทำเครื่องหมาย RabbitListener annotation สามารถถูกนำไปใช้กับคลาสหรือเมดอด ถ้าตัวทำเครื่องหมาย RabbitListener ถูกกำหนดไว้บนคลาส มันจำเป็นต้องถูกผสมกับการทำเครื่องหมาย RabbitHandler ผูกกับวิธีของคลาสแบบไหนที่จะดำเนินการจัดการข้อความ

4.3 กำหนดผู้ฟังคิวด้วย Full Annotation

คุณไม่จำเป็นต้องสร้างคลาสกำหนด spring boot แล้วกำหนดการทำงานของแลกเชน คิว และความสัมพันธ์การผูกของมัน

คุณสามารถกำหนดความสัมพันธ์การผูกได้โดยตรง คิว แลกเชน ผ่านผู้ฟังโดยการใช้งานพารามิเตอร์ของการผูกของ RabbitListener annotation.

@RabbitListener(
            bindings = {
                    @QueueBinding(
                            value = @Queue(name = "tizi365.fanout.queue3", durable = "true"),
                            exchange = @Exchange(name = "tizi365.fanout", durable = "true", type = ExchangeTypes.FANOUT)
                    )
            }
    )
public void receive3(String msg) {
    System.out.println("ได้รับข้อความจากคิว 3 = " + msg);
}

คำอธิบาย:

  • การใช้งานคุณลักษณะ QueueBinding annotation: กำหนดความสัมพันธ์การผูกระหว่างคิวและแลกเชน พารามิเตอร์ value ถูกใช้เพื่อกำหนดคิวและแลกเชนถูกใช้เพื่อกำหนดแลกเชน
  • การใช้งานคุณลักษณะ Queue annotation: กำหนดคิว พารามิเตอร์ name กำหนดชื่อคิว (ที่จำเป็นต้องเป็นระเบียบ) และพารามิเตอร์ durable ระบุว่าจำเป็นต้องจะทนทานหรือไม่
  • การใช้งานคุณลักษณะ Exchange annotation: กำหนดแลกเชน พารามิเตอร์ name กำหนดชื่อแลกเชน และพารามิเตอร์ type ระบุประเภทแลกเชน