โหมดคิวงานเรียงคี Golang RabbitMQ

Golang RabbitMQ

คำอธิบาย: P แทนผู้ผลิต (producer), C แทนผู้บริโภค (consumer), และสีแดงแทนคิว (queue)

หมายเหตุ: หากคุณไม่คุ้นเคยกับ RabbitMQ โปรดอ่านส่วน คอนเซปต์พื้นฐานของ RabbitMQ ก่อน

1. ติดตั้ง Dependencies

go get github.com/streadway/amqp

นำเข้าแพ็กเกจขึ้นโปรแกรม

import (
  "github.com/streadway/amqp"
)

2. ส่งข้อความ

ขั้นตอนต่อไปนี้แสดงการขยายของผู้ผลิตข้อความ

2.1. เชื่อมต่อไปยังเซิร์ฟเวอร์ RabbitMQ

// เชื่อมต่อไปยังเซิร์ฟเวอร์ RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()

คำอธิบายที่อยู่การเชื่อมต่อ:

amqp://ชื่อผู้ใช้:รหัสผ่าน@ที่อยู่ของ RabbitMQ:พอร์ต/

2.2. สร้างช่อง

การดำเนินการส่วนใหญ่จะเกิดขึ้นบนช่อง

ch, err := conn.Channel()
defer ch.Close()

2.3. ประกาศคิว

แทนคิวที่เราต้องการอ่านหรือเขียน

q, err := ch.QueueDeclare(
  "hello", // ชื่อคิว
  false,   // คงทนข้อความ
  false,   // ลบคิวเมื่อไม่ได้ใช้งาน
  false,   // ที่เอกสาร
  false,   // ไม่จอง
  nil,     // Argument
)

2.4. ดันข้อความ

// เนื้อหาข้อความ
body := "สวัสดี โลก!"

// ดันข้อความ
err = ch.Publish(
  "",     // แลกเชน (ไม่สนใจที่นี่)
  q.Name, // พารามิเตอร์การเสียบ, ใช้ชื่อคิวเป็นพารามิเตอร์การเสียบ
  false,  // คำเริยบ
  false,  // มัตถอน
  amqp.Publishing {
    ContentType: "text/plain",
    Body:        []byte(body),  // เนื้อหาข้อความ
  })

2.5. โค้ดสมบูรณ์สำหรับการส่งข้อความ

package main

// นำเข้าแพ็กเกจ
import (
	"log"
	"github.com/streadway/amqp"
)

// จัดการข้อผิดพลาด
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// เชื่อมต่อไปยัง RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "เชื่อมต่อกับ RabbitMQ ล้มเหลว")
	defer conn.Close()

	// สร้างช่อง
	ch, err := conn.Channel()
	failOnError(err, "เปิดช่องล้มเหลว")
	defer ch.Close()

	// ประกาศคิวที่จะดำเนินการ
	q, err := ch.QueueDeclare(
		"hello", // ชื่อ
		false,   // ทนทาน
		false,   // ลบเมื่อไม่ได้ใช้
		false,   // ที่เอกสาร
		false,   // ไม่รอ
		nil,     // Argument
	)
	failOnError(err, "การประกาศคิวล้มเหลว")

	// เนื้อหาข้อความที่จะส่ง
	body := "สวัสดี โลก!"

	// ส่งข้อความ
	err = ch.Publish(
		"",     // แลกเชน
		q.Name, // คีย์เส้นทาง
		false,  // บังคับ
		false,  // ตรง
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		})
	failOnError(err, "การเผยแพร่ข้อความล้มเหลว")
	log.Printf(" [x] ส่ง %s", body)
}

3. การรับข้อความ

ขั้นตอน 3 ขั้นแรกของการรับข้อความเหมือนกับการส่งข้อความ จะตรงกับส่วน 2.1, 2.2 และ 2.3 ตามลำดับ โค้ดทั้งหมดสำหรับการรับข้อความคือดังนี้:

package main

// Import packages
import (
	"log"
	"github.com/streadway/amqp"
)

// Error handling
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// Connect to RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "การเชื่อมต่อไปที่ RabbitMQ ล้มเหลว")
	defer conn.Close()

	// Create a channel
	ch, err := conn.Channel()
	failOnError(err, "การเปิดช่องทางล้มเหลว")
	defer ch.Close()
	
	// ประกาศคิวที่จะดำเนินการ
	q, err := ch.QueueDeclare(
		"hello", // ต้องการให้ชื่อคิวตรงกับชื่อคิวที่ใช้สำหรับส่งข้อความ
		false,   // durable
		false,   // ลบเมื่อไม่ได้ใช้
		false,   // ความเป็นเอกลักษณ์
		false,   // no-wait
		nil,     // arguments
	)
	failOnError(err, "การประกาศคิวล้มเหลว")

	// สร้างผู้บริโภคข้อความ
	msgs, err := ch.Consume(
		q.Name, // ชื่อคิว
		"",     // ชื่อผู้บริโภค หากไม่ได้กรอก ระบบจะสร้างไอดีที่ไม่ซ้ำซ้อนโดยอัตโนมัติ
		true,   // ว่าจะตอบรับข้อความโดยอัตโนมัติหรือไม่ กล่าวคือ แจ้งเมื่อ RabbitMQ ได้รับข้อความเรียบร้อยแล้ว
		false,  // ไม่ใช่เอกลักษณ์
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "การลงทะเบียนเป็นผู้บริโภคล้มเหลว")
	
	// ดึงข้อความจากคิวในลูป
	for d := range msgs {
		// พิมพ์เนื้อหาข้อความ
		log.Printf("ได้รับข้อความ: %s", d.Body)
	}
}