وضع الانتظار البسيط في RabbitMQ بلغة Golang
شرح: P تمثل المنتج، C تمثل المستهلك، واللون الأحمر يمثل الطابور.
ملاحظة: إذا كنت غير ملم بـ RabbitMQ، يرجى قراءة الفقرة مفاهيم أساسية لـRabbitMQ أولاً.
1. تثبيت التبعيات
go get github.com/streadway/amqp
استيراد حزمة التبعية
import (
"github.com/streadway/amqp"
)
2. إرسال الرسائل
توضح الخطوات التالية كيفية إكمال منتج الرسالة للرسالة.
2.1. الاتصال بخادم RabbitMQ
// الاتصال بخادم RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()
شرح عنوان الاتصال:
amqp://اسم المستخدم:كلمة السر@عنوانRabbitMQ:المنفذ/
2.2. إنشاء قناة
يتم تنفيذ معظم العمليات على القناة.
ch, err := conn.Channel()
defer ch.Close()
2.3. تعريف طابور
يمثل الطابور الذي نحتاج إلى قراءة أو كتابته.
q, err := ch.QueueDeclare(
"hello", // اسم الطابور
false, // استمرار الرسالة
false, // حذف الطابور عند عدم الاستخدام
false, // حصري
false, // عدم انتظار
nil, // Arguments
)
2.4. دفع الرسائل
// محتوى الرسالة
body := "مرحباً بالعالم!"
// دفع الرسالة
err = ch.Publish(
"", // تبادل (تجاهل هنا)
q.Name, // معلمة التوجيه، استخدام اسم الطابور كمعلمة توجيه
false, // إجباري
false, // فوري
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(body), // محتوى الرسالة
})
2.5. الشفرة الكاملة لإرسال الرسائل
package main
// استيراد الحزم
import (
"log"
"github.com/streadway/amqp"
)
// التعامل مع الأخطاء
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// الاتصال بـ RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "فشل الاتصال بـ RabbitMQ")
defer conn.Close()
// إنشاء قناة
ch, err := conn.Channel()
failOnError(err, "فشل فتح قناة")
defer ch.Close()
// تعريف الطابور للعمل معه
q, err := ch.QueueDeclare(
"hello", // الاسم
false, // دائم
false, // حذف عند عدم الاستخدام
false, // حصري
false, // عدم انتظار
nil, // Arguments
)
failOnError(err, "فشل تعريف طابور")
// محتوى الرسالة للإرسال
body := "مرحباً بالعالم!"
// إرسال الرسالة
err = ch.Publish(
"", // تبادل
q.Name, // مفتاح التوجيه
false, // إلزامي
false, // فوري
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "فشل نشر الرسالة")
log.Printf(" [x] Sent %s", body)
}
3. استقبال الرسائل
الخطوات الثلاث الأولى لاستقبال الرسائل تتطابق مع إرسال الرسائل، وتوافق على الفروع 2.1 و2.2 و2.3 على التوالي. يتضمن الكود الكامل لاستقبال الرسائل ما يلي:
package main
// استيراد الحزم
import (
"log"
"github.com/streadway/amqp"
)
// التعامل مع الأخطاء
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// الاتصال بـ RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "فشل الاتصال برابيت إم كيو")
defer conn.Close()
// إنشاء قناة
ch, err := conn.Channel()
failOnError(err, "فشل فتح القناة")
defer ch.Close()
// تعريف الطابور الذي سيُعمل عليه
q, err := ch.QueueDeclare(
"hello", // اسم الطابور يجب أن يكون متسقًا مع اسم الطابور لإرسال الرسائل
false, // متين
false, // حذف عند عدم الاستخدام
false, // حصري
false, // بدون انتظار
nil, // arguments
)
failOnError(err, "فشل في تعريف طابور")
// إنشاء مستهلك للرسائل
msgs, err := ch.Consume(
q.Name, // اسم الطابور
"", // اسم المستهلك، إذا لم يتم ملؤه، سيتم إنشاء معرف فريد تلقائيًا
true, // ما إذا كان تأكيد الرسائل تلقائيًا، أي إعلام رابيت إم كيو تلقائيًا أن الرسالة تمت معالجتها بنجاح
false, // حصري
false, // لا محلي
false, // بدون انتظار
nil, // args
)
failOnError(err, "فشل في تسجيل مستهلك")
// احصل على الرسائل من الطابور في حلقة
for d := range msgs {
// طباعة محتوى الرسالة
log.Printf("تم استلام رسالة: %s", d.Body)
}
}