โหมดคิวงานเรียงคี Golang RabbitMQ
คำอธิบาย: P แทนผู้ผลิต (producer), C แทนผู้บริโภค (consumer), และสีแดงแทนคิว (queue)
หมายเหตุ: หากคุณไม่คุ้นเคยกับ RabbitMQ โปรดอ่านส่วน คอนเซปต์พื้นฐานของ RabbitMQ ก่อน
1. ติดตั้ง Dependencies
go get github.com/streadway/amqp
นำเข้าแพ็กเกจขึ้นโปรแกรม
import (
"github.com/streadway/amqp"
)
2. ส่งข้อความ
ขั้นตอนต่อไปนี้แสดงการขยายของผู้ผลิตข้อความ
2.1. เชื่อมต่อไปยังเซิร์ฟเวอร์ RabbitMQ
// เชื่อมต่อไปยังเซิร์ฟเวอร์ RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()
คำอธิบายที่อยู่การเชื่อมต่อ:
amqp://ชื่อผู้ใช้:รหัสผ่าน@ที่อยู่ของ RabbitMQ:พอร์ต/
2.2. สร้างช่อง
การดำเนินการส่วนใหญ่จะเกิดขึ้นบนช่อง
ch, err := conn.Channel()
defer ch.Close()
2.3. ประกาศคิว
แทนคิวที่เราต้องการอ่านหรือเขียน
q, err := ch.QueueDeclare(
"hello", // ชื่อคิว
false, // คงทนข้อความ
false, // ลบคิวเมื่อไม่ได้ใช้งาน
false, // ที่เอกสาร
false, // ไม่จอง
nil, // Argument
)
2.4. ดันข้อความ
// เนื้อหาข้อความ
body := "สวัสดี โลก!"
// ดันข้อความ
err = ch.Publish(
"", // แลกเชน (ไม่สนใจที่นี่)
q.Name, // พารามิเตอร์การเสียบ, ใช้ชื่อคิวเป็นพารามิเตอร์การเสียบ
false, // คำเริยบ
false, // มัตถอน
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(body), // เนื้อหาข้อความ
})
2.5. โค้ดสมบูรณ์สำหรับการส่งข้อความ
package main
// นำเข้าแพ็กเกจ
import (
"log"
"github.com/streadway/amqp"
)
// จัดการข้อผิดพลาด
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// เชื่อมต่อไปยัง RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "เชื่อมต่อกับ RabbitMQ ล้มเหลว")
defer conn.Close()
// สร้างช่อง
ch, err := conn.Channel()
failOnError(err, "เปิดช่องล้มเหลว")
defer ch.Close()
// ประกาศคิวที่จะดำเนินการ
q, err := ch.QueueDeclare(
"hello", // ชื่อ
false, // ทนทาน
false, // ลบเมื่อไม่ได้ใช้
false, // ที่เอกสาร
false, // ไม่รอ
nil, // Argument
)
failOnError(err, "การประกาศคิวล้มเหลว")
// เนื้อหาข้อความที่จะส่ง
body := "สวัสดี โลก!"
// ส่งข้อความ
err = ch.Publish(
"", // แลกเชน
q.Name, // คีย์เส้นทาง
false, // บังคับ
false, // ตรง
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "การเผยแพร่ข้อความล้มเหลว")
log.Printf(" [x] ส่ง %s", body)
}
3. การรับข้อความ
ขั้นตอน 3 ขั้นแรกของการรับข้อความเหมือนกับการส่งข้อความ จะตรงกับส่วน 2.1, 2.2 และ 2.3 ตามลำดับ โค้ดทั้งหมดสำหรับการรับข้อความคือดังนี้:
package main
// Import packages
import (
"log"
"github.com/streadway/amqp"
)
// Error handling
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// Connect to RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "การเชื่อมต่อไปที่ RabbitMQ ล้มเหลว")
defer conn.Close()
// Create a channel
ch, err := conn.Channel()
failOnError(err, "การเปิดช่องทางล้มเหลว")
defer ch.Close()
// ประกาศคิวที่จะดำเนินการ
q, err := ch.QueueDeclare(
"hello", // ต้องการให้ชื่อคิวตรงกับชื่อคิวที่ใช้สำหรับส่งข้อความ
false, // durable
false, // ลบเมื่อไม่ได้ใช้
false, // ความเป็นเอกลักษณ์
false, // no-wait
nil, // arguments
)
failOnError(err, "การประกาศคิวล้มเหลว")
// สร้างผู้บริโภคข้อความ
msgs, err := ch.Consume(
q.Name, // ชื่อคิว
"", // ชื่อผู้บริโภค หากไม่ได้กรอก ระบบจะสร้างไอดีที่ไม่ซ้ำซ้อนโดยอัตโนมัติ
true, // ว่าจะตอบรับข้อความโดยอัตโนมัติหรือไม่ กล่าวคือ แจ้งเมื่อ RabbitMQ ได้รับข้อความเรียบร้อยแล้ว
false, // ไม่ใช่เอกลักษณ์
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "การลงทะเบียนเป็นผู้บริโภคล้มเหลว")
// ดึงข้อความจากคิวในลูป
for d := range msgs {
// พิมพ์เนื้อหาข้อความ
log.Printf("ได้รับข้อความ: %s", d.Body)
}
}