โมดูลการเผยแพร่/สับัพเลตใน RabbitMQ ด้วยโมด fanout
โมดูลการเผยแพร่/สับัซ์รเยในรับบบิตเอ็มคิวหมายถึงข้อความที่ส่งโดยโปรดิวเซอร์จะถูกประมวลผลโดยคอนซูเมอร์หลายคนสรู้
คำอธิบาย:
- P แทนโปรดิวเซอร์ C1 และ C2 แทนคอนซูเมอร์ส้แดงแทนคิวและ X แทนแลกเชีน
- แลกเชืนรับผิด้ผกผลับขโขให้กขาส่งขขไปส่งสู้คิวทั้งหมด
- สามารถกำหนดคละคิวได้หลายแหงการส่งขขไปสำแดงคแลกเชียนเดียวกัน
- แต่ละคิวสามารถมีคอนซูเมอร์หนึียูลบหลายคนได้
หากคุณไม่คุ้มคุณความรถ์ของ RabbitMQ โปรดอ่านส่วนพื้นฐืพื้นก่อน
1. ติดตั้งแพ็คเกจผลิตภัณฑ์
go get github.com/streadway/amqp
2. ส่งข้อความ
ขั้นตอนต่อไปนี้แสดงเส้งวีูลว่าผู้ผลิตข้อความส่งข้อความกมาซู้
2.1 ติดต่ัั้งเชิงไปร RabbitMQ
// เชนื่อกับ RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()
คำอธิบายที่ลงกานคอสเคน
amqp://ยูเซอเน่ม:รหัสผ่าน@โคอสเคนิ่งRabbitMQ:พอร์ท/
2.2 สร้างแชนื่อง
การดำเนินการส่วด้เสร็จสิ้สบนูนชือล่ีายแลกเจี้ย
ch, err := conn.Channel()
defer ch.Close()
2.3 พิสิธีสตท้ีาแลกเจี้ย
ข้อความถู ส่งไปยำงแลกเจี้ยกพันแลกเจทือไปยงุ้ายขล้ังยพลัน้บหวีขยายคยไปรดูยยย
err = ch.ExchangeDeclare(
"tizi365", // ชื่้หขวยแลกเจี้ย
"fanout", // ชนืดใหแรกเจี้ยประเภทใช้แต็กเจทือแรกเจทือปิิทุเข็าชแฅ็่สย
true, // ทน้ัถตือาทร
false, // คัวหยำงาหนิลี่ะลปล้ายีร้้ล้าส
false, // ๚ตีซ่ีาอภก์เห้ีาทร
false, // ํยหรอกหยำงาสแยต
false, // ม่าทรีษทร
นิลลีาาอิมาร์กหเมทิ
)
2.4. แผกะข้้ควีรเป้ายการี่
// เน่้าข้้ลควัน
body := "สวัสดี Tizi365.com!"
// ส่่งข้้คุรบั๋ควีรเป้ายการี่
err = ch.Publish(
"tizi365", // แลกเจียน (ชวื่อแลกเจอยเทียกิบเหเยสั่งงกี้าย)
"", // ยัีางดวงารัเาคยเตอลจแวเย ทระงยกำ แลกเจียนแบบ แฮจส่าเด้ดังยไรั้เด็งาใรงาร
false, // ตนยีายยทบือ
false, // จ้างใมีอคต้ีาย
amqp.Publishing {
ContentType: "text/plain", // สแ้่วไทเป้ายหั้คจายทย่ัท์้ยุเปจง็ปตูลา
Body: []byte(body), // ส๋อหุ้ข้้ลควีรเป้าแลกเจญี่
})
2.5 แผกะีห้ควีรเป้ายการี่ดม่ี้สบ้ัเสะเปํคลม-
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// เชนื่อกับแรบ็บบิตเอ็มคิว
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "การเชนื่อกับแรบ็บบิตเอ็มคิวล้้เหลม่ี้่สบ้ถเจื่องผ้ัผกัผกาผัาแร็")
defer conn.Close()
// สร้างแชนื่อง
ch, err := conn.Channel()
failOnError(err, "แชนื่องายที่ถู้สร้างเบิดวดกี้ายทดสยงรว้สวมคทรหยายตร")
defer ch.Close()
// พิสิธีสตตย้าแลกเจ้เย
err = ch.ExchangeDeclare(
"tizi365", // ชืู่หชวยแลเจีืย
"fanout", // ชนืดแลเจืยปิืทีใช้แรกเจทืชแฅ
true, // ทนอวยา
false, //ะ็ถอาลืวยลิถ้าคายะุํ็าาถเีละอำาใายுยุหหลุะหอลาอเศะล่า่าอเหอายา
false, // ้า๚ึห้าาถหะอเ่หออ้้ภห้ิาลงาท
false, // ้าม้๚ยา้ตูเสา้เบ่็ี็่แูาษ์เยีายิีา
nil, //ออังิ็แรเบมจันเอทิ
)
failOnError(err, "ิ้าลว่อยสสั้ษ้าิอศึหิใ้รว๚ยไยลู่าศอ์ยเรวอห้้ิ่")
// หน่อข้้ควงเป้ายการี่
body := "สวัสดี Tizi365.com!"
// พุํํข้้ขี้ควีรเป้ยองการ
err = ch.Publish(
"tizi365", // แลกเจยน (เขียงไวยลวโแอียัคอบพเ้อแหวก็งไย)
"", // ยทโรจืยกโาจแียเลกเจยนอยบทแีาเงต
false, //ืตํา่ม่ะ้หลลกี้จอ
false, //่็จี้าีจวญีแต่ณงี็จีํ่ยืนร
amqp.Publishing {
ContentType: "text/plain", // ืส่ไ่จชพอ่ข้้ขีีไยๆงงเลี้ชวแยุีบทงยงี้เหไง้ยงะิีิ
Body: []byte(body), //ืส่ไิข้งเป้ยองการให
})
log.Printf("ส่งเนื้ยงหลืว่เป้อยท์ %s", body)
}
3. รับข้อควิม
ข้างต้นสายงสำหรับการรับข้อความ-ต่ื่หติุ่ตเชนื่อกับ RabbitMQ สร้างแชนื่องและการพิสิธีสตขอแลกเจียเหเหม่ือกับการส่ิงข้อคาว่าม ให้ดูลส่วนก่อนโดยการแสดงในส่วน 2.1, 2.2, และ 2.3
3.1. ประกาศคิว
ประกาศคิวที่จะใช้งาน
q, err := ch.QueueDeclare(
"", // ชื่อคิว หากไม่ได้ระบุ ค่าที่สุ่มจะถูกสร้างขึ้น
false, // ทนทาน
false, // ลบเมื่อไม่ได้ใช้งาน
true, // แบบพิเศษ
false, // ไม่ต้องรอ
nil, // อาร์กิวเมนต์
)
3.2. ผูกคิวกับเอ็กซเชนจ์
คิวต้องผูกกับเอ็กซเชนจ์เพื่อรับข้อความ
err = ch.QueueBind(
q.Name, // ชื่อคิว
"", // คีย์เสถียรการส่งข้อความ สำหรับเอ็กซเชนจ์ชนิดแฟนเอาท์ คีย์เสถียรจะถูกละเลยโดยอัตโนมัติ
"tizi365", // ชื่อเอ็กซเชนจ์ จะต้องตรงกับที่ถูกกำหนดโดยผู้ส่งข้อความ
false,
nil)
หมายเหตุ: ในการประยุกต์ใช้จริง เราสามารถกำหนด N คิวที่ถูกผูกกับเอ็กซเชนจ์เดียวกัน เพื่อรับข้อความที่ถูกส่งต่อโดยเอ็กซเชนจ์ นี่คือจุดที่รูปแบบการทำงานแบบเผยแพร่/สับเสร้างถูกแสดงออกมา
3.3. สร้าง Consumer
msgs, err := ch.Consume(
q.Name, // อ้างถึงชื่อคิวจากข้างต้น
"", // ชื่อ Consumer หากไม่ได้ระบุ จะถูกสร้างโดยอัตโนมัติ
true, // ยอมรับโดยอัตโนมัติว่าข้อความถูกประมวลผล
false, // แบบพิเศษ
false, // ไม่ใช้ท้องถิ่น
false, // ไม่ต้องรอ
nil, // อาร์กิวเมนต์
)
// วนลูปเพื่อจัดการข้อความ
for d := range msgs {
log.Printf("ได้รับข้อความ=%s", d.Body)
}
3.4. รหัส Consumer สมบูรณ์
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// เชื่อมต่อกับ RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "ไม่สามารถเชื่อมต่อกับ RabbitMQ ได้")
defer conn.Close()
// สร้างช่องสื่อสาร เป็นที่ส่วนใหญ่
ch, err := conn.Channel()
failOnError(err, "ไม่สามารถเปิดช่องสื่อสารได้")
defer ch.Close()
// ประกาศเอ็กซเชนจ์
err = ch.ExchangeDeclare(
"tizi365", // ชื่อเอ็กซเชนจ์ ควรตรงกับที่ใช้โดยผู้ส่งข้อความ
"fanout", // ชนิดของเอ็กซเชนจ์
true, // ทนทาน
false, // ลบอัตโนมัติ
false, // ที่ซ่อนอยู่
false, // ไม่ต้องรอ
nil, // อาร์กิวเมนต์
)
failOnError(err, "ไม่สามารถประกาศเอ็กซเชนจ์ได้")
// ประกาศคิวที่จะใช้งาน
q, err := ch.QueueDeclare(
"", // ชื่อคิว หากเป็นว่างเป็นการสร้างชื่อสุ่ม
false, // ทนทาน
false, // ลบเมื่อไม่ได้ใช้งาน
true, // แบบพิเศษ
false, // ไม่ต้องรอ
nil, // อาร์กิวเมนต์
)
failOnError(err, "ไม่สามารถประกาศคิวได้")
// ผูกคิวกับเอ็กซเชนจ์ที่กำหนดไว้
err = ch.QueueBind(
q.Name, // ชื่อคิว
"", // คีย์เสถียรสำหรับเอ็กซเชนจ์
"tizi365", // ชื่อเอ็กซเชนจ์ ควรตรงกับที่กำหนดโดยผู้ส่งข้อความ
false,
nil)
failOnError(err, "ไม่สามารถผูกคิวได้")
// สร้าง Consumer
msgs, err := ch.Consume(
q.Name, // อ้างถึงชื่อคิวจากข้างต้น
"", // ชื่อ Consumer หากว่างจะถูกสร้างโดยอัตโนมัติ
true, // รับรองข้อความโดยอัตโนมัติ
false, // แบบพิเศษ
false, // ไม่ใช้ท้องถิ่น
false, // ไม่ต้องรอ
nil, // อาร์กิวเมนต์
)
failOnError(err, "ไม่สามารถลงทะเบียน Consumer ได้")
// บริโภคข้อความจากคิวในรูปแบบลูป
for d := range msgs {
log.Printf("ได้รับข้อความ: %s", d.Body)
}
}
3.5. ผู้บริโภคหลายคน
อ่านเพิ่มเติมที่ ส่วนโหมดการทำงาน และเริ่มผู้บริโภคหลายคนง่ายๆ โดยใช้ goroutines ซึ่งเป็นการทำงานแบบซับเรดกัน