الگوهای انتشار/اشتراک گذاری RabbitMQ (حالت پخش، حالت Fanout)

الگوی انتشار/اشتراک گذاری در RabbitMQ به این معناست که یک پیام ارسال شده توسط یک تولیدکننده (Producer) توسط چندین مصرف‌کننده (Consumer) پردازش خواهد شد.

حالت Fanout

توضیحات:

  • P نشان دهنده تولیدکننده، C1 و C2 نشان دهنده مصرف‌کنندگان، قرمز نشان دهنده صف‌ها و X نشان دهنده تبادل (Exchange) می‌باشد.
  • تبادل مسئول انتقال پیام به تمامی صف‌هایی است که با تبادل متصل شده‌اند.
  • می توان چندین صف تعریف کرد که هر یک به همان تبادل متصل شوند.
  • هر صف می‌تواند یک یا چندین مصرف‌کننده داشته باشد.

توجه: اگر با RabbitMQ آشنا نیستید لطفاً ابتدا بخش مفاهیم پایه RabbitMQ را مطالعه نمایید.

1. نصب بسته‌ی وابستگی

go get github.com/streadway/amqp

2. ارسال پیام‌ها

مراحل زیر نشان دهنده‌ی این است که تولیدکننده پیام چگونه پیام‌ها را ارسال می‌کند.

2.1. اتصال به سرور RabbitMQ

// اتصال به سرور RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()

توضیحات آدرس اتصال:

amqp://نام‌کاربری:رمزعبور@آدرسRabbitMQ:پورت/

2.2. ایجاد یک کانال

بیشتر عملیات‌ها بر روی کانال انجام می‌شود.

ch, err := conn.Channel()
defer ch.Close()

2.3. تعریف یک تبادل

پیام‌ها ابتدا به تبادل ارسال می‌شوند. تبادل پیام‌ها را بر اساس استراتژی خود به صف‌ها ارسال می‌کند.

err = ch.ExchangeDeclare(
	"tizi365",   // نام تبادل
	"fanout", // نوع تبادل، در اینجا از نوع fanout استفاده می‌شود، یعنی الگوی انتشار/اشتراک گذاری
	true,     // پایدار
	false,    // خودکار پاک شونده
	false,    // داخلی
	false,    // بدون انتظار
	nil,      // آرگومان‌ها
)

2.4. انتشار یک پیام

// محتوای پیام
body := "سلام Tizi365.com!"

// ارسال پیام
err = ch.Publish(
  "tizi365",     // تبادل (نام تبادل مطابق با تعریف قبلی)
  "", // کلید مسیریابی، برای تبادل نوع fanout، کلید مسیریابی به طور خودکار نادیده گرفته می‌شود، بنابراین لازم نیست کلیدی ارائه شود
  false,  // اجباری
  false,  // فوری
  amqp.Publishing {
    ContentType: "text/plain", // نوع محتوای پیام، در این‌جا متن ساده است
    Body:        []byte(body),  // محتوای پیام
  })

2.5. کد تکمیلی ارسال پیام

package main

import (
	"log"
	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// اتصال به RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "اتصال به RabbitMQ با شکست مواجه شد")
	defer conn.Close()

	// ایجاد یک کانال
	ch, err := conn.Channel()
	failOnError(err, "باز کردن یک کانال با شکست مواجه شد")
	defer ch.Close()

	// تعریف یک تبادل
	err = ch.ExchangeDeclare(
		"tizi365",   // نام تبادل
		"fanout", // نوع تبادل، fanout برای حالت انتشار/اشتراک گذاری
		true,     // پایدار
		false,    // خودکار پاک شونده
		false,    // داخلی
		false,    // بدون انتظار
		nil,      // آرگومان‌ها
	)
	failOnError(err, "تعریف تبادل با شکست مواجه شد")

	// محتوای پیام
	body := "سلام Tizi365.com!"
	// ارسال پیام
	err = ch.Publish(
		"tizi365",     // تبادل (مطابق با تعریف بالا)
		"", // کلید مسیریابی، برای تبادل‌های نوع fanout کلید مسیریابی به طور خودکار نادیده گرفته می‌شود
		false,  // اجباری
		false,  // فوری
		amqp.Publishing {
			ContentType: "text/plain", // نوع محتوای پیام، اینجا متن ساده است
			Body:        []byte(body),  // محتوای پیام
		})

	log.Printf("محتوای ارسالی %s", body)
}

3. دریافت پیام‌ها

سه مرحله اول برای دریافت پیام‌ها - اتصال به RabbitMQ، ایجاد کانال و تعریف یک تبادل - مانند ارسال پیام‌ها هستند. به بخش‌های قبلی 2.1، 2.2 و 2.3 مراجعه کنید.

3.1. اعلان یک صف

صف را برای عملیات تعریف کنید

q, err := ch.QueueDeclare(
		"",    // اگر اسم صف مشخص نشود، یک اسم تصادفی ایجاد می‌شود
		false, // دائمی
		false, // حذف در صورت عدم استفاده
		true,  // انحصاری
		false, // بدون انتظار
		nil,   // آرگومان‌ها
	)

3.2. اتصال صف به تبادل

صف باید به تبادل متصل شود تا پیام‌ها را دریافت کند

err = ch.QueueBind(
		q.Name, // اسم صف
		"",     // کلید مسیریابی، برای تبادل‌های نوع fanout، کلید مسیریابی به طور خودکار نادیده گرفته می‌شود
		"tizi365", // اسم تبادل، باید با اسمی که توسط فرستنده‌ی پیام تعریف شده است، مطابقت داشته باشد
		false,
		nil)

توجه: در برنامه‌های واقعی، می‌توانیم N صف را تعریف کنیم که هر کدام به همان تبادل متصل شده‌اند تا پیام‌های رد شده توسط تبادل را دریافت کنند. اینجاست که الگوی انتشار/اشتراک به خوبی مشخص می‌شود.

3.3. ایجاد یک مصرف‌کننده

msgs, err := ch.Consume(
		q.Name, // ارجاع به اسم صف از بالا
		"",     // اسم مصرف‌کننده، اگر مشخص نشود، به طور خودکار ایجاد می‌شود
		true,   // به طور خودکار تاوان اینکه پیام پردازش شده است
		false,  // انحصاری
		false,  // بدون محلی
		false,  // بدون انتظار
		nil,    // آرگومان‌ها
	)
	
// حلقه برای رسیدگی به پیام‌ها
for d := range msgs {
	log.Printf("پیام دریافت شد=%s", d.Body)
}

3.4. کد مصرف‌کننده را تکمیل کنید

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// اتصال به RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "اتصال به RabbitMQ ناموفق بود")
	defer conn.Close()

	// ایجاد یک کانال، معمولاً یک کانال برای هر مصرف‌کننده
	ch, err := conn.Channel()
	failOnError(err, "ناموفق در باز کردن یک کانال")
	defer ch.Close()

	// تعریف یک تبادل
	err = ch.ExchangeDeclare(
		"tizi365",   // اسم تبادل، باید با اسمی که توسط فرستنده‌ی پیام تعریف شده است، مطابقت داشته باشد
		"fanout", // نوع تبادل
		true,     // دائمی
		false,    // حذف خودکار
		false,    // داخلی
		false,    // بدون انتظار
		nil,      // آرگومان‌ها
	)
	failOnError(err, "ناموفق در تعریف یک تبادل")

	// تعریف صف برای عملیات
	q, err := ch.QueueDeclare(
		"",    // اسم صف، اگر خالی باشد یک اسم تصادفی ایجاد می‌شود
		false, // دائمی
		false, // حذف در صورت عدم استفاده
		true,  // انحصاری
		false, // بدون انتظار
		nil,   // آرگومان‌ها
	)
	failOnError(err, "ناموفق در تعریف یک صف")

	// متصل کردن صف به تبادل مشخص شده
	err = ch.QueueBind(
		q.Name, // اسم صف
		"",     // کلید مسیریابی، برای تبادل‌های fanout چشم‌پوشی می‌شود
		"tizi365", // اسم تبادل، باید با اسم تعریف شده توسط فرستنده‌ی پیام مطابقت داشته باشد
		false,
		nil)
	failOnError(err, "ناموفق در متصل کردن یک صف")

	// ایجاد یک مصرف‌کننده
	msgs, err := ch.Consume(
		q.Name, // ارجاع به اسم صف از بالا
		"",     // اسم مصرف‌کننده، اگر خالی باشد به طور خودکار ایجاد می‌شود
		true,   // خودکار تایید
		false,  // انحصاری
		false,  // بدون محلی
		false,  // بدون انتظار
		nil,    // آرگومان‌ها
	)
	failOnError(err, "ناموفق در ثبت مصرف‌کننده")

	// مصرف‌کردن پیام‌ها از صف در یک حلقه
	for d := range msgs {
		log.Printf("پیام دریافت شده: %s", d.Body)
	}
}

3.5. چندین مصرف‌کننده

به بخش حالت کار مراجعه کنید و به‌سادگی با استفاده از گوروتین‌ها چندین مصرف‌کننده را شروع کنید.