Mô hình Đăng ký/Nhận thông báo trong RabbitMQ (Chế độ Phát sóng, Chế độ Fanout)

Mô hình đăng ký/nhận thông báo trong RabbitMQ có nghĩa là một tin nhắn được gửi bởi một nhà sản xuất sẽ được xử lý bởi nhiều người tiêu dùng.

Chế độ Fanout

Giải thích:

  • P đại diện cho nhà sản xuất, C1 và C2 đại diện cho người tiêu dùng, màu đỏ đại diện cho hàng đợi, và X đại diện cho trao đổi.
  • Trao đổi chịu trách nhiệm chuyển tiếp tin nhắn đến tất cả hàng đợi được ràng buộc với trao đổi.
  • Có thể xác định nhiều hàng đợi, mỗi hàng đợi được ràng buộc với cùng một trao đổi.
  • Mỗi hàng đợi có thể có một hoặc nhiều người tiêu dùng.

Lưu ý: Nếu bạn không quen với RabbitMQ, vui lòng đọc phần Khái niệm Cơ bản về RabbitMQ trước.

1. Cài đặt Gói Phụ thuộc

go get github.com/streadway/amqp

2. Gửi Tin Nhắn

Các bước sau đây mô tả cách nhà sản xuất tin nhắn gửi tin nhắn.

2.1. Kết nối tới máy chủ RabbitMQ

// Kết nối tới máy chủ RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()

Giải thích địa chỉ kết nối:

amqp://tênngườidùng:mậtkhẩu@ĐịachỉRabbitMQ:Cổng/

2.2. Tạo một Kênh

Hầu hết các hoạt động được thực hiện trên kênh.

ch, err := conn.Channel()
defer ch.Close()

2.3. Khai báo một Trung gian

Tin nhắn được gửi trước tiên tới trung gian. Trung gian chuyển tiếp tin nhắn tới hàng đợi dựa trên chiến lược của nó.

err = ch.ExchangeDeclare(
	"tizi365",   // Tên trung gian
	"fanout", // Loại trung gian, sử dụng loại fanout ở đây, tức là mô hình đăng ký/nhận thông báo
	true,     // Bền vững
	false,    // Tự động xóa
	false,    // Nội bộ
	false,    // Không chờ đợi
	nil,      // Đối số
)

2.4. Gửi một Tin Nhắn

// Nội dung tin nhắn
body := "Xin chào Tizi365.com!"

// Gửi tin nhắn
err = ch.Publish(
  "tizi365",     // Trung gian (tên trung gian tương ứng với khai báo trước đó)
  "", // Khóa định tuyến, đối với loại trung gian fanout, khóa định tuyến sẽ tự động bị bỏ qua, do đó không cần phải cung cấp
  false,  // Bắt buộc
  false,  // Ngay lập tức
  amqp.Publishing {
    ContentType: "text/plain", // Loại nội dung tin nhắn, ở đây là văn bản thuần túy
    Body:        []byte(body),  // Nội dung tin nhắn
  })

2.5. Hoàn tất Mã Thực hiện Đẩy Tin Nhắn

package main

import (
	"log"
	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// Kết nối tới RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Không thể kết nối tới RabbitMQ")
	defer conn.Close()

	// Tạo một kênh
	ch, err := conn.Channel()
	failOnError(err, "Không thể mở một kênh")
	defer ch.Close()

	// Khai báo một trung gian
	err = ch.ExchangeDeclare(
		"tizi365",   // Tên trung gian
		"fanout", // Loại trung gian, fanout cho chế độ đăng ký/nhận thông báo
		true,     // Bền vững
		false,    // Tự động xóa
		false,    // Nội bộ
		false,    // Không chờ đợi
		nil,      // Đối số
	)
	failOnError(err, "Không thể khai báo một trung gian")

	// Nội dung tin nhắn
	body := "Xin chào Tizi365.com!"
	// Đẩy tin nhắn
	err = ch.Publish(
		"tizi365",     // Trung gian (phù hợp với khai báo ở trên)
		"", // Khóa định tuyến, đối với trung gian loại fanout, khóa định tuyến sẽ tự động bị bỏ qua
		false,  // Bắt buộc
		false,  // Ngay lập tức
		amqp.Publishing {
			ContentType: "text/plain", // Loại nội dung tin nhắn, ở đây là văn bản thuần túy
			Body:        []byte(body),  // Nội dung tin nhắn
		})

	log.Printf("Đã gửi nội dung %s", body)
}

3. Nhận Tin Nhắn

Các bước đầu tiên để nhận tin nhắn—kết nối tới RabbitMQ, tạo một kênh, và khai báo một trung gian—giống như gửi tin nhắn. Tham khảo các phần trước đó 2.1, 2.2, và 2.3.

3.1. Khai báo một Queue

Khai báo queue sẽ được thao tác

q, err := ch.QueueDeclare(
		"",    // Tên queue, nếu không được chỉ định, một tên ngẫu nhiên sẽ được tạo
		false, // Bền vững
		false, // Xóa khi không sử dụng
		true,  // Độc quyền
		false, // Không chờ đợi
		nil,   // Các đối số
	)

3.2. Ràng buộc Queue với Exchange

Queue cần được ràng buộc với exchange để nhận các thông điệp

err = ch.QueueBind(
		q.Name, // Tên queue
		"",     // Khóa định tuyến, đối với loại trao đổi fanout, khóa định tuyến sẽ tự động bị bỏ qua
		"tizi365", // Tên exchange, phải trùng khớp với tên được định nghĩa bởi người gửi thông điệp
		false,
		nil)

Lưu ý: Trong ứng dụng thực tế, chúng ta có thể định nghĩa N queues, mỗi queue ràng buộc với cùng một exchange, để nhận các thông điệp được chuyển tiếp bởi exchange. Đây chính là nơi mà mô hình xuất bản/đăng ký được phản ánh.

3.3. Tạo Một Người tiêu dùng

msgs, err := ch.Consume(
		q.Name, // Tham chiếu tên queue từ trên
		"",     // Tên người tiêu dùng, nếu không được chỉ định, nó sẽ được tạo tự động
		true,   // Tự động xác nhận rằng thông điệp đã được xử lý
		false,  // Độc quyền
		false,  // Không địa phương
		false,  // Không chờ đợi
		nil,    // Các đối số
	)
	
// Vòng lặp xử lý các thông điệp
for d := range msgs {
	log.Printf("Nhận thông điệp=%s", d.Body)
}

3.4. Hoàn tất Mã người tiêu dùng

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// Kết nối tới RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Không thể kết nối tới RabbitMQ")
	defer conn.Close()

	// Tạo một kênh, thường là một kênh cho mỗi người tiêu dùng
	ch, err := conn.Channel()
	failOnError(err, "Không thể mở một kênh")
	defer ch.Close()

	// Khai báo một exchange
	err = ch.ExchangeDeclare(
		"tizi365",   // Tên exchange, phải trùng khớp với tên được sử dụng bởi người gửi thông điệp
		"fanout", // Loại exchange
		true,     // Bền vững
		false,    // Tự động xóa
		false,    // Nội bộ
		false,    // Không chờ đợi
		nil,      // Các đối số
	)
	failOnError(err, "Không thể khai báo một exchange")

	// Khai báo queue để thao tác
	q, err := ch.QueueDeclare(
		"",    // Tên queue, nếu trống, một tên ngẫu nhiên sẽ được tạo
		false, // Bền vững
		false, // Xóa khi không sử dụng
		true,  // Độc quyền
		false, // Không chờ đợi
		nil,   // Các đối số
	)
	failOnError(err, "Không thể khai báo một queue")

	// Ràng buộc queue với exchange được chỉ định
	err = ch.QueueBind(
		q.Name, // Tên queue
		"",     // Khóa định tuyến, bị bỏ qua đối với các exchange fanout
		"tizi365", // Tên exchange, phải trùng khớp với tên được định nghĩa bởi người gửi thông điệp
		false,
		nil)
	failOnError(err, "Không thể ràng buộc một queue")

	// Tạo một người tiêu dùng
	msgs, err := ch.Consume(
		q.Name, // Tham chiếu tới tên queue ở trên
		"",     // Tên người tiêu dùng, s