गोलांग RabbitMQ पब्लिश/सब्सक्राइब पैटर्न (ब्रॉडकास्ट मोड, फैनआउट मोड)

RabbitMQ में पब्लिश/सब्सक्राइब पैटर्न का मतलब है कि जो प्रोड्यूसर द्वारा भेजी गई संदेश को कई उपभोक्ताओं द्वारा प्रसंस्कृत किया जाएगा।

फैनआउट मोड

व्याख्या:

  • P निर्माता को प्रस्तुत करता है, C1 और C2 उपभोक्ताएँ प्रतिनिधित्व करती हैं, लाल रंग कताएँ को दर्शाता है, और X विनिमय को प्रतिनिधित्व करता है।
  • विनिमय किसे सभी कताओं को आगे भेजने के लिए जिम्मेदार होता है।
  • कई कताएँ परिभाषित की जा सकती हैं, जो सभी एक समान विनिमय से बंधी होती हैं।
  • प्रत्येक क्यूओं में एक या एक से अधिक उपभोक्ताएँ हो सकती हैं।

नोट: अगर आप RabbitMQ से अनजान हैं, तो कृपया सबसे पहले RabbitMQ मूल अवधारणाएँ अनुभाग पढ़ें।

1. डिपेंडेंसी पैकेज स्थापित करें

go get github.com/streadway/amqp

2. संदेश भेजें

निम्नलिखित चरण संदेश निर्माता द्वारा संदेश भेजने को दर्शाते हैं।

2.1. RabbitMQ सर्वर से कनेक्ट करें

// RabbitMQ सर्वर से कनेक्ट करें
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()

कनेक्शन पता का व्याख्या:

amqp://उपयोगकर्तानाम:पासवर्ड@RabbitMQपता:पोर्ट/

2.2. एक चैनल बनाएँ

अधिकांश ऑपरेशन चैनल पर पूरे होते हैं।

ch, err := conn.Channel()
defer ch.Close()

2.3. विनिमय का घोषणा करें

संदेश सबसे पहले विनिमय को भेजा जाता है। विनिमय अपनी रणनीति के आधार पर संदेशों को कताओं को आगे भेजता है।

err = ch.ExchangeDeclare(
	"tizi365",   // विनिमय का नाम
	"fanout", // विनिमय प्रकार, यहां फैनआउट प्रकार का उपयोग कर रहे हैं, अर्थात पब्लिश/सब्सक्राइब पैटर्न
	true,     // स्थायी
	false,    // स्वचालित हटाने
	false,    // आंतरिक
	false,    // कोई प्रतीक्षा नहीं
	nil,      // विवाद
)

2.4. एक संदेश प्रकाशित करें

// संदेश का विषय
body := "नमस्ते Tizi365.com!"

// संदेश प्रकाशित करें
err = ch.Publish(
  "tizi365",     // विनिमय (पूर्ववर्ती घोषणा के विनिमय का नाम)
  "", // रूटिंग कुंजी, फैनआउट प्रकार विनिमय के लिए, रूटिंग कुंजी स्वचालित रूप से नजरअंदाज की जाती है, इसलिए इसे प्रदान करना आवश्यक नहीं है
  false,  // अनिवार्य
  false,  // तत्कालिक
  amqp.Publishing {
    ContentType: "text/plain", // संदेश सामग्री प्रकार, यहां यह सादा पाठ है
    Body:        []byte(body),  // संदेश सामग्री
  })

2.5. संदेश पुश कोड पूरा करें

package main

import (
	"log"
	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// RabbitMQ से कनेक्ट करें
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "RabbitMQ से कनेक्ट करने में विफल रहा")
	defer conn.Close()

	// एक चैनल बनाएँ
	ch, err := conn.Channel()
	failOnError(err, "चैनल खोलने में विफल रहा")
	defer ch.Close()

	// एक विनिमय का घोषणा करें
	err = ch.ExchangeDeclare(
		"tizi365",   // विनिमय का नाम
		"fanout", // विनिमय प्रकार, प्रकाशन/सब्सक्राइब मोड के लिए फैनआउट
		true,     // स्थायी
		false,    // स्वचालित हटाने
		false,    // आंतरिक
		false,    // कोई प्रतीक्षा नहीं
		nil,      // विवाद
	)
	failOnError(err, "विनिमय का घोषणा करने में विफल रहा")

	// संदेश का विषय
	body := "नमस्ते Tizi365.com!"
	// संदेश पुश करें
	err = ch.Publish(
		"tizi365",     // विनिमय (घोषणा के समान विनिमय)
		"", // रूटिंग कुंजी, फैनआउट प्रकार विनिमय के लिए रूटिंग कुंजी स्वचालित रूप से नजरअंदाज की जाती है
		false,  // अनिवार्य
		false,  // तत्कालिक
		amqp.Publishing {
			ContentType: "text/plain", // संदेश सामग्री प्रकार, यहां सादा पाठ है
			Body:        []byte(body),  // संदेश सामग्री
		})

	log.Printf("सामग्री %s भेजी गई", body)
}

3. संदेश प्राप्त करें

संदेश प्राप्त करने के लिए पहले तीन कदम - RabbitMQ से कनेक्ट करना, एक चैनल बनाना, और विनिमय का घोषणा करना - संदेश भेजने के लिए समान हैं। उपरोक्त खंड 2.1, 2.2, और 2.3 का उल्लेख करें।

3.1. कतार घोषित करें

क्रियान्वित करने के लिए कतार घोषित करें

q, err := ch.QueueDeclare(
		"",    // कतार नाम, यदि निर्धारित नहीं किया गया हो, तो एक यादृच्छिक नाम जनरेट होगा
		false, // स्थायी
		false, // अनावश्यक होने पर मिटायें
		true,  // विशेषित
		false, // प्रतीक्षा न करें
		nil,   // तर्क
	)

3.2. कु नाम में कतार लगाएं

मैसेज प्राप्त करने के लिए कतार को ईक्सचेंज में लगाने की आवश्यकता होती है

err = ch.QueueBind(
		q.Name, // कतार नाम
		"",     // रूटिंग की, फैनआउट प्रकार के ईक्सचेंज के लिए, रूटिंग की स्वचालित रूप से अनदेखा कर दी जाती है
		"tizi365", // ईक्सचेंज नाम, संदेश भेजने वाले द्वारा परिभासित एक से मेल खाता होना चाहिए
		false,
		nil)

नोट: वास्तविक अनुप्रयोग में, हम N कतार डिफाइन कर सकते हैं, जो स्वरूप द्वारा फॉरवर्ड किए गए संदेश प्राप्त करने के लिए एक ही ईक्सचेंज से बांधी गई हैं। यहाँ प्रकाशित किए जाने वाले प्रदर्शन/छापो के पैटर्न का परिचायकरण होता है।

3.3. उपभोक्ता बनाएँ

msgs, err := ch.Consume(
		q.Name, // पूर्वलिखित कतार नाम को जरोरत है
		"",     // उपभोक्ता नाम, यदि निर्धारित नहीं किया जाता है, तो स्वचालित रूप से उत्पन्न हो जाएगा
		true,   // स्वचालित रूप से स्वीकार करें कि संदेश को प्रसंस्करण किया गया है
		false,  // विशेषित
		false,  // स्थानीय
		false,  // प्रतीक्षा न करें
		nil,    // तर्क
	)
	
// संदेश को संभालने के लिए लूप को हैंडल करना
for d := range msgs {
	log.Printf("संदेश मिला=%s", d.Body)
}

3.4. उपभोक्ता कोड पूरा करें

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// RabbitMQ से कनेक्ट करें
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "RabbitMQ से कनेक्ट करने में विफल रहा")
	defer conn.Close()

	// चैनल बनाएं, सामान्यत: एक प्रति उपभोक्ता
	ch, err := conn.Channel()
	failOnError(err, "चैनल खोलने में विफल रहा")
	defer ch.Close()

	// एक ईक्सचेंज घोषित करें
	err = ch.ExchangeDeclare(
		"tizi365",   // ईक्सचेंज नाम, संदेश भेजने वाले द्वारा परिभासित एक से मेल खाता होना चाहिए
		"fanout", // ईक्सचेंज प्रकार
		true,     // स्थायी
		false,    // स्वचालित मिटायें
		false,    // आंतरिक
		false,    // प्रतीक्षा न करें
		nil,      // तर्क
	)
	failOnError(err, "ईक्सचेंज घोषणा करने में विफल रहा")

	// कतार घोषित करें क्रियान्वित करने के लिए
	q, err := ch.QueueDeclare(
		"",    // कतार नाम, यदि रिक्त हो तो एक यादृच्छिक नाम जेनरेट हो जाएगा
		false, // स्थायी
		false, // अनावश्यक होने पर मिटायें
		true,  // विशेषित
		false, // प्रतीक्षा न करें
		nil,   // तर्क
	)
	failOnError(err, "कतार घोषणा करने में विफल रहा")

	// कतार को निर्दिष्ट ईक्सचेंज से बाँधे
	err = ch.QueueBind(
		q.Name, // कतार नाम
		"",     // रूटिंग की, फैनआउट ईक्सचेंज के लिए अनदेखा है
		"tizi365", // ईक्सचेंज नाम, संदेश भेजने वाले द्वारा परिभासित एक से मेल खाता होना चाहिए
		false,
		nil)
	failOnError(err, "कतार को बाँधने में विफल रहा")

	// उपभोक्ता बनाएँ
	msgs, err := ch.Consume(
		q.Name, // पूर्वलिखित कतार नाम को जरूरत है
		"",     // उपभोक्ता नाम, रिक्त होगा तो स्वचालित रूप से जनरेट हो जाएगा
		true,   // स्वत: स्वीकृति
		false,  // विशेषित
		false,  // स्थानीय
		false,  // प्रतीक्षा न करें
		nil,    // तर्क
	)
	failOnError(err, "उपभोक्ता को पंजीकृत करने में विफल रहा")

	// संदेश कोड को लूप में संभोधित करें
	for d := range msgs {
		log.Printf("संदेश मिला: %s", d.Body)
	}
}

3.5. एकाधिक उपभोक्ताएँ

Work mode section को देखें और सिंपली गोरूटीन का उपयोग करके एकाधिक उपभोक्ताएँ शुरू करें।