Golang তে RabbitMQ এ পাবলিশ/সাবস্ক্রাইব প্যাটার্ন (ব্রডকাস্ট মোড, ফ্যানআউট মোড)
RabbitMQ এ পাবলিশ/সাবস্ক্রাইব প্যাটার্ন বুঝাচ্ছে যে প্রোডিউসার দ্বারা প্রেরিত কোন মেসেজটি একাধিক কনসিউমার দ্বারা প্রসেস করা হবে।
ব্যাখ্যা:
- P প্রোডিউসারকে প্রদর্শন করে, C1 এবং C2 কনসিউমারদের প্রদর্শন করে, লাল ডিনামিক জড়িত কিউগুলিকে প্রদর্শন করে, এবং X এক্সচেঞ্জকে প্রদর্শন করে।
- এক্সচেঞ্জটি মেসেজগুলি সকল কিউগুলিতে ফরোয়ার্ড করার জন্য দায়বদ্ধ।
- একাধিক কিউ সংজ্ঞায়িত করা যেতে পারে, প্রতিটি এক্সচেঞ্জে জড়িত সমস্ত কিউগুলি।
- প্রতি কিউতে এক বা একাধিক কনসিউমার থাকতে পারে।
নোট: যদি আপনি RabbitMQ এ পরিচিত না হন, তবে প্রথমে RabbitMQ এর মৌলিক ধারণাগুলি বিভাগটি পড়ুন।
1. ডিপেন্ডেন্সি প্যাকেজ ইনস্টল করুন
go get github.com/streadway/amqp
2. মেসেজ প্রেরণ করুন
নিম্নলিখিত ধাপগুলি আপনাকে দেখাচ্ছে যেভাবে মেসেজ প্রোডিউসারটি মেসেজ প্রেরণ করে।
2.1. RabbitMQ সার্ভারে সংযুক্ত হোন
// RabbitMQ সার্ভারে সংযুক্ত হোন
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()
সংযোগের ঠিকানার ব্যাখ্যা:
amqp://ব্যবহারকারীরনাম:পাসওয়ার্ড@RabbitMQঠিকানা:পোর্ট/
2.2. একটি চ্যানেল তৈরি করুন
সবগুলি অপারেশন চ্যানেলে সম্পন্ন করা হয়।
ch, err := conn.Channel()
defer ch.Close()
2.3. একটি এক্সচেঞ্জ ঘোষণা করুন
মেসেজগুলি প্রথমে এক্সচেঞ্জে প্রেরণ করা হয়। এক্সচেঞ্জটি তার রণনীয়তা অনুযায়ী কিউগুলিতে মেসেজগুলি প্রেরণ করে।
err = ch.ExchangeDeclare(
"tizi365", // এক্সচেঞ্জের নাম
"fanout", // এক্সচেঞ্জের ধরণ, এখানে ফ্যানআউট ধরণ ব্যাবহার করা হয়, অর্থাৎ, প্রকাশ/সাবস্ক্রাইব প্যাটার্ন
true, // ট্রেইনসিয়েন্ট
false, // অটো-মুছে দেওয়া হবে
false, // অন্তর্নিহিত
false, // কোনও অপেক্ষা না করুন
false, // কোনও তাত্পর্য না রাখা
nil, // আর্গুমেন্ট
)
2.4. একটি মেসেজ প্রকাশ করুন
// মেসেজের বিষয়বস্তু
body := "হ্যালো Tizi365.com!"
// মেসেজ প্রকাশ করুন
err = ch.Publish(
"tizi365", // এক্সচেঞ্জ (উপরোক্ত ঘোষণা অনুরূপ এক্সচেঞ্জের নাম)
"", // রাউটিং কী, ফ্যানআউট ধরণের এক্সচেঞ্জের জন্য, রাউটিং কীটি স্বয়ংক্রিয়ভাবে বাদ দেওয়া হয়, তাই এটি প্রদান করতে প্রয়োজন নেই
false, // অবশ্যই
false, // তাত্পর্য
amqp.Publishing {
ContentType: "প্লেইন টেক্সট", // মেসেজের বিষয়বস্তু ধরণ, এখানে প্লেইন টেক্সট
Body: []byte(body), // মেসেজের বিষয়বস্তু
})
2.5. মেসেজ পুশ কোড সম্পূর্ণ করুন
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// Rabbitmq সংযোগ করুন
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "RabbitMQ সার্ভারে সংযোগ করা যাচ্ছে না")
defer conn.Close()
// একটি চ্যানেল তৈরি করুন
ch, err := conn.Channel()
failOnError(err, "একটি চ্যানেল খুলা যাচ্ছে না")
defer ch.Close()
// একটি এক্সচেঞ্জ ঘোষণা করুন
err = ch.ExchangeDeclare(
"tizi365", // এক্সচেঞ্জের নাম
"fanout", // এক্সচেঞ্জের ধরণ, প্রকাশ/সাবস্ক্রাইব মোডের জন্য ফ্যানআউট
true, // ট্রেনসিয়েন্ট
false, // অটো-মুছে দেওয়া হবে
false, // অন্তর্নিহিত
false, // কোনও অপেক্ষা না করুন
nil, // আর্গুমেন্ট
)
failOnError(err, "একটি এক্সচেঞ্জ ঘোষণা করা যাচ্ছে না")
// মেসেজের বিষয়বস্তু
body := "হ্যালো Tizi365.com!"
// মেসেজ পুশ
err = ch.Publish(
"tizi365", // এক্সচেঞ্জ (উপরোক্ত ঘোষণা অনুরূপ)
"", // রাউটিং কী, ফ্যানআউট ধরণের এক্সচেঞ্জের রাউটিং কীটি স্বয়ংক্রিয়ভাবে বাদ
false, // অবশ্যই
false, // তাত্পর্য
amqp.Publishing {
ContentType: "প্লেইন টেক্সট", // মেসেজের বিষয়বস্তু ধরণ, এটি প্লেইন টেক্সট
Body: []byte(body), // মেসেজের বিষয়বস্তু
})
log.Printf("প্রেরিত বিষয়বস্তু %s", body)
}
3. মেসেজ গ্রহণ করুন
মেসেজ গ্রহণ করার জন্য প্রথম তিনটি ধাপ—RabbitMQ এ সংযোগ করা, একটি চ্যানেল তৈরি করা এবং একটি এক্সচেঞ্জ ঘোষণা করা—মেসেজ প্রেরণের জন্য সমান। আপনার ধন্যবাদে পূর্বে 2.1, 2.2 এবং 2.3 ধারণা দেখুন।
3.1. একটি কিউ ঘোষণা করুন
q, err := ch.QueueDeclare(
"", // কিউ নাম, যদি স্পষ্টভাবে উল্লেখ না করা হয়, তবে একটি এলাম নম্বর স্বয়ংক্রিয়ভাবে উৎপন্ন হবে
false, // Durable
false, // অব্যবহৃত হতে সরিয়ে দিতে
true, // Exclusive
false, // অপেক্ষা না করতে
nil, // Arguments
)
3.2. কিউটি এক্সচেঞ্জে বাঁধুন
বার্তা পেতে কিউটির এক্সচেঞ্জে বাঁধা লাগলে
err = ch.QueueBind(
q.Name, // কিউ নাম
"", // রাউটিং কি, ফ্যানআউট প্রকারের এক্সচেঞ্জের জন্য, রাউটিং কি স্বয়ংক্রিয়ভাবে উপেক্ষা করা হয়
"tizi365", // এক্সচেঞ্জ নাম, বার্তা পাঠানোর সময় যেহেতু প্রেরকটি সেট করেছে সেইটির সাথে মিলতে হবে
false,
nil)
লক্ষ্য: বাস্তব অ্যাপ্লিকেশনে, আমরা নির্ধারিত এক্সচেঞ্জের সাথে বাঁধা লাগানোর জন্য N কিউ সংজ্ঞায়িত করতে পারি, যাতে এক্সচেঞ্জ দ্বারা প্রেরিত বার্তা পেতে। এটি যেখানে প্রকাশ / সাবস্ক্রাইব প্যাটার্নের প্রতিবিম্বন হয়।
3.3. একটি কনসিউমার তৈরি করুন
msgs, err := ch.Consume(
q.Name, // উপরের কিউ নাম থেকে উল্লেখ করা
"", // কনসিউমার নাম, যদি স্পষ্টভাবে উল্লেখ না করা হয়, তবে স্বয়ংক্রিয়ভাবে উৎপন্ন হবে
true, // স্বয়ংক্রিয়ভাবে অবহিত করে দেওয়া হয়েছে যে বার্তা প্রসেস করা হয়েছে
false, // Exclusive
false, // অস্থায়ী
false, // অপেক্ষা না করতে
nil, // Args
)
// বার্তা নিয়ে হ্যান্ডেল করতে লুপ
for d := range msgs {
log.Printf("বার্তা পেয়েছি=%s", d.Body)
}
3.4. কনসিউমার কোড পূর্ণ করুন
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// RabbitMQ সাথে সংযোগ করুন
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "RabbitMQ সাথে সংযোগ স্থাপন করতে ব্যর্থ হলো")
defer conn.Close()
// একটি চ্যানেল তৈরি করুন, সাধারণত প্রতি কনসিউমার একটি চ্যানেল
ch, err := conn.Channel()
failOnError(err, "একটি চ্যানেল খোলা ব্যর্থ হলো")
defer ch.Close()
// একটি এক্সচেঞ্জ ঘোষণা করুন
err = ch.ExchangeDeclare(
"tizi365", // এক্সচেঞ্জ নাম, প্রেরক দ্বারা ব্যবহৃত সেট করা এক্সচেঞ্জের সাথে মিলতে হবে
"fanout", // এক্সচেঞ্জ প্রকার
true, // ট্রেন্সিয়েন্ট
false, // অটো-মুছে ফেলা
false, // ইন্টারনাল
false, // অপেক্ষা না করতে
nil, // Arguments
)
failOnError(err, "একটি এক্সচেঞ্জ ঘোষণা করতে ব্যর্থ হলো")
// ঘোষণা করুন কিউ জন্য অপারেশন করার জন্য
q, err := ch.QueueDeclare(
"", // কিউ নাম, যদি খালি হয়, তবে একটি এলাম নম্বর স্বয়ংক্রিয়ভাবে উৎপন্ন হবে
false, // Durable
false, // অব্যবহৃত হতে সরিয়ে দিতে
true, // Exclusive
false, // অপেক্ষা না করতে
nil, // Arguments
)
failOnError(err, "একটি কিউ ঘোষণা করতে ব্যর্থ হলো")
// কিউটি নির্দিষ্ট এক্সচেঞ্জে বাঁধান
err = ch.QueueBind(
q.Name, // কিউ নাম
"", // রাউটিং কি, ফ্যানআউট এক্সচেঞ্জের জন্য উপেক্ষা করা হয়
"tizi365", // এক্সচেঞ্জ নাম, বার্তা প্রেরকটি দ্বারা সংজ্ঞায়িত করা এমনটির সাথে মিলতে হবে
false,
nil)
failOnError(err, "একটি কিউ বাঁধানোর জন্য ব্যর্থ হলো")
// একটি কনসিউমার তৈরি করুন
msgs, err := ch.Consume(
q.Name, // আগের কিউ নাম সম্প্রসারণ করুন
"", // কনসিউমার নাম, যদি খালি হয়, তবে স্বয়ংক্রিয়ভাবে উৎপন্ন হবে
true, // অটো-যাচাই
false, // সাবস্ক্রাইব করার জন্য অস্থায়ী
false, // স্থানীয় না
false, // অপেক্ষা না করতে
nil, // Args
)
failOnError(err, "একটি কনসিউমার নিবন্ধন করতে ব্যর্থ হলো")
// কিউ থেকে বার্তা নোটিফাই করার জন্য একটি লুপে বার্তা কাজ করুন
for d := range msgs {
log.Printf("বার্তা পেয়েছি: %s", d.Body)
}
}
3.5. একাধিক কনসিউমার
Work mode section দেখুন এবং গরুটিন ব্যবহার করে সহজভাবে একাধিক কনসিউমার চালু করুন।