Golang رابطہMQ پبلش/سبسکرائب پیٹرن (بین الاقوامی موڈ, فین آؤٹ موڈ)

رابطہMQ میں پبلش/سبسکرائب پیٹرن کا مطلب ہے کہ منتج کی طرف سے بھیجا گیا پیغام متعدد مصنوعات کی طرف سے پراسرار کیا جائے گا۔

فین آؤٹ موڈ

تشریح:

  • P منتج کو ظاہر کرتا ہے، C1 اور C2 صارفین، سرخ رنگ قیفیں کو ظاہر کرتے ہیں، اور X ایکسچینج کو ظاہر کرتا ہے۔
  • ایکسچینج پیغامات کو تمام قیفیں سے منسلک کرنے کے لئے ذمہ دار ہوتا ہے۔
  • متعدد قیفیں تعین کی جا سکتی ہیں، ہر ایکسچینج سے منسلک۔
  • ہر قیفی کے ایک یا مزید صارفین ہو سکتے ہیں۔

نوٹ: اگر آپ رابطہMQ سے واقف نہیں ہیں، تو براہ کرم پہلے رابطہMQ بنیادی تصورات سیکشن کو پڑھیں۔

1. تابعیت پیکیج کا انسٹال کریں

go get github.com/streadway/amqp

2. پیغامات بھیجیں

منتج کیسے پیغامات بھیجتا ہےآخر نیزشان کرتے ہیں۔

2.1. رابطہMQ سرور سے جڑنا

// رابطہMQ سرور سے جڑیں
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()

رابطہ کا پتہ بیان:

amqp://صارف_نام:پاسورڈ@رابطہMQ_پتہ:پورٹ/

2.2. چینل تخلیق کریں

اکثر آپریشنز چینل پر مکمل ہوتے ہیں۔

ch, err := conn.Channel()
defer ch.Close()

2.3. ایکسچینج کا اعلان کریں

پیغامات سب سے پہلے ایکسچینج کو بھیجے جاتے ہیں۔ ایکسچینج اپنی استریٹیجی کے مطابق پیغامات قیفیںوں کو فاروارد کرتا ہے۔

err = ch.ExchangeDeclare(
	"tizi365",   // ایکسچینج کا نام
	"fanout", // ایکسچینج کی قسم، یہاں فین آؤٹ ٹائپ استعمال ہو رہا ہے، یعنی، پبلش/سبسکرائب پیٹرن
	true,     // محفوظ
	false,    // خود بخود حذف کردہ
	false,    // داخلی
	false,    // بغیر انتظار
	nil,      // تحریرات
)

2.4. ایک پیغام شائع کریں

// پیغام مواد
body := "ہیلو Tizi365.com!"

// پیغام شائع کریں
err = ch.Publish(
  "tizi365",     // ایکسچینج (پچھلے اعلان کا نام)
  "", // راؤٹنگ کی کلید، فین آؤٹ ٹائپ ایکسچینج کے لئے، راؤٹنگ کی کلید خود بخود نظرانداز کر دی جاتی ہے، لہذا اسے فراہم کرنا ضروری نہیں ہے
  false,  // ضروری
  false,  // فوراً
  amqp.Publishing {
    ContentType: "ٹیکسٹ/plain", // پیغام مواد کی قسم، یہاں صاف مواد ہے
    Body:        []byte(body),  // پیغام مواد
  })

2.5. پیغام فراہمی کوڈ مکمل کریں

package main

import (
	"log"
	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// رابطہ رابطہMQ سے
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "رابطہ رابطہMQ سے جڑنے میں ناکام رہا")
	defer conn.Close()

	// ایک چینل تخلیق کریں
	ch, err := conn.Channel()
	failOnError(err, "چینل کھولنے میں ناکام رہا")
	defer ch.Close()

	// ایک ایکسچینج کا اعلان کریں
	err = ch.ExchangeDeclare(
		"tizi365",   // ایکسچینج کا نام
		"fanout", // ایکسچینج کی قسم، پبلش/سبسکرائب موڈ کے لئے فین آؤٹ
		true,     // محفوظ
		false,    // خود بخود حذف کردہ
		false,    // داخلی
		false,    // بغیر انتظار
		nil,      // تحریرات
	)
	failOnError(err, "ایکسچینج کا اعلان کرنے میں ناکام رہا")

	// پیغام مواد
	body := "ہیلو Tizi365.com!"
	// پیغام فراہمی
	err = ch.Publish(
		"tizi365",     // ایکسچینج (پچھلے اعلان کے مطابق)
		"", // راؤٹنگ کی کلید، فین آؤٹ ٹائپ ایکسچینج کے لئے راؤٹنگ کی کلید خود بخود نظرانداز کر دی جاتی ہے
		false,  // ضروری
		false,  // فوراً
		amqp.Publishing {
			ContentType: "ٹیکسٹ/plain", // پیغام مواد کی قسم، یہاں صاف مواد ہے
			Body:        []byte(body),  // پیغام مواد
		})

	log.Printf("شائع مواد %s", body)
}

3. پیغامات کو وصول کریں

پیغامات کو وصول کرنے کے لئے پہلے تین اقدامات—رابطہ رابطہMQ سے جڑنا، چینل تخلیق کرنا، اور ایکسچینج کا اعلان کرنا—پیغامات بھیجنے کے لئے ہیں۔ پہلے سکشن 2.1، 2.2، اور 2.3 سے رجوع کریں۔

3.1. قیو قیمت کا اعلان کریں

قطار کا اعلان کریں جس پر عمل کیا جائے گا

q، خطا := ch.QueueDeclare(
		""،    // قطار کا نام، اگر وضاحت نہ کی گئی ہو تو ایک بے ترتیب نام پیش کیا جائے گا
		غلط،   // مستحکم
		غلط،   // بغیر استعمال کرنے کے حذف کریں
		درست،  // خصوصی
		غلط،   // انتظار نہیں
		nil،   // جھگڑے
	)

3.2. قطار کو ایکسچینج سے مبند کریں

پیغامات ملنے کے لئے قطار کو ایکسچینج سے مبند کرنا ضروری ہوتا ہے

خطا = ch.QueueBind(
		q.Name، // Queue name
		""،     // راستہ کلید، فین آؤٹ ٹائپ ایکسچینجز کے لئے راستہ کلید خود بخود نظرانداز کیا جاتا ہے
		"tizi365"، // ایکسچینج کا نام، جو پیغام بھیجنے والے کی طرف سے تعین کیا گیا ہو
		غلط،
		nil)

نوٹ: واقعی استعمال میں، ہم N قطاریں تعین کرسکتے ہیں، ہر ایک ایکسچینج سے ملنی چاہئیں، تاکہ ایکسچینج دوارہ بھیجے جانے والے پیغامات کو موصول کر سکیں۔ یہاں پر پبلش/سبسکرائب پیٹرن کو عکس الوضع میں دکھایا گیا ہے۔

3.3. ایک کنسومر بنائیں

msgs، خطا := ch.Consume(
		q.Name، // پہلے سے قطار کا نام حوالہ دیں
		""،     // کنسومر کا نام، اگر مخصوص نہیں کیا گیا ہو تو خود بخود پیدا ہو جائے گا
		درست،   // خود بخود تسلیم کریں کہ پیغام کا عمل کر لیے گیا ہے
		غلط،  // خصوصی
		غلط،  // مقامی نہیں
		غلط،  // انتظار نہیں
		nil،    // جھگڑے
	)
	
// پیغامات کو ہینڈل کرنے کے لئے لوپ
for d := range msgs {
	log.Printf("موصول ہونے والا پیغام=%s", d.Body)
}

3.4. کنسومر کوڈ مکمل کریں

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// ریبٹ ایم Queue کنکٹ کریں
	conn، خطا := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(خطا، "ریبٹ Queue کے ساتھ منسلک نہیں ہو سکی")
	defer conn.Close()

	// ایک چینل تخلیق کریں، عام طور پر ہر کنسومر کے لئے ایک
	ch، خطا := conn.Channel()
	failOnError(خطا، "چینل کھولنے میں ناکامی")
	defer ch.Close()

	// ایک ایکسچینج کا اعلان کریں
	خطا = ch.ExchangeDeclare(
		"tizi365"،   // ایکسچینج کا نام، پیغام بھیجنے والے کے استعمال کیا گیا ہونا چاہئیے
		"fanout"، // ایکسچینج کی قسم
		درست،     // مستحکم
		غلط،    // خود بخود حذف
		غلط،    // بین الاخرہ
		غلط،    // انتظار نہیں
		nil،      // جھگڑے
	)
	failOnError(خطا، "ایکسچینج کا اعلان کرنے میں ناکامی")

	// قطار کا اعلان کریں جس پر عمل کیا جائے گا
	q، خطا := ch.QueueDeclare(
		""،    // قطار کا نام، اگر خالی ہو تو ایک بے ترتیب نام پیش کیا جائے گا
		غلط،   // مستحکم
		غلط،   // بغیر استعمال کرنے کے حذف کریں
		درست،  // خصوصی
		غلط،   // انتظار نہیں
		nil،   // جھگڑے
	)
	failOnError(خطا، "قطار کا اعلان کرنے میں ناکامی")

	// قطار کو مخصوص ایکسچینج سے ملایں
	خطا = ch.QueueBind(
		q.Name، // Queue name
		""،     // Routing key, fanout ایکسچینجس کے لئے نظرانداز کیا گیا
		"tizi365"، // ایکسچینج کا نام، پیغام بھیجنے والے کی طرف سے تعین کیا گیا ہو
		غلط،
		nil)
	failOnError(خطا، "قطار کو باند کرنے میں ناکامی")

	// ایک کنسومر تخلیق کریں
	msgs، خطا := ch.Consume(
		q.Name، // پیشاپیش نامی قطار کا حوالہ
		""،     // کنسومر کا نام، خود بخود تخلیق ہوگا اگر خالی ہو
		درست،   // خود بخود تسلیم
		غلط،  // خصوصی
		غلط،  // مقامی نہیں
		غلط،  // انتظار نہیں
		nil،    // جھگڑے
	)
	failOnError(خطا، "کنسومر رجسٹر کرنے میں ناکامی")

	// قطار سے پیغامات کو لوپ میں استعمال کرنا
	for d := range msgs {
		log.Printf("موصول ہونے والا پیغام: %s", d.Body)
	}
}

3.5. متعدد کنسومرز

ورک موڈ سیکشن کا حوالہ دیکھیں اور صرف گورٹینز کا استعمال کرکے متعدد کنسومرز کو شروع کریں۔