RabbitMQ ไม่รองรับการส่ง delayed messages โดยตรง ปัจจุบันการ implement หลักๆ รวมถึงการใช้ dead-letter exchange + message TTL scheme หรือใช้ plugin ชื่อ rabbitmq-delayed-message-exchange มากที่สุด
การใช้งานสำหรับ delayed queues
- กรณีที่ต้องการกำหนดระยะเวลาสำหรับการผลิตและบริโภคข้อความ ตัวอย่างเช่นในธุรกิจออนไลน์ มีกรณีที่การสั่งซื้อจะถูกปิดถ้าการชำระเงินไม่สำเร็จภายในระยะเวลาที่กำหนด ที่นี่ข้อความ delayed จะถูกส่งเมื่อการสั่งซื้อถูกสร้าง ข้อความนี้จะถูกส่งไปยังผู้บริโภค 30 นาทีต่อมา และผู้บริโภคจำเป็นต้องตรวจสอบว่าการสั่งซื้อที่เกี่ยวข้องได้รับการชำระเงินหรือไม่ ถ้าการชำระเงินไม่สำเร็จ การสั่งซื้อจะถูกปิด ถ้าการชำระเงินสำเร็จ ข้อความนี้จะถูกละเลย
- กรณีที่กิจกรรมที่เป็น delayed ถูกเรียกใช้โดยข้อความ ตัวอย่างเช่น การส่งข้อความเตือนแก่ผู้ใช้หลังจากระยะเวลาที่กำหนด
วิธีการใช้ dead-letter exchange + message TTL scheme
หลักการหลักของวิธีนี้คือการสร้างคิวโดยไม่มีผู้บริโภคและใช้เวลาหมดอายุของข้อความ (TTL) เมื่อข้อความหมดอายุ มันก็กลายเป็น dead letter ซึ่งจะถูกส่งต่อไปสู่ dead-letter exchange แล้วไปที่ dead-letter queue ซึ่งสามารถถูกบริโภคได้
ในวิธีการนี้ เวลาหมดอายุของข้อความจะเป็นเวลาหน่วยการหน่วยของ delayed message ตัวอย่างเช่น ถ้า TTL ของข้อความถูกตั้งไว้ที่ 30 วินาที และไม่มีผู้บริโภคสำหรับคิว ข้อความจะหมดอายุหลังจาก 30 วินาทีและกลายเป็น dead letter ซึ่งจะถูกจัดการโดย dead-letter queue
เพื่อสำหรับการ implement วิธีนี้ คุณจำเป็นต้องตั้งค่า properties ตามที่อธิบายไว้ในสองบทแนะนำดังต่อไปนี้:
การใช้งาน plugin สำหรับ delayed message solution
1. การติดตั้ง plugin
rabbitmq-delayed-message-exchange GitHub repository:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
ดาวน์โหลดไฟล์ rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez จากส่วนของ assets ในหน้า release ของ GitHub และวางไฟล์นี้ไว้ในไดเร็กทอรี่ของ plugin ใน RabbitMQ (ไดเร็กทอรี่ plugins)
หมายเหตุ: หมายเลขเวอร์ชันอาจแตกต่างจากที่แสดงในบทแนะนำ ถ้า RabbitMQ ของคุณเป็นเวอร์ชันล่าสุด คุณสามารถเลือกเวอร์ชันล่าสุดของ plugin ได้เลย
2. การเปิดใช้งาน plugin
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
3. การกำหนด exchange
ตั้งค่า properties ของ custom exchange โดยใช้ x-delayed-type เพื่อรองรับการส่ง delayed messages
props := make(map[string]interface{})
// พารามิเตอร์ Key เพื่อรองรับการส่ง delayed messages
props["x-delayed-type"] = "direct"
// ประกาศ exchange
err = ch.ExchangeDeclare(
"delay.queue", // ชื่อ exchange
"fanout", // ชนิดของ exchange
true, // ทนทาน
false,
false,
false,
props, // ตั้งค่า properties
)
4. การส่ง delayed messages
ตั้งค่าเวลาหน่วยในการหน่วยการาใช้ header ของข้อความ (x-delay)
msgHeaders := make(map[string]interface{})
// ตั้งค่าเวลาหน่วยการตadvance ในการหน่วยให้ใช้ header ของข้อความ ในหน่วยเป็นมิลลิวินาที
msgHeaders["x-delay"] = 6000
err = ch.Publish(
"delay.queue", // ชื่อ exchange
"", // พารามิเตอร์การเส้นทาง
false,
false,
amqp.Publishing{
Headers: msgHeaders, // ตั้งค่า headers ของ message
ContentType: "text/plain",
Body: []byte(body),
})
หมายเหตุ: ถ้าคุณใช้บริการ RabbitMQ Message Queue ของ Alibaba Cloud โดยตรง คุณสามารถตั้งค่าเวลาหน่วยซึ่งใช้ attribute ของ header ของข้อความ (delay) โดยไม่ต้องติดตั้ง plugin Alibaba Cloud ได้ขยาย RabbitMQ ไว้สำหรับจุดประสงค์นี้แล้ว