الگوهای انتشار/اشتراک گذاری RabbitMQ (حالت پخش، حالت Fanout)
الگوی انتشار/اشتراک گذاری در RabbitMQ به این معناست که یک پیام ارسال شده توسط یک تولیدکننده (Producer) توسط چندین مصرفکننده (Consumer) پردازش خواهد شد.
توضیحات:
- P نشان دهنده تولیدکننده، C1 و C2 نشان دهنده مصرفکنندگان، قرمز نشان دهنده صفها و X نشان دهنده تبادل (Exchange) میباشد.
- تبادل مسئول انتقال پیام به تمامی صفهایی است که با تبادل متصل شدهاند.
- می توان چندین صف تعریف کرد که هر یک به همان تبادل متصل شوند.
- هر صف میتواند یک یا چندین مصرفکننده داشته باشد.
توجه: اگر با RabbitMQ آشنا نیستید لطفاً ابتدا بخش مفاهیم پایه RabbitMQ را مطالعه نمایید.
1. نصب بستهی وابستگی
go get github.com/streadway/amqp
2. ارسال پیامها
مراحل زیر نشان دهندهی این است که تولیدکننده پیام چگونه پیامها را ارسال میکند.
2.1. اتصال به سرور RabbitMQ
// اتصال به سرور RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()
توضیحات آدرس اتصال:
amqp://نامکاربری:رمزعبور@آدرسRabbitMQ:پورت/
2.2. ایجاد یک کانال
بیشتر عملیاتها بر روی کانال انجام میشود.
ch, err := conn.Channel()
defer ch.Close()
2.3. تعریف یک تبادل
پیامها ابتدا به تبادل ارسال میشوند. تبادل پیامها را بر اساس استراتژی خود به صفها ارسال میکند.
err = ch.ExchangeDeclare(
"tizi365", // نام تبادل
"fanout", // نوع تبادل، در اینجا از نوع fanout استفاده میشود، یعنی الگوی انتشار/اشتراک گذاری
true, // پایدار
false, // خودکار پاک شونده
false, // داخلی
false, // بدون انتظار
nil, // آرگومانها
)
2.4. انتشار یک پیام
// محتوای پیام
body := "سلام Tizi365.com!"
// ارسال پیام
err = ch.Publish(
"tizi365", // تبادل (نام تبادل مطابق با تعریف قبلی)
"", // کلید مسیریابی، برای تبادل نوع fanout، کلید مسیریابی به طور خودکار نادیده گرفته میشود، بنابراین لازم نیست کلیدی ارائه شود
false, // اجباری
false, // فوری
amqp.Publishing {
ContentType: "text/plain", // نوع محتوای پیام، در اینجا متن ساده است
Body: []byte(body), // محتوای پیام
})
2.5. کد تکمیلی ارسال پیام
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// اتصال به RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "اتصال به RabbitMQ با شکست مواجه شد")
defer conn.Close()
// ایجاد یک کانال
ch, err := conn.Channel()
failOnError(err, "باز کردن یک کانال با شکست مواجه شد")
defer ch.Close()
// تعریف یک تبادل
err = ch.ExchangeDeclare(
"tizi365", // نام تبادل
"fanout", // نوع تبادل، fanout برای حالت انتشار/اشتراک گذاری
true, // پایدار
false, // خودکار پاک شونده
false, // داخلی
false, // بدون انتظار
nil, // آرگومانها
)
failOnError(err, "تعریف تبادل با شکست مواجه شد")
// محتوای پیام
body := "سلام Tizi365.com!"
// ارسال پیام
err = ch.Publish(
"tizi365", // تبادل (مطابق با تعریف بالا)
"", // کلید مسیریابی، برای تبادلهای نوع fanout کلید مسیریابی به طور خودکار نادیده گرفته میشود
false, // اجباری
false, // فوری
amqp.Publishing {
ContentType: "text/plain", // نوع محتوای پیام، اینجا متن ساده است
Body: []byte(body), // محتوای پیام
})
log.Printf("محتوای ارسالی %s", body)
}
3. دریافت پیامها
سه مرحله اول برای دریافت پیامها - اتصال به RabbitMQ، ایجاد کانال و تعریف یک تبادل - مانند ارسال پیامها هستند. به بخشهای قبلی 2.1، 2.2 و 2.3 مراجعه کنید.
3.1. اعلان یک صف
صف را برای عملیات تعریف کنید
q, err := ch.QueueDeclare(
"", // اگر اسم صف مشخص نشود، یک اسم تصادفی ایجاد میشود
false, // دائمی
false, // حذف در صورت عدم استفاده
true, // انحصاری
false, // بدون انتظار
nil, // آرگومانها
)
3.2. اتصال صف به تبادل
صف باید به تبادل متصل شود تا پیامها را دریافت کند
err = ch.QueueBind(
q.Name, // اسم صف
"", // کلید مسیریابی، برای تبادلهای نوع fanout، کلید مسیریابی به طور خودکار نادیده گرفته میشود
"tizi365", // اسم تبادل، باید با اسمی که توسط فرستندهی پیام تعریف شده است، مطابقت داشته باشد
false,
nil)
توجه: در برنامههای واقعی، میتوانیم N صف را تعریف کنیم که هر کدام به همان تبادل متصل شدهاند تا پیامهای رد شده توسط تبادل را دریافت کنند. اینجاست که الگوی انتشار/اشتراک به خوبی مشخص میشود.
3.3. ایجاد یک مصرفکننده
msgs, err := ch.Consume(
q.Name, // ارجاع به اسم صف از بالا
"", // اسم مصرفکننده، اگر مشخص نشود، به طور خودکار ایجاد میشود
true, // به طور خودکار تاوان اینکه پیام پردازش شده است
false, // انحصاری
false, // بدون محلی
false, // بدون انتظار
nil, // آرگومانها
)
// حلقه برای رسیدگی به پیامها
for d := range msgs {
log.Printf("پیام دریافت شد=%s", d.Body)
}
3.4. کد مصرفکننده را تکمیل کنید
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// اتصال به RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "اتصال به RabbitMQ ناموفق بود")
defer conn.Close()
// ایجاد یک کانال، معمولاً یک کانال برای هر مصرفکننده
ch, err := conn.Channel()
failOnError(err, "ناموفق در باز کردن یک کانال")
defer ch.Close()
// تعریف یک تبادل
err = ch.ExchangeDeclare(
"tizi365", // اسم تبادل، باید با اسمی که توسط فرستندهی پیام تعریف شده است، مطابقت داشته باشد
"fanout", // نوع تبادل
true, // دائمی
false, // حذف خودکار
false, // داخلی
false, // بدون انتظار
nil, // آرگومانها
)
failOnError(err, "ناموفق در تعریف یک تبادل")
// تعریف صف برای عملیات
q, err := ch.QueueDeclare(
"", // اسم صف، اگر خالی باشد یک اسم تصادفی ایجاد میشود
false, // دائمی
false, // حذف در صورت عدم استفاده
true, // انحصاری
false, // بدون انتظار
nil, // آرگومانها
)
failOnError(err, "ناموفق در تعریف یک صف")
// متصل کردن صف به تبادل مشخص شده
err = ch.QueueBind(
q.Name, // اسم صف
"", // کلید مسیریابی، برای تبادلهای fanout چشمپوشی میشود
"tizi365", // اسم تبادل، باید با اسم تعریف شده توسط فرستندهی پیام مطابقت داشته باشد
false,
nil)
failOnError(err, "ناموفق در متصل کردن یک صف")
// ایجاد یک مصرفکننده
msgs, err := ch.Consume(
q.Name, // ارجاع به اسم صف از بالا
"", // اسم مصرفکننده، اگر خالی باشد به طور خودکار ایجاد میشود
true, // خودکار تایید
false, // انحصاری
false, // بدون محلی
false, // بدون انتظار
nil, // آرگومانها
)
failOnError(err, "ناموفق در ثبت مصرفکننده")
// مصرفکردن پیامها از صف در یک حلقه
for d := range msgs {
log.Printf("پیام دریافت شده: %s", d.Body)
}
}
3.5. چندین مصرفکننده
به بخش حالت کار مراجعه کنید و بهسادگی با استفاده از گوروتینها چندین مصرفکننده را شروع کنید.