गोलांग RabbitMQ पब्लिश/सब्सक्राइब पैटर्न (ब्रॉडकास्ट मोड, फैनआउट मोड)
RabbitMQ में पब्लिश/सब्सक्राइब पैटर्न का मतलब है कि जो प्रोड्यूसर द्वारा भेजी गई संदेश को कई उपभोक्ताओं द्वारा प्रसंस्कृत किया जाएगा।
व्याख्या:
- P निर्माता को प्रस्तुत करता है, C1 और C2 उपभोक्ताएँ प्रतिनिधित्व करती हैं, लाल रंग कताएँ को दर्शाता है, और X विनिमय को प्रतिनिधित्व करता है।
- विनिमय किसे सभी कताओं को आगे भेजने के लिए जिम्मेदार होता है।
- कई कताएँ परिभाषित की जा सकती हैं, जो सभी एक समान विनिमय से बंधी होती हैं।
- प्रत्येक क्यूओं में एक या एक से अधिक उपभोक्ताएँ हो सकती हैं।
नोट: अगर आप RabbitMQ से अनजान हैं, तो कृपया सबसे पहले RabbitMQ मूल अवधारणाएँ अनुभाग पढ़ें।
1. डिपेंडेंसी पैकेज स्थापित करें
go get github.com/streadway/amqp
2. संदेश भेजें
निम्नलिखित चरण संदेश निर्माता द्वारा संदेश भेजने को दर्शाते हैं।
2.1. RabbitMQ सर्वर से कनेक्ट करें
// RabbitMQ सर्वर से कनेक्ट करें
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()
कनेक्शन पता का व्याख्या:
amqp://उपयोगकर्तानाम:पासवर्ड@RabbitMQपता:पोर्ट/
2.2. एक चैनल बनाएँ
अधिकांश ऑपरेशन चैनल पर पूरे होते हैं।
ch, err := conn.Channel()
defer ch.Close()
2.3. विनिमय का घोषणा करें
संदेश सबसे पहले विनिमय को भेजा जाता है। विनिमय अपनी रणनीति के आधार पर संदेशों को कताओं को आगे भेजता है।
err = ch.ExchangeDeclare(
"tizi365", // विनिमय का नाम
"fanout", // विनिमय प्रकार, यहां फैनआउट प्रकार का उपयोग कर रहे हैं, अर्थात पब्लिश/सब्सक्राइब पैटर्न
true, // स्थायी
false, // स्वचालित हटाने
false, // आंतरिक
false, // कोई प्रतीक्षा नहीं
nil, // विवाद
)
2.4. एक संदेश प्रकाशित करें
// संदेश का विषय
body := "नमस्ते Tizi365.com!"
// संदेश प्रकाशित करें
err = ch.Publish(
"tizi365", // विनिमय (पूर्ववर्ती घोषणा के विनिमय का नाम)
"", // रूटिंग कुंजी, फैनआउट प्रकार विनिमय के लिए, रूटिंग कुंजी स्वचालित रूप से नजरअंदाज की जाती है, इसलिए इसे प्रदान करना आवश्यक नहीं है
false, // अनिवार्य
false, // तत्कालिक
amqp.Publishing {
ContentType: "text/plain", // संदेश सामग्री प्रकार, यहां यह सादा पाठ है
Body: []byte(body), // संदेश सामग्री
})
2.5. संदेश पुश कोड पूरा करें
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// RabbitMQ से कनेक्ट करें
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "RabbitMQ से कनेक्ट करने में विफल रहा")
defer conn.Close()
// एक चैनल बनाएँ
ch, err := conn.Channel()
failOnError(err, "चैनल खोलने में विफल रहा")
defer ch.Close()
// एक विनिमय का घोषणा करें
err = ch.ExchangeDeclare(
"tizi365", // विनिमय का नाम
"fanout", // विनिमय प्रकार, प्रकाशन/सब्सक्राइब मोड के लिए फैनआउट
true, // स्थायी
false, // स्वचालित हटाने
false, // आंतरिक
false, // कोई प्रतीक्षा नहीं
nil, // विवाद
)
failOnError(err, "विनिमय का घोषणा करने में विफल रहा")
// संदेश का विषय
body := "नमस्ते Tizi365.com!"
// संदेश पुश करें
err = ch.Publish(
"tizi365", // विनिमय (घोषणा के समान विनिमय)
"", // रूटिंग कुंजी, फैनआउट प्रकार विनिमय के लिए रूटिंग कुंजी स्वचालित रूप से नजरअंदाज की जाती है
false, // अनिवार्य
false, // तत्कालिक
amqp.Publishing {
ContentType: "text/plain", // संदेश सामग्री प्रकार, यहां सादा पाठ है
Body: []byte(body), // संदेश सामग्री
})
log.Printf("सामग्री %s भेजी गई", body)
}
3. संदेश प्राप्त करें
संदेश प्राप्त करने के लिए पहले तीन कदम - RabbitMQ से कनेक्ट करना, एक चैनल बनाना, और विनिमय का घोषणा करना - संदेश भेजने के लिए समान हैं। उपरोक्त खंड 2.1, 2.2, और 2.3 का उल्लेख करें।
3.1. कतार घोषित करें
क्रियान्वित करने के लिए कतार घोषित करें
q, err := ch.QueueDeclare(
"", // कतार नाम, यदि निर्धारित नहीं किया गया हो, तो एक यादृच्छिक नाम जनरेट होगा
false, // स्थायी
false, // अनावश्यक होने पर मिटायें
true, // विशेषित
false, // प्रतीक्षा न करें
nil, // तर्क
)
3.2. कु नाम में कतार लगाएं
मैसेज प्राप्त करने के लिए कतार को ईक्सचेंज में लगाने की आवश्यकता होती है
err = ch.QueueBind(
q.Name, // कतार नाम
"", // रूटिंग की, फैनआउट प्रकार के ईक्सचेंज के लिए, रूटिंग की स्वचालित रूप से अनदेखा कर दी जाती है
"tizi365", // ईक्सचेंज नाम, संदेश भेजने वाले द्वारा परिभासित एक से मेल खाता होना चाहिए
false,
nil)
नोट: वास्तविक अनुप्रयोग में, हम N कतार डिफाइन कर सकते हैं, जो स्वरूप द्वारा फॉरवर्ड किए गए संदेश प्राप्त करने के लिए एक ही ईक्सचेंज से बांधी गई हैं। यहाँ प्रकाशित किए जाने वाले प्रदर्शन/छापो के पैटर्न का परिचायकरण होता है।
3.3. उपभोक्ता बनाएँ
msgs, err := ch.Consume(
q.Name, // पूर्वलिखित कतार नाम को जरोरत है
"", // उपभोक्ता नाम, यदि निर्धारित नहीं किया जाता है, तो स्वचालित रूप से उत्पन्न हो जाएगा
true, // स्वचालित रूप से स्वीकार करें कि संदेश को प्रसंस्करण किया गया है
false, // विशेषित
false, // स्थानीय
false, // प्रतीक्षा न करें
nil, // तर्क
)
// संदेश को संभालने के लिए लूप को हैंडल करना
for d := range msgs {
log.Printf("संदेश मिला=%s", d.Body)
}
3.4. उपभोक्ता कोड पूरा करें
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// RabbitMQ से कनेक्ट करें
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "RabbitMQ से कनेक्ट करने में विफल रहा")
defer conn.Close()
// चैनल बनाएं, सामान्यत: एक प्रति उपभोक्ता
ch, err := conn.Channel()
failOnError(err, "चैनल खोलने में विफल रहा")
defer ch.Close()
// एक ईक्सचेंज घोषित करें
err = ch.ExchangeDeclare(
"tizi365", // ईक्सचेंज नाम, संदेश भेजने वाले द्वारा परिभासित एक से मेल खाता होना चाहिए
"fanout", // ईक्सचेंज प्रकार
true, // स्थायी
false, // स्वचालित मिटायें
false, // आंतरिक
false, // प्रतीक्षा न करें
nil, // तर्क
)
failOnError(err, "ईक्सचेंज घोषणा करने में विफल रहा")
// कतार घोषित करें क्रियान्वित करने के लिए
q, err := ch.QueueDeclare(
"", // कतार नाम, यदि रिक्त हो तो एक यादृच्छिक नाम जेनरेट हो जाएगा
false, // स्थायी
false, // अनावश्यक होने पर मिटायें
true, // विशेषित
false, // प्रतीक्षा न करें
nil, // तर्क
)
failOnError(err, "कतार घोषणा करने में विफल रहा")
// कतार को निर्दिष्ट ईक्सचेंज से बाँधे
err = ch.QueueBind(
q.Name, // कतार नाम
"", // रूटिंग की, फैनआउट ईक्सचेंज के लिए अनदेखा है
"tizi365", // ईक्सचेंज नाम, संदेश भेजने वाले द्वारा परिभासित एक से मेल खाता होना चाहिए
false,
nil)
failOnError(err, "कतार को बाँधने में विफल रहा")
// उपभोक्ता बनाएँ
msgs, err := ch.Consume(
q.Name, // पूर्वलिखित कतार नाम को जरूरत है
"", // उपभोक्ता नाम, रिक्त होगा तो स्वचालित रूप से जनरेट हो जाएगा
true, // स्वत: स्वीकृति
false, // विशेषित
false, // स्थानीय
false, // प्रतीक्षा न करें
nil, // तर्क
)
failOnError(err, "उपभोक्ता को पंजीकृत करने में विफल रहा")
// संदेश कोड को लूप में संभोधित करें
for d := range msgs {
log.Printf("संदेश मिला: %s", d.Body)
}
}
3.5. एकाधिक उपभोक्ताएँ
Work mode section को देखें और सिंपली गोरूटीन का उपयोग करके एकाधिक उपभोक्ताएँ शुरू करें।