RabbitMQ ไม่รองรับการส่ง delayed messages โดยตรง ปัจจุบันการ implement หลักๆ รวมถึงการใช้ dead-letter exchange + message TTL scheme หรือใช้ plugin ชื่อ rabbitmq-delayed-message-exchange มากที่สุด

การใช้งานสำหรับ delayed queues

  • กรณีที่ต้องการกำหนดระยะเวลาสำหรับการผลิตและบริโภคข้อความ ตัวอย่างเช่นในธุรกิจออนไลน์ มีกรณีที่การสั่งซื้อจะถูกปิดถ้าการชำระเงินไม่สำเร็จภายในระยะเวลาที่กำหนด ที่นี่ข้อความ delayed จะถูกส่งเมื่อการสั่งซื้อถูกสร้าง ข้อความนี้จะถูกส่งไปยังผู้บริโภค 30 นาทีต่อมา และผู้บริโภคจำเป็นต้องตรวจสอบว่าการสั่งซื้อที่เกี่ยวข้องได้รับการชำระเงินหรือไม่ ถ้าการชำระเงินไม่สำเร็จ การสั่งซื้อจะถูกปิด ถ้าการชำระเงินสำเร็จ ข้อความนี้จะถูกละเลย
  • กรณีที่กิจกรรมที่เป็น delayed ถูกเรียกใช้โดยข้อความ ตัวอย่างเช่น การส่งข้อความเตือนแก่ผู้ใช้หลังจากระยะเวลาที่กำหนด

วิธีการใช้ dead-letter exchange + message TTL scheme

หลักการหลักของวิธีนี้คือการสร้างคิวโดยไม่มีผู้บริโภคและใช้เวลาหมดอายุของข้อความ (TTL) เมื่อข้อความหมดอายุ มันก็กลายเป็น dead letter ซึ่งจะถูกส่งต่อไปสู่ dead-letter exchange แล้วไปที่ dead-letter queue ซึ่งสามารถถูกบริโภคได้

ในวิธีการนี้ เวลาหมดอายุของข้อความจะเป็นเวลาหน่วยการหน่วยของ delayed message ตัวอย่างเช่น ถ้า TTL ของข้อความถูกตั้งไว้ที่ 30 วินาที และไม่มีผู้บริโภคสำหรับคิว ข้อความจะหมดอายุหลังจาก 30 วินาทีและกลายเป็น dead letter ซึ่งจะถูกจัดการโดย dead-letter queue

เพื่อสำหรับการ implement วิธีนี้ คุณจำเป็นต้องตั้งค่า properties ตามที่อธิบายไว้ในสองบทแนะนำดังต่อไปนี้:

การใช้งาน plugin สำหรับ delayed message solution

1. การติดตั้ง plugin

rabbitmq-delayed-message-exchange GitHub repository:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

ดาวน์โหลดไฟล์ rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez จากส่วนของ assets ในหน้า release ของ GitHub และวางไฟล์นี้ไว้ในไดเร็กทอรี่ของ plugin ใน RabbitMQ (ไดเร็กทอรี่ plugins)

หมายเหตุ: หมายเลขเวอร์ชันอาจแตกต่างจากที่แสดงในบทแนะนำ ถ้า RabbitMQ ของคุณเป็นเวอร์ชันล่าสุด คุณสามารถเลือกเวอร์ชันล่าสุดของ plugin ได้เลย

2. การเปิดใช้งาน plugin

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

3. การกำหนด exchange

ตั้งค่า properties ของ custom exchange โดยใช้ x-delayed-type เพื่อรองรับการส่ง delayed messages

props := make(map[string]interface{})
// พารามิเตอร์ Key เพื่อรองรับการส่ง delayed messages
props["x-delayed-type"] = "direct"

// ประกาศ exchange
err = ch.ExchangeDeclare(
"delay.queue", // ชื่อ exchange
"fanout",      // ชนิดของ exchange
true,          // ทนทาน
false,
false,
false,
props,         // ตั้งค่า properties
)

4. การส่ง delayed messages

ตั้งค่าเวลาหน่วยในการหน่วยการาใช้ header ของข้อความ (x-delay)

msgHeaders := make(map[string]interface{})
// ตั้งค่าเวลาหน่วยการตadvance ในการหน่วยให้ใช้ header ของข้อความ ในหน่วยเป็นมิลลิวินาที
msgHeaders["x-delay"] = 6000

err = ch.Publish(
"delay.queue", // ชื่อ exchange
"",            // พารามิเตอร์การเส้นทาง
false,
false,
amqp.Publishing{
Headers: msgHeaders,  // ตั้งค่า headers ของ message
ContentType: "text/plain",
Body: []byte(body),
})

หมายเหตุ: ถ้าคุณใช้บริการ RabbitMQ Message Queue ของ Alibaba Cloud โดยตรง คุณสามารถตั้งค่าเวลาหน่วยซึ่งใช้ attribute ของ header ของข้อความ (delay) โดยไม่ต้องติดตั้ง plugin Alibaba Cloud ได้ขยาย RabbitMQ ไว้สำหรับจุดประสงค์นี้แล้ว