حالت صف ساده در RabbitMQ به زبان Golang
توضیحات: P نماینده تولیدکننده، C نماینده مصرفکننده و رنگ قرمز نماینده صف است.
توجه: اگر با RabbitMQ آشنا نیستید، لطفاً ابتدا بخش مفاهیم پایه RabbitMQ را بخوانید.
1. نصب وابستگیها
go get github.com/streadway/amqp
وارد کردن بسته وابسته
import (
"github.com/streadway/amqp"
)
2. ارسال پیامها
در مراحل زیر نشان داده شده است که تولیدکننده پیام چگونه پیام را ارسال میکند.
2.1. اتصال به سرور RabbitMQ
// اتصال به سرور RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()
توضیح آدرس اتصال:
amqp://نامکاربری:رمزعبور@آدرسRabbitMQ:پورت/
2.2. ایجاد یک کانال
بیشتر عملیات بر روی کانال انجام میشود.
ch, err := conn.Channel()
defer ch.Close()
2.3. تعریف یک صف
نمایاند صفی که باید از آن بخوانیم یا به آن بنویسیم.
q, err := ch.QueueDeclare(
"hello", // نام صف
false, // ماندگاری پیام
false, // حذف صف هنگام عدم استفاده
false, // اختصاصی
false, // بدون انتظار
nil, // آرگومانها
)
2.4. ارسال پیامها
// محتوای پیام
body := "سلام دنیا!"
// ارسال پیام
err = ch.Publish(
"", // تبادل (در اینجا نادیده بگیرید)
q.Name, // پارامتر مسیریابی، استفاده از نام صف به عنوان پارامتر مسیریابی
false, // الزامی
false, // فوری
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(body), // محتوای پیام
})
2.5. کد کامل برای ارسال پیامها
package main
// وارد کردن بستهها
import (
"log"
"github.com/streadway/amqp"
)
// بررسی خطاها
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// اتصال به RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "ارتباط با RabbitMQ برقرار نشد")
defer conn.Close()
// ایجاد یک کانال
ch, err := conn.Channel()
failOnError(err, "برای باز کردن یک کانال ناموفق بود")
defer ch.Close()
// تعریف صف برای عملیات
q, err := ch.QueueDeclare(
"hello", // نام
false, // ماندگار
false, // حذف هنگام عدم استفاده
false, // اختصاصی
false, // بدون انتظار
nil, // آرگومانها
)
failOnError(err, "تعریف یک صف ناموفق بود")
// محتوای پیام برای ارسال
body := "سلام دنیا!"
// ارسال پیام
err = ch.Publish(
"", // تبادل
q.Name, // کلید مسیریابی
false, // الزامی
false, // فوری
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "ارسال پیام ناموفق بود")
log.Printf(" [x] ارسال شد %s", body)
}
۳. دریافت پیامها
سه مرحله اول دریافت پیامها مشابه ارسال پیامها است و به ترتیب با بخشهای ۲.۱، ۲.۲ و ۲.۳ مطابقت دارد. کد کامل برای دریافت پیامها به صورت زیر است:
package main
// وارد کردن پکیجها
import (
"log"
"github.com/streadway/amqp"
)
// رفع خطا
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// برقراری ارتباط با RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "ارتباط با RabbitMQ ناموفق بود")
defer conn.Close()
// ایجاد یک کانال
ch, err := conn.Channel()
failOnError(err, "ناموفق در باز کردن یک کانال")
defer ch.Close()
// اعلام صف مورد استفاده
q, err := ch.QueueDeclare(
"hello", // نام صف باید با نام صف برای ارسال پیامها مطابقت داشته باشد
false, // دائمی
false, // حذف کردن در صورت عدم استفاده
false, // اختصاصی
false, // بدون انتظار
nil, // آرگومانها
)
failOnError(err, "تعیین یک صف ناموفق بود")
// ایجاد یک مصرفکننده پیام
msgs, err := ch.Consume(
q.Name, // نام صف
"", // نام مصرفکننده، اگر پر نشود، یک شناسه یکتا به صورت خودکار تولید میشود
true, // آیا باید پیامها به صورت خودکار تأیید شوند؟ یعنی به طور خودکار به RabbitMQ اطلاع داده شود که پیام با موفقیت پردازش شده است
false, // اختصاصی
false, // بدون محلی
false, // بدون انتظار
nil, // آرگومانها
)
failOnError(err, "ثبت یک مصرفکننده ناموفق بود")
// دریافت پیامها از صف در یک حلقه
for d := range msgs {
// چاپ محتوای پیام
log.Printf("یک پیام دریافت شد: %s", d.Body)
}
}