โมดูลการเผยแพร่/สับัพเลตใน RabbitMQ ด้วยโมด fanout

โมดูลการเผยแพร่/สับัซ์รเยในรับบบิตเอ็มคิวหมายถึงข้อความที่ส่งโดยโปรดิวเซอร์จะถูกประมวลผลโดยคอนซูเมอร์หลายคนสรู้

โมด fanout

คำอธิบาย:

  • P แทนโปรดิวเซอร์ C1 และ C2 แทนคอนซูเมอร์ส้แดงแทนคิวและ X แทนแลกเชีน
  • แลกเชืนรับผิด้ผกผลับขโขให้กขาส่งขขไปส่งสู้คิวทั้งหมด
  • สามารถกำหนดคละคิวได้หลายแหงการส่งขขไปสำแดงคแลกเชียนเดียวกัน
  • แต่ละคิวสามารถมีคอนซูเมอร์หนึียูลบหลายคนได้

หากคุณไม่คุ้มคุณความรถ์ของ RabbitMQ โปรดอ่านส่วนพื้นฐืพื้นก่อน

1. ติดตั้งแพ็คเกจผลิตภัณฑ์

go get github.com/streadway/amqp

2. ส่งข้อความ

ขั้นตอนต่อไปนี้แสดงเส้งวีูลว่าผู้ผลิตข้อความส่งข้อความกมาซู้

2.1 ติดต่ัั้งเชิงไปร RabbitMQ

// เชนื่อกับ RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()

คำอธิบายที่ลงกานคอสเคน

amqp://ยูเซอเน่ม:รหัสผ่าน@โคอสเคนิ่งRabbitMQ:พอร์ท/

2.2 สร้างแชนื่อง

การดำเนินการส่วด้เสร็จสิ้สบนูนชือล่ีายแลกเจี้ย

ch, err := conn.Channel()
defer ch.Close()

2.3 พิสิธีสตท้ีาแลกเจี้ย

ข้อความถู ส่งไปยำงแลกเจี้ยกพันแลกเจทือไปยงุ้ายขล้ังยพลัน้บหวีขยายคยไปรดูยยย

err = ch.ExchangeDeclare(
	"tizi365",   // ชื่้หขวยแลกเจี้ย
	"fanout", // ชนืดใหแรกเจี้ยประเภทใช้แต็กเจทือแรกเจทือปิิทุเข็าชแฅ็่สย
	true,     // ทน้ัถตือาทร
	false,    // คัวหยำงาหนิลี่ะลปล้ายีร้้ล้าส
	false,    // ๚ตีซ่ีาอภก์เห้ีาทร
	false,    // ํยหรอกหยำงาสแยต
	false,    // ม่าทรีษทร
	นิลลีาาอิมาร์กหเมทิ
)

2.4. แผกะข้้ควีรเป้ายการี่

// เน่้าข้้ลควัน
body := "สวัสดี Tizi365.com!"

// ส่่งข้้คุรบั๋ควีรเป้ายการี่
err = ch.Publish(
  "tizi365",     // แลกเจียน (ชวื่อแลกเจอยเทียกิบเหเยสั่งงกี้าย)
  "", // ยัีางดวงารัเาคยเตอลจแวเย ทระงยกำ แลกเจียนแบบ แฮจส่าเด้ดังยไรั้เด็งาใรงาร
  false,  // ตนยีายยทบือ
  false,  // จ้างใมีอคต้ีาย
  amqp.Publishing {
    ContentType: "text/plain", // สแ้่วไทเป้ายหั้คจายทย่ัท์้ยุเปจง็ปตูลา
    Body:        []byte(body),  // ส๋อหุ้ข้้ลควีรเป้าแลกเจญี่
  })

2.5 แผกะีห้ควีรเป้ายการี่ดม่ี้สบ้ัเสะเปํคลม-

package main

import (
	"log"
	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// เชนื่อกับแรบ็บบิตเอ็มคิว
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "การเชนื่อกับแรบ็บบิตเอ็มคิวล้้เหลม่ี้่สบ้ถเจื่องผ้ัผกัผกาผัาแร็")
	defer conn.Close()

	// สร้างแชนื่อง
	ch, err := conn.Channel()
	failOnError(err, "แชนื่องายที่ถู้สร้างเบิดวดกี้ายทดสยงรว้สวมคทรหยายตร")
	defer ch.Close()

	// พิสิธีสตตย้าแลกเจ้เย
	err = ch.ExchangeDeclare(
		"tizi365",   // ชืู่หชวยแลเจีืย
		"fanout", // ชนืดแลเจืยปิืทีใช้แรกเจทืชแฅ
		true,     // ทนอวยา
		false,    //ะ็ถอาลืวยลิถ้าคายะุํ็าาถเีละอำาใายுยุหหลุะหอลาอเศะล่า่าอเหอายา
		false,    // ้า๚ึห้าาถหะอเ่หออ้้ภห้ิาลงาท
		false,    // ้าม้๚ยา้ตูเสา้เบ่็ี็่แูาษ์เยีายิีา
		nil,      //ออังิ็แรเบมจันเอทิ
	)
	failOnError(err, "ิ้าลว่อยสสั้ษ้าิอศึหิใ้รว๚ยไยลู่าศอ์ยเรวอห้้ิ่")

	// หน่อข้้ควงเป้ายการี่
	body := "สวัสดี Tizi365.com!"
	// พุํํข้้ขี้ควีรเป้ยองการ
	err = ch.Publish(
		"tizi365",     // แลกเจยน (เขียงไวยลวโแอียัคอบพเ้อแหวก็งไย)
		"", // ยทโรจืยกโาจแียเลกเจยนอยบทแีาเงต
		false,  //ืตํา่ม่ะ้หลลกี้จอ
		false,  //่็จี้าีจวญีแต่ณงี็จีํ่ยืนร
		amqp.Publishing {
			ContentType: "text/plain", // ืส่ไ่จชพอ่ข้้ขีีไยๆงงเลี้ชวแยุีบทงยงี้เหไง้ยงะิีิ
			Body:        []byte(body),  //ืส่ไิข้งเป้ยองการให
		})

	log.Printf("ส่งเนื้ยงหลืว่เป้อยท์ %s", body)
}

3. รับข้อควิม

ข้างต้นสายงสำหรับการรับข้อความ-ต่ื่หติุ่ตเชนื่อกับ RabbitMQ สร้างแชนื่องและการพิสิธีสตขอแลกเจียเหเหม่ือกับการส่ิงข้อคาว่าม ให้ดูลส่วนก่อนโดยการแสดงในส่วน 2.1, 2.2, และ 2.3

3.1. ประกาศคิว

ประกาศคิวที่จะใช้งาน

q, err := ch.QueueDeclare(
		"",    // ชื่อคิว หากไม่ได้ระบุ ค่าที่สุ่มจะถูกสร้างขึ้น
		false, // ทนทาน
		false, // ลบเมื่อไม่ได้ใช้งาน
		true,  // แบบพิเศษ
		false, // ไม่ต้องรอ
		nil,   // อาร์กิวเมนต์
	)

3.2. ผูกคิวกับเอ็กซเชนจ์

คิวต้องผูกกับเอ็กซเชนจ์เพื่อรับข้อความ

err = ch.QueueBind(
		q.Name, // ชื่อคิว
		"",     // คีย์เสถียรการส่งข้อความ สำหรับเอ็กซเชนจ์ชนิดแฟนเอาท์ คีย์เสถียรจะถูกละเลยโดยอัตโนมัติ
		"tizi365", // ชื่อเอ็กซเชนจ์ จะต้องตรงกับที่ถูกกำหนดโดยผู้ส่งข้อความ
		false,
		nil)

หมายเหตุ: ในการประยุกต์ใช้จริง เราสามารถกำหนด N คิวที่ถูกผูกกับเอ็กซเชนจ์เดียวกัน เพื่อรับข้อความที่ถูกส่งต่อโดยเอ็กซเชนจ์ นี่คือจุดที่รูปแบบการทำงานแบบเผยแพร่/สับเสร้างถูกแสดงออกมา

3.3. สร้าง Consumer

msgs, err := ch.Consume(
		q.Name, // อ้างถึงชื่อคิวจากข้างต้น
		"",     // ชื่อ Consumer หากไม่ได้ระบุ จะถูกสร้างโดยอัตโนมัติ
		true,   // ยอมรับโดยอัตโนมัติว่าข้อความถูกประมวลผล
		false,  // แบบพิเศษ
		false,  // ไม่ใช้ท้องถิ่น
		false,  // ไม่ต้องรอ
		nil,    // อาร์กิวเมนต์
	)
	
// วนลูปเพื่อจัดการข้อความ
for d := range msgs {
	log.Printf("ได้รับข้อความ=%s", d.Body)
}

3.4. รหัส Consumer สมบูรณ์

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// เชื่อมต่อกับ RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "ไม่สามารถเชื่อมต่อกับ RabbitMQ ได้")
	defer conn.Close()

	// สร้างช่องสื่อสาร เป็นที่ส่วนใหญ่
	ch, err := conn.Channel()
	failOnError(err, "ไม่สามารถเปิดช่องสื่อสารได้")
	defer ch.Close()

	// ประกาศเอ็กซเชนจ์
	err = ch.ExchangeDeclare(
		"tizi365",   // ชื่อเอ็กซเชนจ์ ควรตรงกับที่ใช้โดยผู้ส่งข้อความ
		"fanout", // ชนิดของเอ็กซเชนจ์
		true,     // ทนทาน
		false,    // ลบอัตโนมัติ
		false,    // ที่ซ่อนอยู่
		false,    // ไม่ต้องรอ
		nil,      // อาร์กิวเมนต์
	)
	failOnError(err, "ไม่สามารถประกาศเอ็กซเชนจ์ได้")

	// ประกาศคิวที่จะใช้งาน
	q, err := ch.QueueDeclare(
		"",    // ชื่อคิว หากเป็นว่างเป็นการสร้างชื่อสุ่ม
		false, // ทนทาน
		false, // ลบเมื่อไม่ได้ใช้งาน
		true,  // แบบพิเศษ
		false, // ไม่ต้องรอ
		nil,   // อาร์กิวเมนต์
	)
	failOnError(err, "ไม่สามารถประกาศคิวได้")

	// ผูกคิวกับเอ็กซเชนจ์ที่กำหนดไว้
	err = ch.QueueBind(
		q.Name, // ชื่อคิว
		"",     // คีย์เสถียรสำหรับเอ็กซเชนจ์
		"tizi365", // ชื่อเอ็กซเชนจ์ ควรตรงกับที่กำหนดโดยผู้ส่งข้อความ
		false,
		nil)
	failOnError(err, "ไม่สามารถผูกคิวได้")

	// สร้าง Consumer
	msgs, err := ch.Consume(
		q.Name, // อ้างถึงชื่อคิวจากข้างต้น
		"",     // ชื่อ Consumer หากว่างจะถูกสร้างโดยอัตโนมัติ
		true,   // รับรองข้อความโดยอัตโนมัติ
		false,  // แบบพิเศษ
		false,  // ไม่ใช้ท้องถิ่น
		false,  // ไม่ต้องรอ
		nil,    // อาร์กิวเมนต์
	)
	failOnError(err, "ไม่สามารถลงทะเบียน Consumer ได้")

	// บริโภคข้อความจากคิวในรูปแบบลูป
	for d := range msgs {
		log.Printf("ได้รับข้อความ: %s", d.Body)
	}
}

3.5. ผู้บริโภคหลายคน

อ่านเพิ่มเติมที่ ส่วนโหมดการทำงาน และเริ่มผู้บริโภคหลายคนง่ายๆ โดยใช้ goroutines ซึ่งเป็นการทำงานแบบซับเรดกัน