نمط النشر/الاشتراك في RabbitMQ باستخدام النمط (وضع البث، وضع فانوت)

نمط النشر/الاشتراك في RabbitMQ يعني أن الرسالة المرسلة بواسطة المنتج سيتم معالجتها بواسطة عدة مستهلكين.

وضع فانوت

الشرح:

  • P يمثل المنتج، و C1 و C2 يمثلان المستهلكين، واللون الأحمر يمثل الصفوف، و X يمثل التبادل.
  • التبادل مسؤول عن توجيه الرسائل إلى جميع الصفوف المرتبطة بالتبادل.
  • يمكن تعريف عدة صفوف، كل واحدة مرتبطة بنفس التبادل.
  • يمكن أن يحتوي كل صف على مستهلك واحد أو أكثر.

ملاحظة: إذا كنت غير ملم بـ RabbitMQ، يرجى قراءة الجزء مفاهيم RabbitMQ الأساسية أولاً.

1. تثبيت حزمة الاعتماد

go get github.com/streadway/amqp

2. إرسال الرسائل

توضح الخطوات التالية كيفية إرسال رسائل المُنتج.

2.1. الاتصال بخادم RabbitMQ

// الاتصال بخادم RabbitMQ
conn، خطأ := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()

شرح عنوان الاتصال:

amqp://اسم_المستخدم:كلمة_المرور@عنوان_RabbitMQ:المنفذ/

2.2. إنشاء قناة

يتم إنجاز معظم العمليات على القناة.

ch، خطأ := conn.Channel()
defer ch.Close()

2.3. تعريف تبادل

تُرسل الرسائل أولاً إلى التبادل. يوجه التبادل الرسائل إلى الصفوف استنادًا إلى استراتيجيته.

خطأ := ch.ExchangeDeclare(
	"tizi365",   // اسم التبادل
	"fanout", // نوع التبادل، مستخدمًا النوع فانوت هنا، أي، نمط النشر/الاشتراك
	true,     // دائم
	false,    // يتم الحذف تلقائيًا
	false,    // داخلي
	false,    // بدون انتظار
	nil,      // الوسيطات
)

2.4. نشر رسالة

// محتوى الرسالة
body := "مرحبًا بكم في Tizi365.com!"

// نشر الرسالة
خطأ := ch.Publish(
  "tizi365",     // التبادل (اسم التبادل المقابل للتعريف السابق)
  "", // مفتاح التوجيه، بالنسبة لنوع التبادل فانوت، يتم تجاهل مفتاح التوجيه تلقائيًا، لذلك ليس من الضروري توفيره
  false,  // إلزامي
  false,  // فوري
  amqp.Publishing {
    ContentType: "text/plain", // نوع محتوى الرسالة، هنا هو نص عادي
    Body:        []byte(body),  // محتوى الرسالة
  })

2.5. إتمام كود دفع الرسالة

package main

import (
	"log"
	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// الاتصال برابيت إم كيو
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "فشل الاتصال بـ RabbitMQ")
	defer conn.Close()

	// إنشاء قناة
	ch, err := conn.Channel()
	failOnError(err, "فشل في فتح قناة")
	defer ch.Close()

	// تعريف التبادل
	err = ch.ExchangeDeclare(
		"tizi365",   // اسم التبادل
		"fanout", // نوع التبادل، فانوت لنمط النشر/الاشتراك
		true,     // دائم
		false,    // يتم الحذف تلقائيًا
		false,    // داخلي
		false,    // بدون انتظار
		nil,      // الوسيطات
	)
	failOnError(err, "فشل في تعريف تبادل")

	// محتوى الرسالة
	body := "مرحبًا بكم في Tizi365.com!"
	// دفع الرسالة
	err = ch.Publish(
		"tizi365",     // التبادل (مطابق للتعريف أعلاه)
		"", // مفتاح التوجيه، بالنسبة لتبادلات نوع فانوت، يتم تجاهل مفتاح التوجيه تلقائيًا
		false,  // إلزامي
		false,  // فوري
		amqp.Publishing {
			ContentType: "text/plain", // نوع محتوى الرسالة، هنا هو نص عادي
			Body:        []byte(body),  // محتوى الرسالة
		})

	log.Printf("تم إرسال المحتوى %s", body)
}

3. استقبال الرسائل

الخطوات الأولى الثلاث لاستقبال الرسائل — الاتصال بـ RabbitMQ، إنشاء قناة، وتعريف تبادل — هي نفسها لإرسال الرسائل. يُرجى الرجوع إلى الأقسام السابقة 2.1، 2.2، و 2.3.

3.1. إعلان قائمة انتظار

قم بإعلان القائمة انتظار التي سيتم التحكم فيها.

q, err := ch.QueueDeclare(
		"",    // اسم القائمة انتظار، في حال عدم تحديده، سيتم إنشاء اسم عشوائي
		false, // دائم
		false, // حذف عندما تصبح غير مستخدمة
		true,  // حصري
		false, // بدون انتظار
		nil,   // الوسيطات
	)

3.2. ربط القائمة انتظار بالتبادل

تحتاج القائمة انتظار إلى أن تُربط بالتبادل لاستقبال الرسائل

err = ch.QueueBind(
		q.Name, // اسم القائمة انتظار
		"",     // مفتاح التوجيه، بالنسبة لتبادلات فانوت، يُتجاهل مفتاح التوجيه تلقائيًا
		"tizi365", // اسم التبادل، يجب أن يتطابق مع الاسم المحدد من قبل مرسل الرسالة
		false,
		nil)

ملاحظة: في التطبيقات الفعلية، يمكننا تحديد N قوائم انتظار، يتم ربط كلٍ منها بنفس التبادل، لكي يتم استقبال الرسائل الموجهة من قبل التبادل. وهنا يتجلى نمط النشر/الاشتراك.

3.3. إنشاء مستهلك

msgs, err := ch.Consume(
		q.Name, // الإشارة إلى اسم القائمة انتظار من الأعلى
		"",     // اسم المستهلك، إذا لم يتم تحديده، سيتم إنشاؤه تلقائيًا
		true,   // التأكيد التلقائي بأن الرسالة تمت معالجتها
		false,  // حصري
		false,  // بدون المحلي
		false,  // بدون انتظار
		nil,    // الوسيطات
	)
	
// حلقة للتعامل مع الرسائل
for d := range msgs {
	log.Printf("تم استقبال الرسالة=%s", d.Body)
}

3.4. إكمال كود المستهلك

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// الاتصال بRabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "فشل الاتصال بRabbitMQ")
	defer conn.Close()

	// إنشاء قناة، عادة ما تكون قناة واحدة لكل مستهلك
	ch, err := conn.Channel()
	failOnError(err, "فشل في فتح قناة")
	defer ch.Close()

	// إعلان تبادل
	err = ch.ExchangeDeclare(
		"tizi365",   // اسم التبادل، يجب أن يتطابق مع الاسم المستخدم من قبل مرسل الرسالة
		"fanout", // نوع التبادل
		true,     // دائم
		false,    // حذف تلقائي
		false,    // داخلي
		false,    // بدون انتظار
		nil,      // الوسيطات
	)
	failOnError(err, "فشل في إعلان تبادل")

	// إعلان القائمة المراد التحكم فيها
	q, err := ch.QueueDeclare(
		"",    // اسم القائمة انتظار، في حال فارغ سيتم إنشاء اسم عشوائي
		false, // دائم
		false, // حذف عندما تصبح غير مستخدمة
		true,  // حصري
		false, // بدون انتظار
		nil,   // الوسيطات
	)
	failOnError(err, "فشل في إعلان قائمة انتظار")

	// ربط القائمة بالتبادل المحدد
	err = ch.QueueBind(
		q.Name, // اسم القائمة انتظار
		"",     // مفتاح التوجيه، يتم تجاهله في حالة تبادلات الفانوت
		"tizi365", // اسم التبادل، يجب أن يتطابق مع الاسم المحدد من قبل مرسل الرسالة
		false,
		nil)
	failOnError(err, "فشل في ربط القائمة")

	// إنشاء مستهلك
	msgs, err := ch.Consume(
		q.Name, // الإشارة إلى اسم القائمة من الأعلى
		"",     // اسم المستهلك، سيتم إنشاؤه تلقائيًا في حال كان فارغًا
		true,   // تأكيد تلقائي
		false,  // حصري
		false,  // بدون المحلي
		false,  // بدون انتظار
		nil,    // الوسيطات
	)
	failOnError(err, "فشل في تسجيل المستهلك")

	// استهلاك الرسائل من القائمة في حلقة
	for d := range msgs {
		log.Printf("تم استقبال الرسالة: %s", d.Body)
	}
}

3.5. متعدد المستهلكين

يرجى الرجوع إلى قسم وضع العمل وببساطة بدء عدة مستهلكين باستخدام goroutines.