Golang তে RabbitMQ এ পাবলিশ/সাবস্ক্রাইব প্যাটার্ন (ব্রডকাস্ট মোড, ফ্যানআউট মোড)

RabbitMQ এ পাবলিশ/সাবস্ক্রাইব প্যাটার্ন বুঝাচ্ছে যে প্রোডিউসার দ্বারা প্রেরিত কোন মেসেজটি একাধিক কনসিউমার দ্বারা প্রসেস করা হবে।

ফ্যানআউট মোড

ব্যাখ্যা:

  • P প্রোডিউসারকে প্রদর্শন করে, C1 এবং C2 কনসিউমারদের প্রদর্শন করে, লাল ডিনামিক জড়িত কিউগুলিকে প্রদর্শন করে, এবং X এক্সচেঞ্জকে প্রদর্শন করে।
  • এক্সচেঞ্জটি মেসেজগুলি সকল কিউগুলিতে ফরোয়ার্ড করার জন্য দায়বদ্ধ।
  • একাধিক কিউ সংজ্ঞায়িত করা যেতে পারে, প্রতিটি এক্সচেঞ্জে জড়িত সমস্ত কিউগুলি।
  • প্রতি কিউতে এক বা একাধিক কনসিউমার থাকতে পারে।

নোট: যদি আপনি RabbitMQ এ পরিচিত না হন, তবে প্রথমে RabbitMQ এর মৌলিক ধারণাগুলি বিভাগটি পড়ুন।

1. ডিপেন্ডেন্সি প্যাকেজ ইনস্টল করুন

go get github.com/streadway/amqp

2. মেসেজ প্রেরণ করুন

নিম্নলিখিত ধাপগুলি আপনাকে দেখাচ্ছে যেভাবে মেসেজ প্রোডিউসারটি মেসেজ প্রেরণ করে।

2.1. RabbitMQ সার্ভারে সংযুক্ত হোন

// RabbitMQ সার্ভারে সংযুক্ত হোন
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()

সংযোগের ঠিকানার ব্যাখ্যা:

amqp://ব্যবহারকারীরনাম:পাসওয়ার্ড@RabbitMQঠিকানা:পোর্ট/

2.2. একটি চ্যানেল তৈরি করুন

সবগুলি অপারেশন চ্যানেলে সম্পন্ন করা হয়।

ch, err := conn.Channel()
defer ch.Close()

2.3. একটি এক্সচেঞ্জ ঘোষণা করুন

মেসেজগুলি প্রথমে এক্সচেঞ্জে প্রেরণ করা হয়। এক্সচেঞ্জটি তার রণনীয়তা অনুযায়ী কিউগুলিতে মেসেজগুলি প্রেরণ করে।

err = ch.ExchangeDeclare(
	"tizi365",   // এক্সচেঞ্জের নাম
	"fanout", // এক্সচেঞ্জের ধরণ, এখানে ফ্যানআউট ধরণ ব্যাবহার করা হয়, অর্থাৎ, প্রকাশ/সাবস্ক্রাইব প্যাটার্ন
	true,     // ট্রেইনসিয়েন্ট
	false,    // অটো-মুছে দেওয়া হবে
	false,    // অন্তর্নিহিত
	false,    // কোনও অপেক্ষা না করুন
	false,    // কোনও তাত্পর্য না রাখা
	nil,      // আর্গুমেন্ট
)

2.4. একটি মেসেজ প্রকাশ করুন

// মেসেজের বিষয়বস্তু
body := "হ্যালো Tizi365.com!"

// মেসেজ প্রকাশ করুন
err = ch.Publish(
  "tizi365",     // এক্সচেঞ্জ (উপরোক্ত ঘোষণা অনুরূপ এক্সচেঞ্জের নাম)
  "", // রাউটিং কী, ফ্যানআউট ধরণের এক্সচেঞ্জের জন্য, রাউটিং কীটি স্বয়ংক্রিয়ভাবে বাদ দেওয়া হয়, তাই এটি প্রদান করতে প্রয়োজন নেই
  false,  // অবশ্যই
  false,  // তাত্পর্য
  amqp.Publishing {
    ContentType: "প্লেইন টেক্সট", // মেসেজের বিষয়বস্তু ধরণ, এখানে প্লেইন টেক্সট
    Body:        []byte(body),  // মেসেজের বিষয়বস্তু
  })

2.5. মেসেজ পুশ কোড সম্পূর্ণ করুন

package main

import (
	"log"
	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// Rabbitmq সংযোগ করুন
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "RabbitMQ সার্ভারে সংযোগ করা যাচ্ছে না")
	defer conn.Close()

	// একটি চ্যানেল তৈরি করুন
	ch, err := conn.Channel()
	failOnError(err, "একটি চ্যানেল খুলা যাচ্ছে না")
	defer ch.Close()

	// একটি এক্সচেঞ্জ ঘোষণা করুন
	err = ch.ExchangeDeclare(
		"tizi365",   // এক্সচেঞ্জের নাম
		"fanout", // এক্সচেঞ্জের ধরণ, প্রকাশ/সাবস্ক্রাইব মোডের জন্য ফ্যানআউট
		true,     // ট্রেনসিয়েন্ট
		false,    // অটো-মুছে দেওয়া হবে
		false,    // অন্তর্নিহিত
		false,    // কোনও অপেক্ষা না করুন
		nil,      // আর্গুমেন্ট
	)
	failOnError(err, "একটি এক্সচেঞ্জ ঘোষণা করা যাচ্ছে না")

	// মেসেজের বিষয়বস্তু
	body := "হ্যালো Tizi365.com!"
	// মেসেজ পুশ
	err = ch.Publish(
		"tizi365",     // এক্সচেঞ্জ (উপরোক্ত ঘোষণা অনুরূপ)
		"", // রাউটিং কী, ফ্যানআউট ধরণের এক্সচেঞ্জের রাউটিং কীটি স্বয়ংক্রিয়ভাবে বাদ
		false,  // অবশ্যই
		false,  // তাত্পর্য
		amqp.Publishing {
			ContentType: "প্লেইন টেক্সট", // মেসেজের বিষয়বস্তু ধরণ, এটি প্লেইন টেক্সট
			Body:        []byte(body),  // মেসেজের বিষয়বস্তু
		})

	log.Printf("প্রেরিত বিষয়বস্তু %s", body)
}

3. মেসেজ গ্রহণ করুন

মেসেজ গ্রহণ করার জন্য প্রথম তিনটি ধাপ—RabbitMQ এ সংযোগ করা, একটি চ্যানেল তৈরি করা এবং একটি এক্সচেঞ্জ ঘোষণা করা—মেসেজ প্রেরণের জন্য সমান। আপনার ধন্যবাদে পূর্বে 2.1, 2.2 এবং 2.3 ধারণা দেখুন।

3.1. একটি কিউ ঘোষণা করুন

q, err := ch.QueueDeclare(
		"",     // কিউ নাম, যদি স্পষ্টভাবে উল্লেখ না করা হয়, তবে একটি এলাম নম্বর স্বয়ংক্রিয়ভাবে উৎপন্ন হবে
		false,  // Durable
		false,  // অব্যবহৃত হতে সরিয়ে দিতে
		true,   // Exclusive
		false,  // অপেক্ষা না করতে
		nil,    // Arguments
	)

3.2. কিউটি এক্সচেঞ্জে বাঁধুন

বার্তা পেতে কিউটির এক্সচেঞ্জে বাঁধা লাগলে

err = ch.QueueBind(
		q.Name,        // কিউ নাম
		"",            // রাউটিং কি, ফ্যানআউট প্রকারের এক্সচেঞ্জের জন্য, রাউটিং কি স্বয়ংক্রিয়ভাবে উপেক্ষা করা হয়
		"tizi365",     // এক্সচেঞ্জ নাম, বার্তা পাঠানোর সময় যেহেতু প্রেরকটি সেট করেছে সেইটির সাথে মিলতে হবে
		false,
		nil)

লক্ষ্য: বাস্তব অ্যাপ্লিকেশনে, আমরা নির্ধারিত এক্সচেঞ্জের সাথে বাঁধা লাগানোর জন্য N কিউ সংজ্ঞায়িত করতে পারি, যাতে এক্সচেঞ্জ দ্বারা প্রেরিত বার্তা পেতে। এটি যেখানে প্রকাশ / সাবস্ক্রাইব প্যাটার্নের প্রতিবিম্বন হয়।

3.3. একটি কনসিউমার তৈরি করুন

msgs, err := ch.Consume(
		q.Name, // উপরের কিউ নাম থেকে উল্লেখ করা
		"",     // কনসিউমার নাম, যদি স্পষ্টভাবে উল্লেখ না করা হয়, তবে স্বয়ংক্রিয়ভাবে উৎপন্ন হবে
		true,   // স্বয়ংক্রিয়ভাবে অবহিত করে দেওয়া হয়েছে যে বার্তা প্রসেস করা হয়েছে
		false,  // Exclusive
		false,  // অস্থায়ী
		false,  // অপেক্ষা না করতে
		nil,    // Args
	)

// বার্তা নিয়ে হ্যান্ডেল করতে লুপ
for d := range msgs {
	log.Printf("বার্তা পেয়েছি=%s", d.Body)
}

3.4. কনসিউমার কোড পূর্ণ করুন

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// RabbitMQ সাথে সংযোগ করুন
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "RabbitMQ সাথে সংযোগ স্থাপন করতে ব্যর্থ হলো")
	defer conn.Close()

	// একটি চ্যানেল তৈরি করুন, সাধারণত প্রতি কনসিউমার একটি চ্যানেল
	ch, err := conn.Channel()
	failOnError(err, "একটি চ্যানেল খোলা ব্যর্থ হলো")
	defer ch.Close()

	// একটি এক্সচেঞ্জ ঘোষণা করুন
	err = ch.ExchangeDeclare(
		"tizi365",   // এক্সচেঞ্জ নাম, প্রেরক দ্বারা ব্যবহৃত সেট করা এক্সচেঞ্জের সাথে মিলতে হবে
		"fanout", // এক্সচেঞ্জ প্রকার
		true,     // ট্রেন্সিয়েন্ট
		false,    // অটো-মুছে ফেলা
		false,    // ইন্টারনাল
		false,    // অপেক্ষা না করতে
		nil,      // Arguments
	)
	failOnError(err, "একটি এক্সচেঞ্জ ঘোষণা করতে ব্যর্থ হলো")

	// ঘোষণা করুন কিউ জন্য অপারেশন করার জন্য
	q, err := ch.QueueDeclare(
		"",    // কিউ নাম, যদি খালি হয়, তবে একটি এলাম নম্বর স্বয়ংক্রিয়ভাবে উৎপন্ন হবে
		false, // Durable
		false, // অব্যবহৃত হতে সরিয়ে দিতে
		true,  // Exclusive
		false, // অপেক্ষা না করতে
		nil,   // Arguments
	)
	failOnError(err, "একটি কিউ ঘোষণা করতে ব্যর্থ হলো")

	// কিউটি নির্দিষ্ট এক্সচেঞ্জে বাঁধান
	err = ch.QueueBind(
		q.Name, // কিউ নাম
		"",     // রাউটিং কি, ফ্যানআউট এক্সচেঞ্জের জন্য উপেক্ষা করা হয়
		"tizi365", // এক্সচেঞ্জ নাম, বার্তা প্রেরকটি দ্বারা সংজ্ঞায়িত করা এমনটির সাথে মিলতে হবে
		false,
		nil)
	failOnError(err, "একটি কিউ বাঁধানোর জন্য ব্যর্থ হলো")

	// একটি কনসিউমার তৈরি করুন
	msgs, err := ch.Consume(
		q.Name, // আগের কিউ নাম সম্প্রসারণ করুন
		"",     // কনসিউমার নাম, যদি খালি হয়, তবে স্বয়ংক্রিয়ভাবে উৎপন্ন হবে
		true,   // অটো-যাচাই
		false,  // সাবস্ক্রাইব করার জন্য অস্থায়ী
		false,  // স্থানীয় না
		false,  // অপেক্ষা না করতে
		nil,    // Args
	)
	failOnError(err, "একটি কনসিউমার নিবন্ধন করতে ব্যর্থ হলো")

	// কিউ থেকে বার্তা নোটিফাই করার জন্য একটি লুপে বার্তা কাজ করুন
	for d := range msgs {
		log.Printf("বার্তা পেয়েছি: %s", d.Body)
	}
}

3.5. একাধিক কনসিউমার

Work mode section দেখুন এবং গরুটিন ব্যবহার করে সহজভাবে একাধিক কনসিউমার চালু করুন।