CQRS মেকানিজম

CQRS এর মানে "কমান্ড কুয়েরি দায়িত্ব শৃংখলা"। এটি কমান্ড (লেখা অনুরোধ) এবং কুয়েরি (পড়া অনুরোধ) এর দায়িত্ব আলাদা করে। লেখা অনুরোধ এবং পড়া অনুরোধ ভিন্ন অবজেক্ট দ্বারা হ্যান্ডেল করা হয়।

এটাই CQRS। আমরা ডেটা স্টোরেজকে আরও আলাদা করতে পারি, পাঠ্য এবং লেখা স্টোরেজ ভিন্ন হওয়ায়। এটা হল CQRS-এর দ্বিতীয় ভাগ। একবার এটা করা হলে, বিভিন্ন ধরণের কুয়েরি বা অনেক সীমাবদ্ধ প্রসঙ্গগুলির জন্য অনুরুপ বড়গুলি এবং এর জন্য অপটিমাইজ বড় যা বেয়াদবি করা হবে। হালকা পড়া/লেখা স্টোরেজ অফটেন CQRS-এর সাথে সম্পর্কিত আলোচনা এর বিষয় বলা হয়, তবে, এটা CQRS নয়। CQRS শুধুমাত্র প্রথম দেওয়া অনুষ্ঠান এবং কুয়েরি দুর্নীতি।

CQRS মেকানিজম ডায়াগ্রাম

cqrs কম্পোনেন্টটি CQRS প্যাটার্ন অনুসরণ করতে সাহায্য করতে, পাব/সাব এবং রাউটারের উপর নির্মিত কিছু কার্যকারী অবস্থান সরবরাহ করে।

আপনাকে পূর্ণ CQRS বাস্তবায়ন করতে হবে না। সাধারণত, কেবল উপাদানের ঘটনা ব্যবহার করা হয়, ইভেন্ট-ড্রিভেন অ্যাপ্লিকেশন তৈরি করতে।

গঠন ব্লক

ঘটনাগুলি

ঘটনাগুলি সেই কিছু প্রতিপাদিত করে যা ইতিহাস হলো। ঘটনাগুলি অপরিবর্তনশীল।

ইভেন্ট বাস

পূর্ণ উৎস কোড: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go

// ...
// ইভেন্ট বাস ঘটনা হ্যান্ডলারদের কাছে ইভেন্ট পরিবহন করে।
type EventBus struct {
// ...

পূর্ণ উৎস কোড: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go

// ...
type EventBusConfig struct {
    // GeneratePublishTopic ব্যবহার করা হয় টপিক নাম তৈরি করার জন্য ঘড়ি প্রকাশের জন্য।
    GeneratePublishTopic GenerateEventPublishTopicFn

    // OnPublish ঘটনা প্রেরণ করার আগে ডোমেনকে পরিবর্তন করতে পারে। এটি *message.Message পরিবর্তন করতে পারে।
    //
    // এই বিকল্পটি অবশ্যই প্রয়োজন।
    OnPublish OnEventSendFn

    // Marshaler ব্যবহার করা হয় ঘটনাগুলি কোডিং এবং ডিকোডিং এর জন্য।
    // এটি প্রয়োজন।
    Marshaler CommandEventMarshaler

    // লগার এর একটি উদাহরণ ব্যবহার করিয়ে লগগুলির জন্য। যদি সরবরাহ না করা হয়, তবে watermill.NopLogger ব্যবহার করা হয়।
    Logger watermill.LoggerAdapter
}

func (c *EventBusConfig) setDefaults() {
    if c.Logger == nil {
        c.Logger = watermill.NopLogger{}
    }
}
// ...

ইভেন্ট প্রসেসর

পূর্ণ কোড: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go

// ...
// EventProcessor ব্যবহার করা হয় যেসব EventHandler ওই ইভেন্ট বাস থেকে পেয়েছে, সেগুলি কোনটি ব্যবহার করবে তা নির্ধারণ করার জন্য।
type EventProcessor struct {
// ...

পূর্ণ কোড: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go

// ...
type EventProcessorConfig struct {
	// GenerateSubscribeTopic ব্যবহার করা হয় ইভেন্ট গুলির জন্য সাবস্ক্রাইব করতে কোন টপিক তৈরি করবে।
	// যদি ইভেন্ট প্রসেসরটি হ্যান্ডলার গ্রুপ ব্যবহার করে, তবে GenerateSubscribeTopic ব্যবহার করা হয়।
	GenerateSubscribeTopic EventProcessorGenerateSubscribeTopicFn

	// SubscriberConstructor ব্যবহার করা হয় ইভেন্ট হ্যান্ডলার এর জন্য সাবস্ক্রাইবার তৈরি করতে।
	//
	// এই ফাংশনটি প্রতি ইভেন্ট হ্যান্ডলার ইনস্ট্যান্সের জন্য একবার কল করা হয়।
	// যদি আপনি একই সাবস্ক্রাইবারটি একাধিক হ্যান্ডলারের জন্য পুনরাবৃত্তি করতে চান, তাহলে GroupEventProcessor ব্যবহার করুন।
	SubscriberConstructor EventProcessorSubscriberConstructorFn

	// OnHandle হ্যান্ডলিং ইভেন্ট এগিয়ে কল করা হয় পূর্বে।
	// OnHandle মিডলওয়ের মত কাজ করে: আপনি ইভেন্ট হ্যান্ডলিং এর আগে এবং পরে অতিরিক্ত লজিক আড়াবাড় করতে পারেন।
	//
	// তাই, আপনাকে প্রায়ই params.Handler.Handle() কল করতে হবে যেটা ইভেন্ট হ্যান্ডেল করার জন্য।
	//
	//  func(params EventProcessorOnHandleParams) (err error) {
	//      // হ্যান্ডেলিং এর আগের লজিক
	//      //  (...)

	//      err := params.Handler.Handle(params.Message.Context(), params.Event)
	//
	//      // হ্যান্ডেলিং পরের লজিক
	//      //  (...)

	//      return err
	//  }
	//
	// এই বিকল্পটি অনিবার্য নয়।
	OnHandle EventProcessorOnHandleFn

	// AckOnUnknownEvent ব্যবহার করা হয় যদি ইভেন্টের জন্য কোনো নির্ধারিত হ্যান্ডলার না থাকে, তবে মেসেজ কমন ইনডেকশন খুশি করবে কিনা তা নির্ধারণ করতে।
	AckOnUnknownEvent bool

	// Marshaler ব্যবহার করা হয় ইভেন্ট গুলির মার্শাল এবং আনমার্শাল করতে।
	// প্রয়োজন।
	Marshaler CommandEventMarshaler

	// লগ সিস্টেমের জন্য লগার ইনস্ট্যান্স।
	// যদি সরবরাহ না করা হয়, তবে watermill.NopLogger ব্যবহার করা হবে।
	Logger watermill.LoggerAdapter

	// ডিসেবল রাউটার অটো অ্যাড হ্যান্ডলারদের জন্য ব্যবহার করা হয় যাতে পূর্বসূচনা ভাঙ্গান।
	// এই মানটি নিয়মিত রাখার জন্য।
	// ইভেন্ট প্রসেসর তৈরি করার সময় NewEventProcessor ব্যবহার করা হয়।
	// প্রত্যাহার: NewEventProcessorWithConfig এ মাইগ্রেট করুন।
	disableRouterAutoAddHandlers bool
}

func (c *EventProcessorConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

ইভেন্ট গ্রুপ প্রসেসর

পূর্ণ উৎস কোড: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go

// ...
// EventGroupProcessor নির্ধারণ করে যে ঘটনা প্রসেসর যেসব ইভেন্ট বাস থেকে পাওয়া ঘটনা প্রসেসরের কাছে পথ প্রদর্শন করবে।
// EventProcessor এর তুলনায়, EventGroupProcessor অনুমোদন দেয় একাধিক প্রসেসরের একই সাবস্ক্রাইবার ইনস্ট্যান্স ভাগ করতে।
type EventGroupProcessor struct {
// ...

পূর্ণ উৎস কোড: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go

// ...
type EventGroupProcessorConfig struct {
	// GenerateSubscribeTopic গ্রুপ ইভেন্ট প্রসেসরে সাবস্ক্রাইব করার জন্য টপিক তৈরি করতে ব্যবহৃত হয়।
	// এই অপশনটি EventProcessor ব্যবহার করতে যখন প্রসেসর গ্রুপ ব্যবহার করা হয়।
	GenerateSubscribeTopic EventGroupProcessorGenerateSubscribeTopicFn

	// SubscriberConstructor গ্রুপ ইভেন্ট হ্যাণ্ডলারের জন্য একটি সাবস্ক্রাইবার তৈরি করতে ব্যবহৃত হয়।
	// এই ফাংশনটি প্রতি ইভেন্ট গ্রুপে একবার কল করা হয় - প্রতি গ্রুপে একটি সাবস্ক্রাইপশন তৈরি করার অনুমতি দেয়।
	// এটি খুব উপকারী যখন আমরা চাই ইভেন্টগুলি অনুক্রমে একটি স্ট্রিম থেকে হ্যান্ডেল করতে।
	SubscriberConstructor EventGroupProcessorSubscriberConstructorFn

	// OnHandle ইভেন্ট হ্যান্ডলিং এর আগে কল করা হয়।
	// OnHandle মিডলওয়্যারের মতো: আপনি ইভেন্ট হ্যান্ডলিং এর আগে এবং পরে অতিরিক্ত লজিক ইনজেক্ট করতে পারেন।
	//
	// এই কারণে, আপনাকে নির্দিষ্টভাবে params.Handler.Handle() কল করতে হবে ইভেন্ট হ্যান্ডলিং এর জন্য।
	//
	// func(params EventGroupProcessorOnHandleParams) (err error) {
	//     // হ্যান্ডলিং এর আগে লজিক
	//     //  (...)
	//
	//     err := params.Handler.Handle(params.Message.Context(), params.Event)
	//
	//     // হ্যান্ডলিং পরে লজিক
	//     //  (...)
	//
	//     return err
	// }
	//
	// এই অপশনটি প্রয়োজন নয়।
	OnHandle EventGroupProcessorOnHandleFn

	// AckOnUnknownEvent ধরে রাখতে ব্যবহৃত হয় যদি ইভেন্টে কোনও নির্ধারিত হ্যান্ডলার না থাকে।
	AckOnUnknownEvent bool

	// Marshaler ইভেন্ট এনকোডিং এবং ডিকোডিং করার জন্য ব্যবহৃত হয়।
	// এটি প্রয়োজন।
	Marshaler CommandEventMarshaler

	// লগু ইন্সট্যান্স যা লগিং এর জন্য ব্যবহৃত হয়।
	// যদি সরবরাহ না করা হয়, watermill.NopLogger ব্যবহৃত হবে।
	Logger watermill.LoggerAdapter
}

func (c *EventGroupProcessorConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

ইভেন্ট গ্রুপ প্রসেসর সম্পর্কে আরও জানুন।

ইভেন্ট হ্যান্ডলার

পূর্ণ উৎস কোড: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go

// ...
// EventHandler নতুন ইভেন্ট দ্বারা উল্লিখিত ঘটনা গ্রহণ করে এবং তাদের হ্যান্ডল অনুমতি দেয়।
// এসবের ব্যবহার করলে, ইভেন্ট হ্যান্ডলার মডিফাই করতে এবং মূলধারাকে সংরক্ষণ করতে পারে।
// এর মধ্যেই প্রসেস ম্যানেজার, সাগা এবং শুধুমাত্র রিড মডেল বিল্ড করতে পারে।
//
// আদেশ হ্যান্ডলারের বিপরীতে, প্রতিটি ইভেন্টে একাধিক ইভেন্ট হ্যান্ডলার দিতে পারে।
//
// বার্তা প্রসেসিং এর সময়, একটি EventHandler অনুযায়ী ব্যবহার করুন।
// একাধিক ইভেন্ট পাস কর্তে সময়, হ্যান্ডল মেথডটি অনুযায়ী একাধিক বার সময় অনুপ্রস্থ করা হতে পারে।
// তাই, হ্যান্ডল মেথডটি থ্রেড-সেফ হতে হবে!
type EventHandler interface {
// ...

আদেশ

একটি আদেশ হল সম্প্রতি কোনও প্রেরণ করার অনুরোধ প্রতিষ্ঠান করা একটি সাধারণ তথ্য সারণি।

কমান্ড বাস

সম্পূর্ণ সোর্স কোড: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go

// ...
// CommandBus হল কমান্ড হ্যান্ডলারদের কাছে কমান্ড পাঠানোর জন্যকে পরিবহন করে।
type CommandBus struct {
// ...

সম্পূর্ণ সোর্স কোড: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go

// ...
type CommandBusConfig struct {
	// GeneratePublishTopic কমান্ড প্রকাশের জন্য টপিক তৈরি করতে ব্যবহৃত হয়।
	GeneratePublishTopic CommandBusGeneratePublishTopicFn

	// OnSend কমান্ড প্রকাশ করার আগে কল করা হয়।
	// *message.Message পরিবর্তন করতে পারে।
	//
	// এই অপশন অবশ্যই প্রয়োজন নয়।
	OnSend CommandBusOnSendFn

	// Marshaler কোড সিরিয়ালাইজ এবং ডিসিরিয়ালাইজ করার জন্য ব্যবহৃত হয়।
	// প্রয়োজন।
	Marshaler CommandEventMarshaler

	// লগ ব্যবহার করা হয় কি না
	// যদি সরবরাহ না করা হয়, তাহলে watermill.NopLogger ব্যবহৃত হবে।
	Logger watermill.LoggerAdapter
}

func (c *CommandBusConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

কমান্ড প্রসেসর

সম্পূর্ণ সোর্স কোড: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go

// ...
// CommandProcessorSubscriberConstructorFn কমান্ড হ্যান্ডলারের জন্য সাবস্ক্রাইবার তৈরি করার জন্য ব্যবহৃত হয়।
// এটি আপনাকে প্রতিটি কমান্ড হ্যান্ডলারের জন্য আলাদা কাস্টম সাবস্ক্রাইবার তৈরি করতে দেয়।
type CommandProcessorSubscriberConstructorFn func(CommandProcessorSubscriberConstructorParams) (message.Subscriber, error)
// ...

সম্পূর্ণ সোর্স কোড: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go

// ...
type CommandProcessorConfig struct {
	// GenerateSubscribeTopic কমান্ড সাবস্ক্রাইব করার জন্য টপিক তৈরি করতে ব্যবহৃত হয়।
	GenerateSubscribeTopic CommandProcessorGenerateSubscribeTopicFn

	// SubscriberConstructor কমান্ড হ্যান্ডলারের জন্য সাবস্ক্রাইবার তৈরি করতে ব্যবহৃত হয
#### কমান্ড প্রসেসর

সম্পূর্ণ সোর্স কোড: [github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go](https://github.com/ThreeDotsLabs/watermill/tree/master/components/cqrs/command_handler.go#L7)

```go
// ...
// CommandHandler নতুন কমান্ড দ্বারা সংজ্ঞায়িত পেয়ে এবং তা Handle মেথড ব্যবহার করে হ্যান্ডেল করে।
// DDD ব্যবহার করি, CommandHandler জড়নেকে পরিবর্তন করতে পারে এবং সংরক্ষণ করতে পারে।
//
// EventHandler এর বিরুদ্ধে, প্রতিটি কমান্ডে শুধুমাত্র একটি CommandHandler থাকতে পারে।
//
// বার্তা হ্যান্ডলিং প্রক্রিয়ায়, CommandHandler এর একটি অনুলিপিটি ব্যবহার করুন।
// যখন একাধিক কমান্ড একত্রিত হন, তখন হ্যান্ডল মেথড অনুমোদিত হতে পারে বহুবার এবং একই সাথে ক্রিয়া করা হতে পারে।
// তাই, হ্যান্ডল মেথডটি থ্রেড-সেফ হওয়ার প্রয়োজন!

type CommandHandler interface {
// ...

কমান্ড এবং ইভেন্ট মার্শালার

সম্পূর্ণ সোর্স কোড: github.com/ThreeDotsLabs/watermill/components/cqrs/marshaler.go

// ...
// CommandEventMarshaler কমান্ড এবং ইভেন্টগুলি কে Watermill ম্যাসেজে মার্শাল করে এবং বিপরীত করে।
// কমান্ডের পেলোড []bytes হতে হবে।
type CommandEventMarshaler interface {
	// Marshal কমান্ড বা ইভেন্টগুলি Watermill ম্যাসেজে মার্শাল করে।
	Marshal(v interface{}) (*message.Message, error)

	// Unmarshal করে Watermill ম্যাসেজে এ ভি কমান্ড বা ইভেন্ট ডিকোড করে।
	Unmarshal(msg *message.Message, v interface{}) (err error)

	// নাম কমান্ড বা ইভেন্টের নাম ফির্তা করে।
	// নামটি ব্যবহার করতে পারি যদি প্রেরিত কমান্ড বা ইভেন্টটি আমরা প্রসেস করতে চাই।
	Name(v interface{}) string

	// ম্যাসেজ থেকে কমান্ড বা ইভেন্টের নাম ফির্তা করে (মার্শাল করা এর মাধ্যমে উত্পন্ন).
	//
	// যখন আমাদের কমান্ড বা ইভেন্টগুলি Watermill ম্যাসেজে মার্শাল করা হয়, তখন অনাবশ্যক ডিকোডিং থেকে বাঁচার জন্য NameFromMessage ব্যবহার করতে হবে।
	NameFromMessage(msg *message.Message) string
}
// ...

ব্যবহার

উদাহরণ ডোমেইন

একটি সাধারণ ডোমেইন ব্যবস্থাপনা করার জন্য একটি সাধারণ ডোমেইন ব্যবহার করা হয় যা একটি হোটেলে কক্ষ রাখার জন্য দায়িত্বশীল।

আমরা ইভেন্ট স্টর্মিং প্রতীকগুলি ব্যবহার করব এই ডোমেইনের মডেল সংক্রান্ত সাধারণ প্রদর্শিত করার জন্য।

প্রতীক প্রস্তুতি:

  • নীল স্টিকি নোট হল কমান্ড
  • কমলা স্টিকি নোট হল ইভেন্ট
  • সবুজ স্টিকি নোট হল ইভেন্টগুলি হতে অসংখ্য বিনামূল্যে জেনারেট করা হয়
  • গোলাপী স্টিকি নোট হল নীতি যা ইভেন্টগুলি দ্বারা প্রায়োজনীয় হোটে করে এবং কমান্ড জেনারেট করে
  • গুলাবি স্টিকি নোট তত্ত্বির জাদুঘর; আমরা সুযোগ করছি যে আয়-নাইতি তা অনেকবার সমস্যাগুলিতে পড়ে

সিকিউআরএস ইভেন্ট স্টর্মিং

ডোমেনটি সাধারণ:

  • গ্রাহকরা কক্ষ বুক করতে পারে।
  • যখনেই কোনও কক্ষ বুক করা হয়, আমরা গ্রাহকের জন্য বোতল বিয়ার অর্ডার করি (কারণ আমরা আমাদের অতিথিগণকে ভালোবাসি)।
    • আমরা জানি কিছুসময় বিয়ার শেষ হয়
  • আমরা বুকিং এর উপর ভিত্তি করে একটি আর্থিক রিপোর্ট তৈরি করি।

কমান্ড প্রেরণ

প্রথমে, আমাদের গ্রাহকের প্রক্রিয়া সমূহ প্রস্তুত করতে হবে।

সম্পূর্ণ সোর্স কোড: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf

// ...
		bookRoomCmd := &BookRoom{
			RoomId:    fmt.Sprintf("%d", i),
			GuestName: "John",
			StartDate: startDate,
			EndDate:   endDate,
		}
		if err := commandBus.Send(context.Background(), bookRoomCmd); err != nil {
			panic(err)
		}
// ...

কমান্ড হ্যান্ডলার

BookRoomHandler আমাদের কমান্ড হ্যান্ডল করবে।

সম্পূর্ণ সোর্স কোড: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// BookRoomHandler একটি কমান্ড হ্যান্ডলার যা BookRoom কমান্ড প্রসেস করে এবং RoomBooked ইভেন্ট প্রসারিত করে।
//
// CQRS-এ, একটি কমান্ডটি একটি হ্যান্ডলার দ্বারা প্রসেস করা আবশ্যক।
// যেমন যখন আরেকটি হ্যান্ডলার যোগ করা হবে এই কমান্ডটি প্রসেস করার জন্য, একটি ত্রুটি রিটার্ন হবে।
type BookRoomHandler struct {
	eventBus *cqrs.EventBus
}

func (b BookRoomHandler) HandlerName() string {
	return "BookRoomHandler"
}

// NewCommand রিটার্ন করে এই হ্যান্ডলারে কোন ধরনের কমান্ডটি প্রসেস করা উচিত, এটি একটি পয়েন্টার হতে হবে।
func (b BookRoomHandler) NewCommand() interface{} {
	return &BookRoom{}
}

func (b BookRoomHandler) Handle(ctx context.Context, c interface{}) error {
	// c হলো সবসময় `NewCommand` দ্বারা রিটার্ন হওয়া টাইপ, তাই টাইপ এসারশন সবসময় নিরাপদ
	cmd := c.(*BookRoom)

	// কিছু বিশেষ্য মূল্যায়ন, যা বাস্তব প্রোডাকশনে একটি আরও বোধগম্য উপায়ে গণনা করা হতে পারে
	price := (rand.Int63n(40) + 1) * 10

	log.Printf(
		"%s নিয়ে বুক করা হয়েছে, %{থেকে %{পর্যন্ত",
		cmd.RoomId,
		cmd.GuestName,
		time.Unix(cmd.StartDate.Seconds, int64(cmd.StartDate.Nanos)),
		time.Unix(cmd.EndDate.Seconds, int64(cmd.EndDate.Nanos)),
	)

	// RoomBooked ইভেন্ট হ্যান্ডেলার দ্বারা হ্যান্ডল করা হবে,
	// এবং ভবিষ্যতে, RoomBooked ইভেন্টটি একাধিক ইভেন্ট হ্যান্ডেলার দ্বারা হ্যান্ডেল করা যেতে পারে
	if err := b.eventBus.Publish(ctx, &RoomBooked{
		ReservationId: watermill.NewUUID(),
		RoomId:        cmd.RoomId,
		GuestName:     cmd.GuestName,
		Price:         price,
		StartDate:     cmd.StartDate,
		EndDate:       cmd.EndDate,
	}); err != nil {
		return err
	}

	return nil
}

// OrderBeerOnRoomBooked হলো একটি ইভেন্ট হ্যান্ডলার যা RoomBooked ইভেন্ট প্রসেস করে এবং OrderBeer কমান্ড প্রসারিত করে।
// ...

ইভেন্ট হ্যান্ডলার

আমাদের পূর্বে উল্লিখিত মত, আমরা প্রতিটি রুম বুক করার সময় (*) পোঁটাবাড়ি অর্ডার করতে চাই। আমরা এটা OrderBeer কমান্ড ব্যবহার করে এটি অর্জন করি।

সম্পূর্ণ সোর্স কোড: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// OrderBeerOnRoomBooked হলো একটি ইভেন্ট হ্যান্ডলার যা RoomBooked ইভেন্ট প্রসেস করে এবং OrderBeer কমান্ড প্রসারিত করে।
type OrderBeerOnRoomBooked struct {
	commandBus *cqrs.CommandBus
}

func (o OrderBeerOnRoomBooked) HandlerName() string {
	// এই নামটি EventsSubscriberConstructor এ পাঠানো হয় যাতে কিউ নাম জেনারেট করা যায়
	return "OrderBeerOnRoomBooked"
}

func (OrderBeerOnRoomBooked) NewEvent() interface{} {
	return &RoomBooked{}
}

func (o OrderBeerOnRoomBooked) Handle(ctx context.Context, e interface{}) error {
	event := e.(*RoomBooked)

	orderBeerCmd := &OrderBeer{
		RoomId: event.RoomId,
		Count:  rand.Int63n(10) + 1,
	}

	return o.commandBus.Send(ctx, orderBeerCmd)
}

// OrderBeerHandler একটি কমান্ড হ্যান্ডলার যা OrderBeer কমান্ডটি প্রসেস করে এবং BeerOrdered ইভেন্ট প্রসারিত করে।
// ...

OrderBeerHandler খুব প্রায় BookRoomHandler এর মত, এর একমাত্র পার্থক্য হল এটি কখনও অধিক্ষেপ বাটে না থাকা সময় কিছু ত্রুটি রিটার্ন করে, যেটা কারণে কমান্ডটি পুনরায় ইস্যু হয়। পূর্ণাঙ্গ অনুসন্ধান এবং পারিপ্রেক্ষ্যএ বর্ণিত সমাপ্তি সম্পূর্ণ নিয়ে বিস্তারিত বৈধানিক বর্ণনাটি উদাহরণ সোর্স কোড থেকে পাওয়া যাবে।

ইভেন্ট হ্যান্ডলার গ্রুপ

ডিফল্টভাবে, প্রতি ইভেন্ট হ্যান্ডলারের একটি আলাদা সাবস্ক্রাইবার ইনস্ট্যান্স থাকে। যদি শুধুমাত্র একটি ইভেন্ট টাইপ টপিকে পাঠানো হয়, তাহলে এই অ্যাপ্রোচটি ঠিকভাবে কাজ করে।

তবে, টপিকে বহু ধরনের ইভেন্ট থাকা থাকলে, এই ডুইটা অপশন রয়েছে:

  1. আপনি EventConfig.AckOnUnknownEvent কে true সেট করতে পারেন - এটি সব ইভেন্টকে অ্যাকনলেজ করবে যা হ্যান্ডলার দ্বারা হ্যান্ডেল করা হয়নি।
  2. আপনি ইভেন্ট হ্যান্ডলার গ্রুপ ম্যাকানিজম ব্যবহার করতে পারেন।

ইভেন্ট গ্রুপ ব্যবহার করতে, আপনাকে EventConfig এ অপশন গুলি সেট করতে হবে - GenerateHandlerGroupSubscribeTopic এবং GroupSubscriberConstructor

তারপর, আপনি ইভেন্ট প্রসেসরে AddHandlersGroup ব্যবহার করতে পারেন।

সম্পূর্ণ উত্স কোড: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
	err = eventProcessor.AddHandlersGroup(
		"events",
		OrderBeerOnRoomBooked{commandBus},

		NewBookingsFinancialReport(),

		cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
			logger.Info("বিয়ার অর্ডার করা হয়েছে", watermill.LogFields{
				"room_id": event.RoomId,
			})
			return nil
		}),
	)
	if err != nil {
// ...

GenerateHandlerGroupSubscribeTopicGroupSubscriberConstructor উভয়েই গ্রুপের নামের তথ্য ফাংশন প্যারামিটার হিসাবে গ্রহণ করে।

জেনেরিক হ্যান্ডলারস

Watermill v1.3 থেকে, জেনেরিক হ্যান্ডলারস ব্যবহার করা যাবে কমান্ড এবং ইভেন্টগুলি হ্যান্ডল করার জন্য। এটা খুব উপকারী যখন একটি বড় সংখ্যক কমান্ড/ইভেন্ট থাকে এবং আপনি প্রতিটির জন্য একটি হ্যান্ডলার তৈরি করতে চান না।

সম্পূর্ণ উত্স কোড: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
		cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
			logger.Info("বিয়ার অর্ডার করা হয়েছে", watermill.LogFields{
				"room_id": event.RoomId,
			})
			return nil
		}),
// ...

পিছনের দিকে, এটি একটি EventHandler বা CommandHandler ইমপ্লিমেন্টেশন তৈরি করে। এটি সমস্ত প্রকারের হ্যান্ডলারের জন্য উপযুক্ত।

সম্পূর্ণ উত্স কোড: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go

// ...
// NewCommandHandler ফাংশন প্যারামিটার থেকে এবং ফাংশন প্যারামিটার থেকে ইনফার করা দ্বারা একটি নতুন CommandHandler ইমপ্লিমেন্টেশন তৈরি করে।
func NewCommandHandler[Command any](
// ...

সম্পূর্ণ উত্স কোড: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go

// ...
// NewEventHandler ফাংশন প্যারামিটার থেকে এবং ফাংশন প্যারামিটার থেকে ইনফার করা দ্বারা একটি নতুন EventHandler ইমপ্লিমেন্টেশন তৈরি করে।
func NewEventHandler[T any](
// ...

সম্পূর্ণ উত্স কোড: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go

// ...
// NewGroupEventHandler ফাংশন প্যারামিটার থেকে এবং ফাংশন প্যারামিটার থেকে ইনফার করা দ্বারা একটি নতুন GroupEventHandler ইমপ্লিমেন্টেশন তৈরি করে।
func NewGroupEventHandler[T any](handleFunc func(ctx context.Context, event *T) error) GroupEventHandler {
// ...

ইভেন্ট হ্যান্ডলার ব্যবহার করে একটি রিড মডেল তৈরি করা

সম্পূর্ণ সোর্স কোড: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// BookingsFinancialReport হল একটি রিড মডেল যা গণনা করে যেমন আমরা বুকিং থেকে কত টাকা আয় করতে পারি।
// এটি RoomBooked ইভেন্ট এর জন্য শুনে।
//
// এই সংস্করণে এটি শুধুমাত্র মেমোরি তে লিখে। উৎপাদন পরিবেশে, আপনি স্থায়ী সংরক্ষণের কিছু ফর্ম ব্যবহার করতে পারেন।
type BookingsFinancialReport struct {
	handledBookings map[string]struct{}
	totalCharge     int64
	lock            sync.Mutex
}

func NewBookingsFinancialReport() *BookingsFinancialReport {
	return &BookingsFinancialReport{handledBookings: map[string]struct{}{}}
}

func (b BookingsFinancialReport) HandlerName() string {
	// এই নামটি EventsSubscriberConstructor এ পাঠানো হয় এবং ইউজ করা হয় কিউ নাম তৈরি করার জন্য
	return "BookingsFinancialReport"
}

func (BookingsFinancialReport) NewEvent() interface{} {
	return &RoomBooked{}
}

func (b *BookingsFinancialReport) Handle(ctx context.Context, e interface{}) error {
	// হ্যান্ডলটি সময়সীমা দিয়ে কল করা হতে পারে, তাই থ্রেড নিরাপত্তা প্রয়োজন।
	b.lock.Lock()
	defer b.lock.Unlock()

	event := e.(*RoomBooked)

	// একটি যেখানে পাব / সাব ব্যবহার করা হয় যা পূর্ণ ভাবে প্রদান করে না গন্তব্য-একবারে বার্তা ডিডিউপ্লিকেট.
	// GoChannel Pub / Sub সর্বনিম্ন অনুমতি সরবরাহ করে,
	// তবে আসুন আমরা অন্যান্য Pub / Sub অনুষ্ঠান প্রস্তুত করি এই উদাহরণের জন্য।
	if _, ok := b.handledBookings[event.ReservationId]; ok {
		return nil
	}
	b.handledBookings[event.ReservationId] = struct{}{}

	b.totalCharge += event.Price

	fmt.Printf(">>> কোটির জন্য কক্ষ বুক করা হয়েছে $%d\n", b.totalCharge)
	return nil
}

var amqpAddress = "amqp://guest:guest@rabbitmq:5672/"

func main() {
// ...

সব কিছু সংযোজন করুন

আমাদের পাশেই CQRS অ্যাপ্লিকেশন তৈরি করার জন্য প্রয়োজনীয় সমস্ত উপাদান আছে।

আমরা আমাদের বার্তা ব্রোকার হিসাবে AMQP (RabbitMQ) ব্যবহার করব: AMQP।

CQRS এর নিচে, Watermill's মেসেজ রাউটার ব্যবহার করে। যদি আপনি এটা জানি না এবং কীভাবে এটা কাজ করে তা জানতে চান, তবে আপনাকে getting started গাইড দেখতে হবে। এটি আপনাকে এটি যেসব প্রমাণিত বার্তা নেতিবাচক প্যাটার্ন, প্যাইসন ম্যাসেজ কিউ, রেট লিমিটিং, সম্পর্ক, এবং অন্যান্য সর্বাধিক বার্তা-নিযুক্ত অ্যাপ্লিকেশন ব্যবহৃত উপায় ব্যবহার করার প্রদর্শন করবে। এই সরঞ্জামগুলি ইতিমধ্যে Watermill এ নির্মিত।

আসুন CQRS এ ফিরে যাই। আপনি তো জানেন ইতিমধ্যে, CQRS এর মধ্যে অনেকগুলি উপাদান, যেমন command বা event buses, processors, এবং অন্যান্য।

সম্পূর্ণ সোর্স কোড: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
func main() {
	logger := watermill.NewStdLogger(false, false)
	cqrsMarshaler := cqrs.ProtobufMarshaler{}

	// You can use any Pub/Sub implementation from here: https://watermill.io/pubsubs/
	// Detailed RabbitMQ implementation: https://watermill.io/pubsubs/amqp/
	// Commands will be send to queue, because they need to be consumed once.
	commandsAMQPConfig := amqp.NewDurableQueueConfig(amqpAddress)
	commandsPublisher, err := amqp.NewPublisher(commandsAMQPConfig, logger)
	if err != nil {
		panic(err)
	}
	commandsSubscriber, err := amqp.NewSubscriber(commandsAMQPConfig, logger)
	if err != nil {
		panic(err)
	}

	// Events will be published to PubSub configured Rabbit, because they may be consumed by multiple consumers.
	// (in that case BookingsFinancialReport and OrderBeerOnRoomBooked).
	eventsPublisher, err := amqp.NewPublisher(amqp.NewDurablePubSubConfig(amqpAddress, nil), logger)
	if err != nil {
		panic(err)
	}

	// CQRS is built on messages router. Detailed documentation: https://watermill.io/docs/messages-router/
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// Simple middleware which will recover panics from event or command handlers.
	// More about router middlewares you can find in the documentation:
	// https://watermill.io/docs/messages-router/#middleware
	//
	// List of available middlewares you can find in message/router/middleware.
	router.AddMiddleware(middleware.Recoverer)

	commandBus, err := cqrs.NewCommandBusWithConfig(commandsPublisher, cqrs.CommandBusConfig{
		GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) {
			// we are using queue RabbitMQ config, so we need to have topic per command type
			return params.CommandName, nil
		},
		OnSend: func(params cqrs.CommandBusOnSendParams) error {
			logger.Info("Sending command", watermill.LogFields{
				"command_name": params.CommandName,
			})

			params.Message.Metadata.Set("sent_at", time.Now().String())

			return nil
		},
		Marshaler: cqrsMarshaler,
		Logger:    logger,
	})
	if err != nil {
		panic(err)
	}

	commandProcessor, err := cqrs.NewCommandProcessorWithConfig(
		router,
		cqrs.CommandProcessorConfig{
			GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) {
				// we are using queue RabbitMQ config, so we need to have topic per command type
				return params.CommandName, nil
			},
			SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) {
				// we can reuse subscriber, because all commands have separated topics
				return commandsSubscriber, nil
			},
			OnHandle: func(params cqrs.CommandProcessorOnHandleParams) error {
				start := time.Now()

				err := params.Handler.Handle(params.Message.Context(), params.Command)

				logger.Info("Command handled", watermill.LogFields{
					"command_name": params.CommandName,
					"duration":     time.Since(start),
					"err":          err,
				})

				return err
			},
			Marshaler: cqrsMarshaler,
			Logger:    logger,
		},
	)
	if err != nil {
		panic(err)
	}

	eventBus, err := cqrs.NewEventBusWithConfig(eventsPublisher, cqrs.EventBusConfig{
		GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
			// because we are using PubSub RabbitMQ config, we can use one topic for all events
			return "events", nil

			// we can also use topic per event type
			// return params.EventName, nil
		},

		OnPublish: func(params cqrs.OnEventSendParams) error {
			logger.Info("Publishing event", watermill.LogFields{
				"event_name": params.EventName,
			})

			params.Message.Metadata.Set("published_at", time.Now().String())

			return nil
		},

		Marshaler: cqrsMarshaler,
		Logger:    logger,
	})
	if err != nil {
		panic(err)
	}

	eventProcessor, err := cqrs.NewEventGroupProcessorWithConfig(
		router,
		cqrs.EventGroupProcessorConfig{
			GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) {
				return "events", nil
			},
			SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) {
				config := amqp.NewDurablePubSubConfig(
					amqpAddress,
					amqp.GenerateQueueNameTopicNameWithSuffix(params.EventGroupName),
				)

				return amqp.NewSubscriber(config, logger)
			},

			OnHandle: func(params cqrs.EventGroupProcessorOnHandleParams) error {
				start := time.Now()

				err := params.Handler.Handle(params.Message.Context(), params.Event)

				logger.Info("Event handled", watermill.LogFields{
					"event_name": params.EventName,
					"duration":   time.Since(start),
					"err":        err,
				})

				return err
			},

			Marshaler: cqrsMarshaler,
			Logger:    logger,
		},
	)
	if err != nil {
		panic(err)
	}

	err = commandProcessor.AddHandlers(
		BookRoomHandler{eventBus},
		OrderBeerHandler{eventBus},
	)
	if err != nil {
		panic(err)
	}

	err = eventProcessor.AddHandlersGroup(
		"events",
		OrderBeerOnRoomBooked{commandBus},

		NewBookingsFinancialReport(),

		cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
			logger.Info("Beer ordered", watermill.LogFields{
				"room_id": event.RoomId,
			})
			return nil
		}),
	)
	if err != nil {
		panic(err)
	}

	// publish BookRoom commands every second to simulate incoming traffic
	go publishCommands(commandBus)

	// processors are based on router, so they will work when router will start
	if err := router.Run(context.Background()); err != nil {
		panic(err)
	}
}
// ...

এটা। আমাদের একটি চালনাযোগ্য CQRS অ্যাপ্লিকেশন আছে।