การเผยแพร่และการสับเสริมรูปแบบ (โหมดการกระจายข่าวแบบยอดนิยม, โหมดแฟนเอาท์)
ประเภทแลกเปลี่ยน FanoutExchange ใช้สำหรับรูปแบบการเผยแพร่และสับเสริมใน RabbitMQ โดยที่ข้อความที่ส่งโดยโปรดิวเซอร์จะถูกประมวลผลโดยคิวคอนซูมเมอร์หลายรายการ โครงสร้างที่แสดงในไดอะแกรมต่อไปนี้:
Fanout exchange สามารถส่งข้อความไปยังคิวทั้งหมดที่ผูกเข้ากับมัน
คำแนะนำ: ไม่ว่าจะใช้โหมดการทำงานของ RabbitMQ ใด ๆ การต่างกันอยู่ที่ประเภทแลกเปลี่ยนและพารามิเตอร์การเดินทางที่ใช้
1. บทชี้สูตรขั้นต้น
โปรดอ่านส่วนต่อไปนี้ก่อนเพื่อเข้าใจความรู้ที่เกี่ยวข้อง:
- แนวคิดพื้นฐานของ RabbitMQ
- รูปแบบการเผยแพร่และสับเสริมของ RabbitMQ
- คู่มือเริ่มต้นด้วย Java ของ RabbitMQ (บังคับ, เนื่องจากส่วนถัดไปจะไม่ทำซ้ำโค้ด เพียงแค่แสดงตัวอย่างโค้ดสำคัญเท่านั้น)
2. กำหนดแลกเปลี่ยนแบบแฟนเอาท์
ใน Spring AMQP, คลาส FanoutExchange สอดคล้องกับแลกเปลี่ยนแบบแฟนเอาท์ และเรากำหนดแลกเปลี่ยนผ่านคลาสการกำหนด Spring Boot
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
// กำหนดแลกเปลี่ยน
@Bean
public FanoutExchange fanout() {
// พารามิเตอร์คือชื่อแลกเปลี่ยนซึ่งต้องไม่ซ้ำกัน
return new FanoutExchange("tizi365.fanout");
}
}
คำแนะนำ: ผลิตและผู้บริโภคข้อความต้องใช้แลกเปลี่ยนเท่านั้น
3. ส่งข้อความ
เราส่งข้อความไปยังแลกเปลี่ยนซึ่งจะส่งข้อความไปที่คิวที่เกี่ยวข้องโดยฐานกฎเส้นทางการเดินทาง
package com.tizi365.rabbitmq.service;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class SendService {
@Autowired
private RabbitTemplate template;
@Autowired
private FanoutExchange fanout;
// เพื่อเป็นการสาธิต เราใช้งานงานที่ตารางได้เพื่อส่งข้อความทุกวินาที
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void send() {
// เนื้อหาข้อความ
String message = "สวัสดี โลก!";
// ส่งข้อความ
// พารามิเตอร์แรกคือชื่อแลกเปลี่ยน
// พารามิเตอร์ที่สองคือเส้นทางการเดินทาง; แลกเปลี่ยนแบบแฟนเอาท์จะละเว้นเส้นทางการเดินทาง ดังนั้นไม่จำเป็นต้องตั้งค่า
// พารามิเตอร์ที่สามคือเนื้อหาข้อความ ซึ่งสนับสนุนประเภทใดก็ได้เมื่อสามารถครุศาสตร์ได้
template.convertAndSend(fanout.getName(), "", message);
System.out.println("ส่งข้อความ: '" + message + "'");
}
}
4. รับข้อความ
4.1 กำหนดคิวและผูกแลกเปลี่ยน
เพื่อรับข้อความในคิว คุณจำเป็นต้องกำหนดคิวก่อนแล้วผูกคิวไปที่แลกเปลี่ยนเป้าหมาย ด้านล่างนี้เรากำหนดคิวสองคิวและผูกทั้งสองคิวไปที่แลกเปลี่ยนเดียวกัน
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public FanoutExchange fanout() {
// กำหนดแลกเปลี่ยน
// พารามิเตอร์คือชื่อแลกเปลี่ยนซึ่งต้องไม่ซ้ำกัน
return new FanoutExchange("tizi365.fanout");
}
@Bean
public Queue queue1() {
// กำหนดคิว 1
return new Queue("tizi365.fanout.queue1");
}
@Bean
public Queue queue2() {
// กำหนดคิว 2
return new Queue("tizi365.fanout.queue2");
}
@Bean
public Binding binding1(FanoutExchange fanout, Queue queue1) {
// กำหนดความสัมพันธ์ผูกเข้าดำเนินการ ที่ผูกคิว 1 ไปที่แลกเปลี่ยนแบบแฟนเอาท์
return BindingBuilder.bind(queue1).to(fanout);
}
@Bean
public Binding binding2(FanoutExchange fanout, Queue queue2) {
// กำหนดความสัมพันธ์ผูกเข้าดำเนินการ ที่ผูกคิว 2 ไปที่แลกเปลี่ยนแบบแฟนเอาท์
return BindingBuilder.bind(queue2).to(fanout);
}
}
4.2 กำหนดผู้ฟังคิว
ใช้การประกาศ RabbitListener annotation เพื่อกำหนดผู้ฟังข้อความผ่านคิวที่เฉพาะเจาะจง
package com.tizi365.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
// ทำให้คลาสปัจจุบันถูกจัดการโดย Spring
@Component
public class DemoListener {
// กำหนดผู้ฟัง และระบุว่าจะฟังคิวไหนโดยใช้พารามิเตอร์ queues
@RabbitListener(queues = "tizi365.fanout.queue1")
public void receive1(String msg) {
System.out.println("ได้รับข้อความจากคิว 1 = " + msg);
}
// กำหนดผู้ฟัง และระบุว่าจะฟังคิวไหนโดยใช้พารามิเตอร์ queues
@RabbitListener(queues = "tizi365.fanout.queue2")
public void receive2(String msg) {
System.out.println("ได้รับข้อความจากคิว 2 = " + msg);
}
}
เนื่องจากการแลกเปลี่ยนก่อนหน้าได้ถูกกำหนดให้เป็นประเภท fanout แต่ละข้อความจะถูกกระจายไปยังคิวทั้งหมดที่ผูกกับแลกเชนปัจจุบัน และข้อความจะถูกจัดการโดยวิธีดังกล่าวข้างต้นอิงหลักโดยต่างหาก
หมายเหตุ: ตัวทำเครื่องหมาย RabbitListener annotation สามารถถูกนำไปใช้กับคลาสหรือเมดอด ถ้าตัวทำเครื่องหมาย RabbitListener ถูกกำหนดไว้บนคลาส มันจำเป็นต้องถูกผสมกับการทำเครื่องหมาย RabbitHandler ผูกกับวิธีของคลาสแบบไหนที่จะดำเนินการจัดการข้อความ
4.3 กำหนดผู้ฟังคิวด้วย Full Annotation
คุณไม่จำเป็นต้องสร้างคลาสกำหนด spring boot แล้วกำหนดการทำงานของแลกเชน คิว และความสัมพันธ์การผูกของมัน
คุณสามารถกำหนดความสัมพันธ์การผูกได้โดยตรง คิว แลกเชน ผ่านผู้ฟังโดยการใช้งานพารามิเตอร์ของการผูกของ RabbitListener annotation.
@RabbitListener(
bindings = {
@QueueBinding(
value = @Queue(name = "tizi365.fanout.queue3", durable = "true"),
exchange = @Exchange(name = "tizi365.fanout", durable = "true", type = ExchangeTypes.FANOUT)
)
}
)
public void receive3(String msg) {
System.out.println("ได้รับข้อความจากคิว 3 = " + msg);
}
คำอธิบาย:
- การใช้งานคุณลักษณะ QueueBinding annotation: กำหนดความสัมพันธ์การผูกระหว่างคิวและแลกเชน พารามิเตอร์ value ถูกใช้เพื่อกำหนดคิวและแลกเชนถูกใช้เพื่อกำหนดแลกเชน
- การใช้งานคุณลักษณะ Queue annotation: กำหนดคิว พารามิเตอร์ name กำหนดชื่อคิว (ที่จำเป็นต้องเป็นระเบียบ) และพารามิเตอร์ durable ระบุว่าจำเป็นต้องจะทนทานหรือไม่
- การใช้งานคุณลักษณะ Exchange annotation: กำหนดแลกเชน พารามิเตอร์ name กำหนดชื่อแลกเชน และพารามิเตอร์ type ระบุประเภทแลกเชน