รูปแบบการเริ่มต้นของ RabbitMQ (โหมด Direct) ใน Java ใช้ประเภทของ exchange ที่เรียกว่า DirectExchange ความแตกต่างจากโครงสร้างการแพร่ส่งคือ exchange แบบ Direct ส่งข้อความไปยังคิวด้วยพารามิเตอร์ของการเสร็จสมบูรณ์ โครงสร้างอยู่ตามที่แสดงในรูปภาพด้านล่าง:
คำแนะนำ: ไมว่าจะใช้โหมดการทำงานของ RabbitMQ แบบไหน ความแตกต่างอยู่ที่ประเภท exchange ที่ใช้และพารามิเตอร์การเสร็จสมบูรณ์
1. บทเรียนที่ต้องการก่อน
โปรดอ่านส่วนต่อไปนี้เพื่อเข้าใจความรู้ที่เกี่ยวข้อง:
- คอนเซปพื้นฐานของ RabbitMQ
- หลักการโหมดการเชื่อมต่อของ RabbitMQ
- การเริ่มต้นด่วนสำหรับ Java กับ RabbitMQ (ต้องอ่าน เนื่องจากรองหลังไม่ทำซ้ำโค้ด แค่แสดงโค้ดสำคัญเท่านั้น)
- รูปแบบการเผยแพร่สำหรับ Java กับ RabbitMQ (ต้องอ่าน เนื่องจากโครงสร้างโค้ดเหมือนกัน เพียงแต่ประเภทของ exchange และพารามิเตอร์การเสร็จสมบูรณ์แตกต่าง)
2. กำหนด Exchange แบบ Direct
ใน Spring AMQP คลาสที่สอดคล้องกับ Exchange แบบ Direct คือ DirectExchange เรากำหนด exchange ผ่านคลาสการกำหนด Spring Boot
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public DirectExchange direct() {
// กำหนด exchange
// พารามิเตอร์คือชื่อ exchange ซึ่งต้องเป็นที่แตกต่างกัน
return new DirectExchange("tizi365.direct");
}
}
คำแนะนำ: ทั้งผู้ผลิตและผู้บริโภคข้อความต้องใช้ exchange
3. ส่งข้อความ
เราส่งข้อความไปยัง exchange และจากนั้นจะส่งข้อความไปยังคิวที่เกี่ยวข้องตามกฎการเสร็จสมบูรณ์
package com.tizi365.rabbitmq.service;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class SendService {
@Autowired
private RabbitTemplate template;
@Autowired
private DirectExchange direct;
// เพื่อสาธิตเราใช้งานการทำงานตามกำหนดเวลาเพื่อส่งข้อความทุกวินาที
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void send() {
// เนื้อหาข้อความ
String message = "สวัสดีชาวโลก!";
// ส่งข้อความ
// พารามิเตอร์ที่หนึ่งคือชื่อ exchange
// พารามิเตอร์ที่สองคือ routing key กับ exchange แบบ direct ข้อความจะถูกส่งไปยังคิวที่ routing key ตรงกับ "tizi365"
// พารามิเตอร์ที่สามคือเนื้อหาข้อความ สนับสนุนประเภทใดก็ได้ก็ต่อเมื่อสามารถถูก serialize
template.convertAndSend(direct.getName(), "tizi365", message);
System.out.println("ส่งข้อความ '" + message + "'");
}
}
คำแนะนำ: ให้ใส่ใจที่พารามิเตอร์ที่สองในเมทอด convertAndSend เนื่องจากเป็นพารามิเตอร์ที่สำคัญ
4. รับข้อความ
4.1 กำหนดคิวและผูกแลกเชน
เพื่อบริโภคข้อความจากคิว คุณต้องกำหนดคิวก่อน และผูกคิวกับแลกเชนเป้าหมาย โค้ดต่อไปนี้กำหนดคิวสองอันและผูกพวกเขากับแลกเชนเดียวกัน:
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public DirectExchange direct() {
// กำหนดแลกเชน
// พารามิเตอร์คือชื่อแลกเชนซึ่งต้องไม่ซ้ำกัน
return new DirectExchange("tizi365.direct");
}
@Bean
public Queue queue1() {
// กำหนดคิว 1
return new Queue("tizi365.direct.queue1");
}
@Bean
public Queue queue2() {
// กำหนดคิว 2
return new Queue("tizi365.direct.queue2");
}
@Bean
public Binding binding1(DirectExchange direct, Queue queue1) {
// กำหนดการผูกเพื่อผูกคิว 1 กับแลกเชนตรง ด้วยรูทติ้งคีย์ของ "tizi365"
// เมื่อรูทติ้งคีย์ตรงกับ "tizi365" แลกเชนจะส่งข้อความไปยังคิว 1
return BindingBuilder.bind(queue1).to(direct).with("tizi365");
}
@Bean
public Binding binding2(DirectExchange direct, Queue queue2) {
// กำหนดการผูกเพื่อผูกคิว 2 กับแลกเชนตรง ด้วยรูทติ้งคีย์ของ "baidu"
// เมื่อรูทติ้งคีย์ตรงกับ "baidu" แลกเชนจะส่งข้อความไปยังคิว 2
return BindingBuilder.bind(queue2).to(direct).with("baidu");
}
}
4.2 กำหนดผู้ฟังค์ชันของคิว
กำหนดผู้ฟังค์ชันข้อความโดยใช้การประกาศ RabbitListener เพื่อบริโภคข้อความจากคิวที่ระบุ:
package com.tizi365.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
// ให้ Spring จัดการคลาสปัจจุบัน
@Component
public class DemoListener {
// กำหนดผู้ฟังค์ชันเพื่อบริโภคข้อความจากคิว 1
@RabbitListener(queues = "tizi365.direct.queue1")
public void receive1(String msg) {
System.out.println("ได้รับข้อความจากคิว 1: " + msg);
}
// กำหนดผู้ฟังค์ชันเพื่อบริโภคข้อความจากคิว 2
@RabbitListener(queues = "tizi365.direct.queue2")
public void receive2(String msg) {
System.out.println("ได้รับข้อความจากคิว 2: " + msg);
}
}
เนื่องจากมีเพียงคิว 1 ที่มีรูทติ้งคีย์เป็น "tizi365" เมื่อผูกกับแลกเชน จึงเพียงคิว 1 เท่านั้นที่สามารถรับข้อความได้ คิว 2 เนื่องจากรูทติ้งคีย์ไม่ตรงกันจะไม่ได้รับข้อความใด ๆ