نمط النشر/الاشتراك في RabbitMQ باستخدام النمط (وضع البث، وضع فانوت)
نمط النشر/الاشتراك في RabbitMQ يعني أن الرسالة المرسلة بواسطة المنتج سيتم معالجتها بواسطة عدة مستهلكين.
الشرح:
- P يمثل المنتج، و C1 و C2 يمثلان المستهلكين، واللون الأحمر يمثل الصفوف، و X يمثل التبادل.
- التبادل مسؤول عن توجيه الرسائل إلى جميع الصفوف المرتبطة بالتبادل.
- يمكن تعريف عدة صفوف، كل واحدة مرتبطة بنفس التبادل.
- يمكن أن يحتوي كل صف على مستهلك واحد أو أكثر.
ملاحظة: إذا كنت غير ملم بـ RabbitMQ، يرجى قراءة الجزء مفاهيم RabbitMQ الأساسية أولاً.
1. تثبيت حزمة الاعتماد
go get github.com/streadway/amqp
2. إرسال الرسائل
توضح الخطوات التالية كيفية إرسال رسائل المُنتج.
2.1. الاتصال بخادم RabbitMQ
// الاتصال بخادم RabbitMQ
conn، خطأ := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()
شرح عنوان الاتصال:
amqp://اسم_المستخدم:كلمة_المرور@عنوان_RabbitMQ:المنفذ/
2.2. إنشاء قناة
يتم إنجاز معظم العمليات على القناة.
ch، خطأ := conn.Channel()
defer ch.Close()
2.3. تعريف تبادل
تُرسل الرسائل أولاً إلى التبادل. يوجه التبادل الرسائل إلى الصفوف استنادًا إلى استراتيجيته.
خطأ := ch.ExchangeDeclare(
"tizi365", // اسم التبادل
"fanout", // نوع التبادل، مستخدمًا النوع فانوت هنا، أي، نمط النشر/الاشتراك
true, // دائم
false, // يتم الحذف تلقائيًا
false, // داخلي
false, // بدون انتظار
nil, // الوسيطات
)
2.4. نشر رسالة
// محتوى الرسالة
body := "مرحبًا بكم في Tizi365.com!"
// نشر الرسالة
خطأ := ch.Publish(
"tizi365", // التبادل (اسم التبادل المقابل للتعريف السابق)
"", // مفتاح التوجيه، بالنسبة لنوع التبادل فانوت، يتم تجاهل مفتاح التوجيه تلقائيًا، لذلك ليس من الضروري توفيره
false, // إلزامي
false, // فوري
amqp.Publishing {
ContentType: "text/plain", // نوع محتوى الرسالة، هنا هو نص عادي
Body: []byte(body), // محتوى الرسالة
})
2.5. إتمام كود دفع الرسالة
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// الاتصال برابيت إم كيو
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "فشل الاتصال بـ RabbitMQ")
defer conn.Close()
// إنشاء قناة
ch, err := conn.Channel()
failOnError(err, "فشل في فتح قناة")
defer ch.Close()
// تعريف التبادل
err = ch.ExchangeDeclare(
"tizi365", // اسم التبادل
"fanout", // نوع التبادل، فانوت لنمط النشر/الاشتراك
true, // دائم
false, // يتم الحذف تلقائيًا
false, // داخلي
false, // بدون انتظار
nil, // الوسيطات
)
failOnError(err, "فشل في تعريف تبادل")
// محتوى الرسالة
body := "مرحبًا بكم في Tizi365.com!"
// دفع الرسالة
err = ch.Publish(
"tizi365", // التبادل (مطابق للتعريف أعلاه)
"", // مفتاح التوجيه، بالنسبة لتبادلات نوع فانوت، يتم تجاهل مفتاح التوجيه تلقائيًا
false, // إلزامي
false, // فوري
amqp.Publishing {
ContentType: "text/plain", // نوع محتوى الرسالة، هنا هو نص عادي
Body: []byte(body), // محتوى الرسالة
})
log.Printf("تم إرسال المحتوى %s", body)
}
3. استقبال الرسائل
الخطوات الأولى الثلاث لاستقبال الرسائل — الاتصال بـ RabbitMQ، إنشاء قناة، وتعريف تبادل — هي نفسها لإرسال الرسائل. يُرجى الرجوع إلى الأقسام السابقة 2.1، 2.2، و 2.3.
3.1. إعلان قائمة انتظار
قم بإعلان القائمة انتظار التي سيتم التحكم فيها.
q, err := ch.QueueDeclare(
"", // اسم القائمة انتظار، في حال عدم تحديده، سيتم إنشاء اسم عشوائي
false, // دائم
false, // حذف عندما تصبح غير مستخدمة
true, // حصري
false, // بدون انتظار
nil, // الوسيطات
)
3.2. ربط القائمة انتظار بالتبادل
تحتاج القائمة انتظار إلى أن تُربط بالتبادل لاستقبال الرسائل
err = ch.QueueBind(
q.Name, // اسم القائمة انتظار
"", // مفتاح التوجيه، بالنسبة لتبادلات فانوت، يُتجاهل مفتاح التوجيه تلقائيًا
"tizi365", // اسم التبادل، يجب أن يتطابق مع الاسم المحدد من قبل مرسل الرسالة
false,
nil)
ملاحظة: في التطبيقات الفعلية، يمكننا تحديد N قوائم انتظار، يتم ربط كلٍ منها بنفس التبادل، لكي يتم استقبال الرسائل الموجهة من قبل التبادل. وهنا يتجلى نمط النشر/الاشتراك.
3.3. إنشاء مستهلك
msgs, err := ch.Consume(
q.Name, // الإشارة إلى اسم القائمة انتظار من الأعلى
"", // اسم المستهلك، إذا لم يتم تحديده، سيتم إنشاؤه تلقائيًا
true, // التأكيد التلقائي بأن الرسالة تمت معالجتها
false, // حصري
false, // بدون المحلي
false, // بدون انتظار
nil, // الوسيطات
)
// حلقة للتعامل مع الرسائل
for d := range msgs {
log.Printf("تم استقبال الرسالة=%s", d.Body)
}
3.4. إكمال كود المستهلك
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// الاتصال بRabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "فشل الاتصال بRabbitMQ")
defer conn.Close()
// إنشاء قناة، عادة ما تكون قناة واحدة لكل مستهلك
ch, err := conn.Channel()
failOnError(err, "فشل في فتح قناة")
defer ch.Close()
// إعلان تبادل
err = ch.ExchangeDeclare(
"tizi365", // اسم التبادل، يجب أن يتطابق مع الاسم المستخدم من قبل مرسل الرسالة
"fanout", // نوع التبادل
true, // دائم
false, // حذف تلقائي
false, // داخلي
false, // بدون انتظار
nil, // الوسيطات
)
failOnError(err, "فشل في إعلان تبادل")
// إعلان القائمة المراد التحكم فيها
q, err := ch.QueueDeclare(
"", // اسم القائمة انتظار، في حال فارغ سيتم إنشاء اسم عشوائي
false, // دائم
false, // حذف عندما تصبح غير مستخدمة
true, // حصري
false, // بدون انتظار
nil, // الوسيطات
)
failOnError(err, "فشل في إعلان قائمة انتظار")
// ربط القائمة بالتبادل المحدد
err = ch.QueueBind(
q.Name, // اسم القائمة انتظار
"", // مفتاح التوجيه، يتم تجاهله في حالة تبادلات الفانوت
"tizi365", // اسم التبادل، يجب أن يتطابق مع الاسم المحدد من قبل مرسل الرسالة
false,
nil)
failOnError(err, "فشل في ربط القائمة")
// إنشاء مستهلك
msgs, err := ch.Consume(
q.Name, // الإشارة إلى اسم القائمة من الأعلى
"", // اسم المستهلك، سيتم إنشاؤه تلقائيًا في حال كان فارغًا
true, // تأكيد تلقائي
false, // حصري
false, // بدون المحلي
false, // بدون انتظار
nil, // الوسيطات
)
failOnError(err, "فشل في تسجيل المستهلك")
// استهلاك الرسائل من القائمة في حلقة
for d := range msgs {
log.Printf("تم استقبال الرسالة: %s", d.Body)
}
}
3.5. متعدد المستهلكين
يرجى الرجوع إلى قسم وضع العمل وببساطة بدء عدة مستهلكين باستخدام goroutines.