CQRS यांत्रिकी
CQRS का मतलब होता है "कमांड क्वेरी जिम्मेदारी विभाजन"। इसमें कमांड (लिखने के अनुरोध) और क्वेरी (पढ़ने के अनुरोध) की जिम्मेदारी को अलग किया जाता है। लिखने के अनुरोध और पढ़ने के अनुरोध अलग-अलग ऑब्जेक्ट्स द्वारा संभाले जाते हैं।
यह CQRS है। हम डेटा स्टोरेज को भी अलग-अलग कर सकते हैं, जिसमें अलग-अलग पढ़ने और लिखने की स्टोरेज होती है। एक बार जब यह कार्य पूरा हो जाता है, तो अलग-अलग पढ़ने की स्टोरेज को विभिन्न प्रकार के क्वेरी को संभालने या कई बाउंडेड संदर्भों को दायर करने के लिए अनुकूलित किया जा सकता है। यद्यपि अलग-अलग पढ़ने/लिखने की स्टोरेज अक्सर CQRS से संबंधित चर्चा का विषय होती है, लेकिन यह CQRS खुद नहीं है। CQRS केवल कमांड और क्वेरी का पहला विभाजन है।
cqrs
घटक CQRS पैटर्न को लागू करने में सहायक एक व्यावसायिकताओं के ऊपर बनाई गई कुछ उपयोगी मानकीकरण प्रदान करता है, जो Pub/Sub और Router पर आधारित है।
आपको पूरा CQRS को अमल में लाने की आवश्यकता नहीं है। आमतौर पर, केवल घटक का इवेंट हिस्सा ईवेंट-ड्रिवन ऍप्लिकेशन बनाने के लिए प्रयोग किया जाता है।
निर्माण खंड
इवेंट्स
इवेंट्स किसी वाक्योंयिकी होने वाली चीज़ को प्रस्तुत करते हैं। इवेंट्स अपरिवर्तनीय होते हैं।
इवेंट बस
पूर्ण स्रोत कोड: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go
// ...
// इवेंटबस इवेंट हैंडलर्स को इवेंट ट्रांसपोर्ट करता है।
type EventBus struct {
// ...
पूर्ण स्रोत कोड: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go
// ...
type EventBusConfig struct {
// GeneratePublishTopic इवेंट्स को प्रकाशित करने के लिए विषय नाम जनरेट करने के लिए उपयोग किया जाता है।
GeneratePublishTopic GenerateEventPublishTopicFn
// प्रारंभ में ईवेंट को संदेश भेजने से पहले कॉल किया जाता है। यह *message.Message को संशोधित कर सकता है।
//
// यह विकल्प अनिवार्य नहीं है।
OnPublish OnEventSendFn
// मार्शलर इवेंट्स को इंकोडिंग और डिकोडिंग के लिए प्रयोग किया जाता है।
// यह अनिवार्य है।
Marshaler CommandEventMarshaler
// लॉगर इंस्टेंस लॉगिंग के लिए। अगर प्रदान नहीं किया गया है, तो watermill.NopLogger का उपयोग किया जाता है।
Logger watermill.LoggerAdapter
}
func (c *EventBusConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
घटना प्रोसेसर
पूरा कोड: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go
// ...
// EventProcessor का उपयोग इवेंट बस से प्राप्त होने वाली घटनाओं को संभालने वाले EventHandler को निर्धारित करने के लिए किया जाता है।
type EventProcessor struct {
// ...
पूरा कोड: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go
// ...
type EventProcessorConfig struct {
// GenerateSubscribeTopic का उपयोग इवेंटों की सदस्यता लेने के लिए विषय उत्पन्न करने के लिए किया जाता है।
// अगर घटना प्रोसेसर हैंडलर समूहों का उपयोग करता है, तो GenerateSubscribeTopic का उपयोग किया जाता है।
GenerateSubscribeTopic EventProcessorGenerateSubscribeTopicFn
// SubscriberConstructor, EventHandler के लिए एक सदस्य बनाने के लिए किया जाता है।
//
// इस फ़ंक्शन को हर EventHandler इंस्टेंस के लिए एक बार बुलाया जाता है।
// यदि आप एक सदस्य को कई हैंडलर के लिए फिर से उपयोग करना चाहते हैं, तो GroupEventProcessor का उपयोग करें।
SubscriberConstructor EventProcessorSubscriberConstructorFn
// OnHandle, घटना को संभालने से पहले बुलाया जाता है।
// OnHandle, मध्यवर्ती के तरह काम करता है: आप घटना को संभालने से पहले और बाद में अतिरिक्त लॉज़िक डाल सकते हैं।
//
// इसलिए, आपको स्पष्ट रूप से params.Handler.Handle() को घटना को संभालने के लिए बुलाना होगा।
//
// func(params EventProcessorOnHandleParams) (err error) {
// // संभालने से पहले लॉजिक
// // (...)
// err := params.Handler.Handle(params.Message.Context(), params.Event)
//
// // संभालने के बाद लॉजिक
// // (...)
// return err
// }
//
// यह विकल्प अनिवार्य नहीं है।
OnHandle EventProcessorOnHandleFn
// AckOnUnknownEvent, यह निर्धारित करने के लिए का उपयोग किया जाता है कि क्या संदेश को स्वीकृति दी जानी चाहिए जब घटना के लिए कोई परिभाषित हैंडलर नहीं होता है।
AckOnUnknownEvent bool
// Marshaler, घटनाओं को मार्शल और अनमार्शल करने के लिए किया जाता है।
// आवश्यक है।
Marshaler CommandEventMarshaler
// लॉगर इंस्टेंस, लॉगिंग के लिए।
// यदि उपलब्ध नहीं है, तो watermill.NopLogger का उपयोग किया जाएगा।
Logger watermill.LoggerAdapter
// disableRouterAutoAddHandlers, पिछले संगतता बनाए रखने के लिए।
// इस मान को NewEventProcessor का उपयोग करके EventProcessor बनाते समय सेट किया जाएगा। Deprecated: NewEventProcessorWithConfig में स्थानांतरित करें।
disableRouterAutoAddHandlers bool
}
func (c *EventProcessorConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
ईवेंट समूह प्रोसेसर
पूरी स्रोत कोड: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go
// ...
// EventGroupProcessor निर्धारित करता है कि ईवेंट बस से प्राप्त होने वाले ईवेंट प्रोसेसर को कौन संभालना चाहिए।
// EventProcessor की तुलना में, EventGroupProcessor एक ही सब्सक्राइबर इंस्टेंस को साझा करने की अनुमति देता है।
type EventGroupProcessor struct {
// ...
पूरी स्रोत कोड: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go
// ...
type EventGroupProcessorConfig struct {
// GenerateSubscribeTopic ग्रुप ईवेंट प्रोसेसर को सब्सक्राइब करने के लिए टॉपिक जनरेट करने के लिए उपयोग किया जाता है।
// इस विकल्प की आवश्यकता EventProcessor के लिए होती है जब प्रोसेसर समूह का उपयोग किया जाता है।
GenerateSubscribeTopic EventGroupProcessorGenerateSubscribeTopicFn
// SubscriberConstructor GroupEventHandler के लिए एक सब्सक्राइबर बनाने के लिए उपयोग किया जाता है।
// इस फ़ंक्शन को प्रति ईवेंट समूह के लिए एक सब्स्क्रिप्शन बनाने की अनुमति होती है।
// यह बहुत उपयोगी होता है जब हमें आर्दता में ईवेंट संभालना चाहते हैं।
SubscriberConstructor EventGroupProcessorSubscriberConstructorFn
// OnHandle ईवेंट को संभालने से पहले बुलाया जाता है।
// OnHandle मिडलवेयर के रूप में है: आप संभालने से पहले और बाद में अतिरिक्त तर्क इंजेक्ट कर सकते हैं।
//
// इसलिए, आपको स्पष्ट रूप से params.Handler.Handle () को ईवेंट को संभालने के लिए बुलाना होगा।
//
// func(params EventGroupProcessorOnHandleParams) (err error) {
// // संभालने से पहले तारीख
// // (...)
//
// err := params.Handler.Handle(params.Message.Context(), params.Event)
//
// // संभालने के बाद तारीख
// // (...)
//
// return err
// }
//
// इस विकल्प की आवश्यकता नहीं है।
OnHandle EventGroupProcessorOnHandleFn
// अनजाने ईवेंट पर वापसी प्राप्त करने के लिए ब्राउज़ करो
// यह डिफ़ाइंड हैंडलर नहीं है।
// अज्ञात ईवेंट पर स्वीकृत करने के लिए उपयोग किया जाता है।
AckOnUnknownEvent bool
// Marshaler का उपयोग ईवेंटों को एनकोड और डिकोड करने के लिए होता है।
// यह आवश्यक है।
Marshaler CommandEventMarshaler
// लॉगिंग के लिए इस्तेमाल की जाने वाली लॉगर इंस्टेंस।
// अगर प्रदान नहीं किया गया है, तो watermill.NopLogger का उपयोग किया जाएगा।
Logger watermill.LoggerAdapter
}
func (c *EventGroupProcessorConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
ईवेंट समूह प्रोसेसर के बारे में अधिक जानें।
ईवेंट हैंडलर
पूरी स्रोत कोड: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go
// ...
// EventHandler नए ईवेंट द्वारा परे गए ईवेंट प्राप्त करता है और उन्हें अपनी Handle विधि का उपयोग करके संभालता है।
// यदि DDD का उपयोग किया जा रहा है, तो ईवेंट हैंडलर अद्यतन और एकत्र कर सकता है।
// यह संयोजक प्रबंधक, सागा या केवल स्नहान शैली निर्माण कर सकता है।
//
// सदैव कमांड हैंडलर की तरह, प्रत्येक ईवेंट में कई ईवेंट हैंडलर हो सकते हैं।
//
// संदेश संभालन के दौरान, एक ही EventHandler इंस्टेंस का उपयोग करें।
// एक साथ कई ईवेंट्स पार करते समय, Handle विधि को कई बार समय साथ-साथ चलाया जा सकता है।
// इसलिए, Handle विधि को स्तर-सुरक्षित होना चाहिए!
type EventHandler interface {
// ...
कमांड
एक कमांड एक सरल डाटा संरचना है जो किसी कार्रवाई को करने का अनुरोध दर्शाती है।
कमांड बस
पूर्ण स्रोत कोड: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go
// ...
// कमांडबस कंपोनेंट है जो कमांड्स को कमांड हैंडलर्स को पहुंचाता है।
type कमांडबस struct {
// ...
पूर्ण स्रोत कोड: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go
// ...
type कमांडबसकॉन्फ़िग struct {
// GeneratePublishTopic कमांड्स को प्रकाशित करने के लिए विषय बनाने के लिए उपयोग किया जाता है।
GeneratePublishTopic कमांडबसजनरेटपब्लिशटॉपिकFn
// OnSend कमांड को प्रकाशित करने से पहले बुलाया जाता है।
// *मैसेज.मैसेज को संशोधित किया जा सकता है।
//
// यह विकल्प अनिवार्य नहीं है।
OnSend कमांडबसऑनसेंडFn
// मार्शलर कमांड्स को serialize और deserialize करने के लिए उपयोग किया जाता है।
// आवश्यक है।
Marshaler कमांडइवेंटमार्शलर
// लॉगर इंस्टेंस जो लॉगिंग के लिए उपयोग किया जाता है।
// अगर नहीं प्रादान किया गया है, तो watermill.NopLogger का उपयोग किया जाएगा।
Logger watermill.LoggerAdapter
}
func (c *कमांडबसकॉन्फ़िग) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
कमांड प्रोसेसर
पूर्ण स्रोत कोड: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go
// ...
// कमांडप्रोसेसरसबस्क्राइबरकंस्ट्रक्टरFn कमांडहैंडलर के लिए सबस्क्राइबर बनाने के लिए उपयोग किया जाता है।
// इसे आपको प्रत्येक कमांड हैंडलर के लिए विशेष कस्टम सबस्क्राइबर बनाने की अनुमति देता है।
type कमांडप्रोसेसरसबस्क्राइबरकंस्ट्रक्टरFn func(कमांडप्रोसेसरसबस्क्राइबरकंस्ट्रक्टरपैराम्स) (message.Subscriber, error)
// ...
पूर्ण स्रोत कोड: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go
// ...
type कमांडप्रोसेसरकॉन्फ़िग struct {
// GenerateSubscribeTopic कमांड्स को सब्स्क्राइब करने के लिए विषय बनाने के लिए उपयोग किया जाता है।
GenerateSubscribeTopic कमांडप्रोसेसरजेनरेटसब्स्क्राइबटॉपिकFn
// सबस्क्राइबरकंस्ट्रक्टर कमांडहैंडलर के लिए सबस्क्राइबर बनाने के लिए उपयोग किया जाता है।
SubscriberConstructor कमांडप्रोसेसरसबस्क्राइबरकंस्ट्रक्टरFn
// OnHandle कमांड को हैंडल करने से पहले बुलाया जाता है।
// OnHandle का काम मिडलवेयर की तरह होता है: आप कमांड को हैंडल करने से पहले और बाद में अतिरिक्त लॉजिक इंजेक्शन कर सकते हैं।
//
// इसके कारण, आपको params.Handler.Handle() को स्पष्ट रूप से कमांड को हैंडल करने के लिए कहना होगा।
// func(params CommandProcessorOnHandleParams) (err error) {
// // हैंडल करने से पहले लॉजिक
// // (...)
//
// err := params.Handler.Handle(params.Command.Context(), params.Command)
//
// // हैंडल करने के बाद लॉजिक
// // (...)
//
// return err
// }
//
// यह विकल्प अनिवार्य नहीं है।
OnHandle कमांडप्रोसेसरऑनहैंडलFn
// मार्शलर कमांड्स को serialize और deserialize करने के लिए उपयोग किया जाता है।
// आवश्यक है।
Marshaler कमांडइवेंटमार्शलर
// लॉगर इंस्टेंस लॉगिंग के लिए।
// अगर प्रादान नहीं किया गया है, तो watermill.NopLogger का उपयोग किया जाएगा।
Logger watermill.LoggerAdapter
// अगर सही है, तो कमांडप्रोसेसर संदेशों को अनुमति देगा भले ही कमांडहैंडलर एक त्रुटि लौटाए।
// यदि RequestReplyBackend null नहीं है और जवाब भेज ना सके, तो संदेश फिर भी nacked हो जाएगा।
//
// चेतावनी: जब आप requestreply प्रावधान (requestreply.NewCommandHandler या requestreply.NewCommandHandlerWithResult) का उपयोग कर रहे हैं,
// तो इसे अनुशंसित नहीं किया जाता है, क्योंकि यह संदेश लौटाने में त्रुटि होने पर कमांड को एक कर देगा।
//
// requestreply का उपयोग करते समय, आपको requestreply.PubSubBackendConfig.AckCommandErrors का उपयोग करना चाहिए।
AckCommandHandlingErrors सही
// disableRouterAutoAddHandlers पिछड़ी संगतता के लिए उपयोग किया जाता है।
// जब NewCommandProcessor के साथ एक कमांडप्रोसेसर बनाया जाता है।
// Deprecated: कृपया NewCommandProcessorWithConfig में माइग्रेशन करें।
disableRouterAutoAddHandlers सही
}
func (c *कमांडप्रोसेसरकॉन्फ़िग) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
कमांड प्रोसेसर
पूर्ण स्रोत कोड: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go
// ...
// CommandHandler NewCommand द्वारा परिभाषित कमांड प्राप्त करता है और इसे Handle मेथड का उपयोग करके संपादित करता है।
// DDD का उपयोग करते हुए, CommandHandler को एकल में परिवर्तित और दृढ़ करने के लिए कमांड प्राप्त करता है।
//
// EventHandler की तरह, प्रत्येक कमांड के पास केवल एक CommandHandler हो सकता है।
//
// संदेश संसाधन के दौरान, CommandHandler का एक उदाहरण उपयोग करें।
// जब एक साथ कई कमांड पहुंचाए जाते हैं, तो Handle मेथड को कई बार समवर्ण रूप से क्रियान्वित किया जा सकता है।
// इसलिए, Handle मेथड को समवर्ण सुरक्षित होने की आवश्यकता है!
type CommandHandler interface {
// ...
कमांड और ईवेंट मार्शलर
पूर्ण स्रोत कोड: github.com/ThreeDotsLabs/watermill/components/cqrs/marshaler.go
// ...
// CommandEventMarshaler कमांड और ईवेंट को Watermill संदेश में मार्शल करता है, और उल्टा करता है।
// कमांड के पेलोड को []bytes में मार्शल करना चाहिए।
type CommandEventMarshaler interface {
// Marshal कमांड या ईवेंट को एक Watermill संदेश में मार्शल करता है।
Marshal(v interface{}) (*message.Message, error)
// Unmarshal वॉटरमिल संदेश को v कमांड या ईवेंट में डिकोड करता है।
Unmarshal(msg *message.Message, v interface{}) (err error)
// Name कमांड या ईवेंट का नाम वापस करता है।
// नाम का उपयोग करके हम यह निर्धारित कर सकते हैं कि प्राप्त कमांड या ईवेंट हमारे द्वारा प्रसंस्करण करना चाहतें हैं या नहीं।
Name(v interface{}) string
// NameFromMessage वॉटरमिल संदेश से (मार्शल के द्वारा उत्पन्न) कमांड या ईवेंट का नाम वापस करता है।
//
// जब हम कमांड या ईवेंट को वॉटरमिल संदेश में मार्शल करते हैं, तो हमें अनावश्यक डिकोडिंग से बचने के लिए Name से बजाय NameFromMessage का उपयोग करना चाहिए।
NameFromMessage(msg *message.Message) string
}
// ...
उपयोग
उदाहरण डोमेन
एक सरल डोमेन का उपयोग करके जो एक होटल में कमरे आरक्षण को संभालने के लिए जिम्मेदार है।
हम इवेंट स्टॉर्मिंग प्रतीकों का उपयोग करेंगे जो इस डोमेन के मॉडल का प्रदर्शन करते हैं।
प्रतीक लीजेंड:
- नीले स्टिकी नोट्स कमांड होते हैं
- नारंगी स्टिकी नोट्स ईवेंट्स होते हैं
- हरा स्टिकी नोट्स इवेंट्स से असमवादित रीड मॉडल्स जो उत्तरदायी होते हैं
- बैंगनी स्टिकी नोट्स विशेष घटनाओं होते हैं; हम वहाँ क्षेत्रों को चिह्नित करते हैं जो अक्सर समस्याओं का सामना करते हैं
डोमेन सरल है:
- ग्राहक कमरे बुक कर सकते हैं।
-
जब भी कोई कमरा बुक होता है, हम ग्राहक के लिए एक बोतल बीयर ऑर्डर करते हैं (क्योंकि हमारे मेहमानों से हम प्यार करते हैं)।
- हम जानते हैं कि कभी-कभी बीयर ख़त्म हो जाती है।
- हम बुकिंग के आधार पर एक वित्तीय रिपोर्ट जनरेट करते हैं।
कमांड भेजना
सबसे पहले, हमें ग्राहक के क्रियाओं की वास्तविकता बनानी होगी।
पूर्ण स्रोत कोड: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf
// ...
bookRoomCmd := &BookRoom{
RoomId: fmt.Sprintf("%d", i),
GuestName: "जॉन",
StartDate: startDate,
EndDate: endDate,
}
if err := commandBus.Send(context.Background(), bookRoomCmd); err != nil {
panic(err)
}
// ...
कमांड हैंडलर
BookRoomHandler
हमारे कमांड को संभालेगा।
पूरा स्रोत कोड: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// BookRoomHandler एक कमांड हैंडलर है जो BookRoom कमांड को प्रोसेस करता है और RoomBooked इवेंट को निकालता है।
//
// CQRS में, एक कमांड को हैंडल करना जरुरी है।
// जब इस कमांड को हैंडल करने के लिए एक और हैंडलर जोड़ा जाएगा, तो एक त्रुटि वापस कर दी जाएगी।
type BookRoomHandler struct {
eventBus *cqrs.EventBus
}
func (b BookRoomHandler) HandlerName() string {
return "BookRoomHandler"
}
// NewCommand उन्हें कमांड की प्रकार लौटाता है जिसे यह हैंडलर प्रोसेस करना चाहिए। यह एक पॉइंटर होना चाहिए।
func (b BookRoomHandler) NewCommand() interface{} {
return &BookRoom{}
}
func (b BookRoomHandler) Handle(ctx context.Context, c interface{}) error {
// c हमेशा `NewCommand` द्वारा लौटाई जाने वाली प्रकार होती है, इसलिए प्रकार की असरदारता हमेशा सुरक्षित रहती है
cmd := c.(*BookRoom)
// कुछ यादृच्छिक मूल्य जो वास्तविक उत्पादन में एक अधिक समझदार तरीके में हो सकता है, उसे जोड़ा जा सकता है
मूल्य := (rand.Int63n(40) + 1) * 10
log.Printf(
"Booked %s, from %s to %s",
cmd.RoomId,
cmd.GuestName,
time.Unix(cmd.StartDate.Seconds, int64(cmd.StartDate.Nanos)),
time.Unix(cmd.EndDate.Seconds, int64(cmd.EndDate.Nanos)),
)
// RoomBooked को OrderBeerOnRoomBooked ईवेंट हैंडलर द्वारा हैंडल किया जाएगा,
// और भविष्य में, RoomBooked को कई ईवेंट हैंडलर द्वारा हैंडल किया जा सकता है
if err := b.eventBus.Publish(ctx, &RoomBooked{
रिज़र्वेशनId: watermill.NewUUID(),
RoomId: cmd.RoomId,
GuestName: cmd.GuestName,
मूल्य: मूल्य,
StartDate: cmd.StartDate,
EndDate: cmd.EndDate,
}); err != nil {
return err
}
return nil
}
// OrderBeerOnRoomBooked वह ईवेंट हैंडलर है जो RoomBooked ईवेंट को प्रोसेस करता है और OrderBeer कमांड निकालता है।
// ...
ईवेंट हैंडलर
पहले ही कहा गया है, हमें हर बार जब एक कमरा बुक होता है ("जब कमरा बुक हो जाता है" हेतु लेबल के साथ), हमें बीयर की बोतल ऑर्डर करनी है। हम OrderBeer
कमांड का उपयोग करके इसे प्राप्त करते हैं।
पूरा स्रोत कोड: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// OrderBeerOnRoomBooked वह ईवेंट हैंडलर है जो RoomBooked ईवेंट को प्रोसेस करता है और OrderBeer कमांड निकालता है।
type OrderBeerOnRoomBooked struct {
commandBus *cqrs.CommandBus
}
func (o OrderBeerOnRoomBooked) HandlerName() string {
// यह नाम ईवेंट्स सब्सक्राइबर कंस्ट्रक्टर को कत्ता निर्माण के लिए पारित किया जाता है
return "OrderBeerOnRoomBooked"
}
func (OrderBeerOnRoomBooked) NewEvent() interface{} {
return &RoomBooked{}
}
func (o OrderBeerOnRoomBooked) Handle(ctx context.Context, e interface{}) error {
event := e.(*RoomBooked)
orderBeerCmd := &OrderBeer{
RoomId: event.RoomId,
Count: rand.Int63n(10) + 1,
}
return o.commandBus.Send(ctx, orderBeerCmd)
}
// OrderBeerHandler बहुत ही समान है BookRoomHandler के साथ। यहा का अंतर है कि यह कभी-कभी अंथर बीर न होने पर एक त्रुटि वापस करता है, जिससे कमांड दोबारा जारी किया जाए। आप [उदाहरण स्रोत कोड](https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/basic/5-cqrs-protobuf/?utm_source=cqrs_doc) में पूरा अनुमानन देख सकते हैं।
ईवेंट हैंडलर समूह
डिफ़ॉल्ट रूप से, प्रत्येक इवेंट हैंडलर के लिए एक अलग सब्सक्राइबर इंस्टेंस होता है। यह उपाय तब तक ठीक है जब तक केवल एक इवेंट प्रकार केवल टॉपिक को भेजा जाता है।
टॉपिक पर एक से अधिक इवेंट प्रकार के मामले में, दो विकल्प होते हैं:
- आप
EventConfig.AckOnUnknownEvent
कोtrue
सेट कर सकते हैं - इससे हैंडलर द्वारा संभाले न गए सभी इवेंट को स्वीकृत किया जाएगा। - आप इवेंट हैंडलर समूह तंत्र का उपयोग कर सकते हैं।
इवेंट समूह का उपयोग करने के लिए, आपको EventConfig
में GenerateHandlerGroupSubscribeTopic
और GroupSubscriberConstructor
विकल्पों को सेट करना होगा।
फिर, आप EventProcessor
पर AddHandlersGroup
का उपयोग कर सकते हैं।
पूरा स्रोत कोड: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
err = eventProcessor.AddHandlersGroup(
"events",
OrderBeerOnRoomBooked{commandBus},
NewBookingsFinancialReport(),
cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
logger.Info("बीयर ऑर्डर किया गया", watermill.LogFields{
"room_id": event.RoomId,
})
return nil
}),
)
if err != nil {
// ...
GenerateHandlerGroupSubscribeTopic
और GroupSubscriberConstructor
दोनों समूह के नाम के बारे में जानकारी प्राप्त करते हैं फंक्शन पैरामीटर के रूप में।
सामान्य हैंडलर
Watermill v1.3 से शुरू होकर, सामान्य हैंडलर का उपयोग कमांड और इवेंट को हैंडल करने के लिए किया जा सकता है। यह बहुत उपयोगी होता है जब आपके पास कई कमांड/इवेंट होते हैं और आप नहीं चाहते कि हर एक के लिए एक अलग हैंडलर बनाना पड़े।
पूरा स्रोत कोड: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
logger.Info("बीयर ऑर्डर किया गया", watermill.LogFields{
"room_id": event.RoomId,
})
return nil
}),
// ...
इसके पीछे, यह एक EventHandler या CommandHandler का निर्माण करता है। यह सभी प्रकार के हैंडलरों के लिए उपयुक्त है।
पूरा स्रोत कोड: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go
// ...
// NewCommandHandler एक नए CommandHandler का निर्माण करता है जो प्रदत्त फ़ंक्शन और फ़ंक्शन पैरामीटर से अनुमानित कमांड प्रकार पर आधारित होता है।
func NewCommandHandler[Command any](
// ...
पूरा स्रोत कोड: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go
// ...
// NewEventHandler एक नए EventHandler का निर्माण करता है जो प्रदत्त फ़ंक्शन और फ़ंक्शन पैरामीटर से अनुमानित इवेंट प्रकार पर आधारित होता है।
func NewEventHandler[T any](
// ...
पूरा स्रोत कोड: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go
// ...
// NewGroupEventHandler प्रदत्त फ़ंक्शन और फ़ंक्शन पैरामीटर से अनुमानित इवेंट प्रकार पर आधारित एक नए GroupEventHandler का निर्माण करता है।
func NewGroupEventHandler[T any](handleFunc func(ctx context.Context, event *T) error) GroupEventHandler {
// ...
घटना हैंडलर का उपयोग करके एक पठन मॉडल तैयार करना
पूर्ण स्रोत कोड: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// BookingsFinancialReport एक पठन मॉडल है जो गणना करता है कि हम बुकिंग से कितना धन कमा सकते हैं।
// जब वे होते हैं, तो वह RoomBooked घटनाओं के लिए सुनती है।
//
// इस अनुमानन की बस मेमोरी में लिखा जाता है। एक उत्पादन वातावरण में, आपको किसी प्रकार के स्थायी संग्रह का उपयोग कर सकतें हैं।
type BookingsFinancialReport struct {
handledBookings map[string]struct{}
totalCharge int64
lock sync.Mutex
}
func NewBookingsFinancialReport() *BookingsFinancialReport {
return &BookingsFinancialReport{handledBookings: map[string]struct{}{}}
}
func (b BookingsFinancialReport) HandlerName() string {
// इस नाम को EventsSubscriberConstructor को पारित किया जाता है और कत्तिग नाम उत्पन्न करने के लिए उपयोग किया जाता है
return "BookingsFinancialReport"
}
func (BookingsFinancialReport) NewEvent() interface{} {
return &RoomBooked{}
}
func (b *BookingsFinancialReport) Handle(ctx context.Context, e interface{}) error {
// हैंडल को साथ में कॉल किया जा सकता है, इसलिए स्थिति सुरक्षा की आवश्यकता है।
b.lock.Lock()
defer b.lock.Unlock()
event := e.(*RoomBooked)
// यदि बार-बार वितरित करने वाली पब/सब का उपयोग किया जाता है, तो हमें संदेशों को डीड्यूप्लिकेट करने की आवश्यकता होती है।
// GoChannel पब/सब निश्चित-रूप से पहुंचने की सेमांटिक्स प्रदान करता है,
// लेकिन चलाएं इस उदाहरण को दूसरे पब/सब प्रवर्गनुयानों के लिए तैयार करें।
if _, ok := b.handledBookings[event.ReservationId]; ok {
return nil
}
b.handledBookings[event.ReservationId] = struct{}{}
b.totalCharge += event.Price
fmt.Printf(">>> $%d के लिए कमरा बुक हुआ\n", b.totalCharge)
return nil
}
var amqpAddress = "amqp://guest:guest@rabbitmq:5672/"
func main() {
// ...
सब कुछ कनेक्ट करें
हमारे पास पहले से ही सभी कॉम्पोनेंट्स मौजूद हैं जो CQRS ऐप्लिकेशन बनाने के लिए आवश्यक हैं।
हम AMQP (RabbitMQ) को हमारा मैसेज ब्रोकर के रूप में उपयोग करेंगे: AMQP.
CQRS के अंतर्निहित रूप से, Watermill का मैसेज राउटर उपयोग करता है। यदि आप इसके बारे में परिचित नहीं हैं और जानना चाहते हैं कि यह कैसे काम करता है, तो आपको गेटिंग स्टार्टेड गाइड देखना चाहिए। यह आपको मेट्रिक्स, पॉइज़न मैसेज कतारें, दर सीमित, सहमति, और हर मैसेज-नियंत्रित ऐप्लिकेशन द्वारा प्रयोग होने वाले अन्य उपकरणों जैसे कि मानक मैसेज पैटर्न का उपयोग कैसे करना है, भी दिखाएगा। ये उपकरण पहले से ही Watermill में शामिल हैं।
चलिए CQRS पर वापस आते हैं। जैसा कि आप पहले ही जानते हैं, CQRS कई कॉम्पोनेंट्स से मिलकर बना होता है जैसे कि कमांड या इवेंट बस, प्रोसेसर, और इत्यादि।
पूरा सोर्स कोड: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
func main() {
logger := watermill.NewStdLogger(false, false)
cqrsMarshaler := cqrs.ProtobufMarshaler{}
// You can use any Pub/Sub implementation from here: https://watermill.io/pubsubs/
// Detailed RabbitMQ implementation: https://watermill.io/pubsubs/amqp/
// Commands will be send to queue, because they need to be consumed once.
commandsAMQPConfig := amqp.NewDurableQueueConfig(amqpAddress)
commandsPublisher, err := amqp.NewPublisher(commandsAMQPConfig, logger)
if err != nil {
panic(err)
}
commandsSubscriber, err := amqp.NewSubscriber(commandsAMQPConfig, logger)
if err != nil {
panic(err)
}
// Events will be published to PubSub configured Rabbit, because they may be consumed by multiple consumers.
// (in that case BookingsFinancialReport and OrderBeerOnRoomBooked).
eventsPublisher, err := amqp.NewPublisher(amqp.NewDurablePubSubConfig(amqpAddress, nil), logger)
if err != nil {
panic(err)
}
// CQRS is built on messages router. Detailed documentation: https://watermill.io/docs/messages-router/
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// Simple middleware which will recover panics from event or command handlers.
// More about router middlewares you can find in the documentation:
// https://watermill.io/docs/messages-router/#middleware
//
// List of available middlewares you can find in message/router/middleware.
router.AddMiddleware(middleware.Recoverer)
commandBus, err := cqrs.NewCommandBusWithConfig(commandsPublisher, cqrs.CommandBusConfig{
GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) {
// we are using queue RabbitMQ config, so we need to have topic per command type
return params.CommandName, nil
},
OnSend: func(params cqrs.CommandBusOnSendParams) error {
logger.Info("Sending command", watermill.LogFields{
"command_name": params.CommandName,
})
params.Message.Metadata.Set("sent_at", time.Now().String())
return nil
},
Marshaler: cqrsMarshaler,
Logger: logger,
})
if err != nil {
panic(err)
}
commandProcessor, err := cqrs.NewCommandProcessorWithConfig(
router,
cqrs.CommandProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) {
// we are using queue RabbitMQ config, so we need to have topic per command type
return params.CommandName, nil
},
SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) {
// we can reuse subscriber, because all commands have separated topics
return commandsSubscriber, nil
},
OnHandle: func(params cqrs.CommandProcessorOnHandleParams) error {
start := time.Now()
err := params.Handler.Handle(params.Message.Context(), params.Command)
logger.Info("Command handled", watermill.LogFields{
"command_name": params.CommandName,
"duration": time.Since(start),
"err": err,
})
return err
},
Marshaler: cqrsMarshaler,
Logger: logger,
},
)
if err != nil {
panic(err)
}
eventBus, err := cqrs.NewEventBusWithConfig(eventsPublisher, cqrs.EventBusConfig{
GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
// because we are using PubSub RabbitMQ config, we can use one topic for all events
return "events", nil
// we can also use topic per event type
// return params.EventName, nil
},
OnPublish: func(params cqrs.OnEventSendParams) error {
logger.Info("Publishing event", watermill.LogFields{
"event_name": params.EventName,
})
params.Message.Metadata.Set("published_at", time.Now().String())
return nil
},
Marshaler: cqrsMarshaler,
Logger: logger,
})
if err != nil {
panic(err)
}
eventProcessor, err := cqrs.NewEventGroupProcessorWithConfig(
router,
cqrs.EventGroupProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) {
return "events", nil
},
SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) {
config := amqp.NewDurablePubSubConfig(
amqpAddress,
amqp.GenerateQueueNameTopicNameWithSuffix(params.EventGroupName),
)
return amqp.NewSubscriber(config, logger)
},
OnHandle: func(params cqrs.EventGroupProcessorOnHandleParams) error {
start := time.Now()
err := params.Handler.Handle(params.Message.Context(), params.Event)
logger.Info("Event handled", watermill.LogFields{
"event_name": params.EventName,
"duration": time.Since(start),
"err": err,
})
return err
},
Marshaler: cqrsMarshaler,
Logger: logger,
},
)
if err != nil {
panic(err)
}
err = commandProcessor.AddHandlers(
BookRoomHandler{eventBus},
OrderBeerHandler{eventBus},
)
if err != nil {
panic(err)
}
err = eventProcessor.AddHandlersGroup(
"events",
OrderBeerOnRoomBooked{commandBus},
NewBookingsFinancialReport(),
cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
logger.Info("Beer ordered", watermill.LogFields{
"room_id": event.RoomId,
})
return nil
}),
)
if err != nil {
panic(err)
}
// publish BookRoom commands every second to simulate incoming traffic
go publishCommands(commandBus)
// processors are based on router, so they will work when router will start
if err := router.Run(context.Background()); err != nil {
panic(err)
}
}
// ...
यही है। हमारे पास एक चलने योग्य CQRS एप्लिकेशन है।