CQRS میکانیزم
CQRS کا مطلب ہے "کمانڈ کوئری ذمہ داری علیحدگی". یہ کمانڈ (لکھیں درخواستیں) اور کوئری (پڑھیں درخواستیں) کی ذمہ داری کو علیحدہ کرتا ہے۔ لکھیں درخواستیں اور پڑھیں درخواستیں مختلف اشیاء دوار ہوتے ہیں۔
یہ CQRS ہے۔ ہم اعداد و شمار کو علیحدہ کر سکتے ہیں، علیحدہ پڑھنے اور لکھنے کو۔ ایک بار یہ کر لیا جائے تو، مختلف قسم کے کئی پڑھنے ایندروں کو ہینڈل کرنے یا بہت زیادہ متعلق حدود کو دہرانے کے لئے بصیرت پوری کرنے کے لئے مثبت پرائیڈ بنیادی سے منسلک ہوں گے۔ علیحدہ پڑھنے/لکھنے کو عموماً CQRS سے متعلق گفتگو کا موضوع بنایا جاتا ہے، لیکن یہ خود CQRS نہیں ہے۔ CQRS صرف کمانڈ اور کوئری کی پہلی علیحدگی ہے۔
cqrs
کمپوننٹ کچھ مفید تجربات فراہم کرتا ہے، جو Pub/Sub اور Router پر تشکیل دیا گیا ہے، تاکہ CQRS پیٹرن کی تنصیب کرنے کی مدد کر سکے۔
آپ کو پورے CQRS کو نہیں پیچھے چلانے کی ضرورت نہیں ہوتی۔ عام طور پر، صرف کمپوننٹ کا واقع حصہ استعمال کیا جاتا ہے تاکہ واقعہ سرگرم ایپلیکیشن بنایا جا سکے۔
تعمیراتی بلاکس
واقعات
واقعات کوئی چیز ہیں جو پہلے ہو چکی ہے۔ واقعات بے تغیر ہوتے ہیں۔
واقعہ باس
کامل سورس کوڈ: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go
// ...
// EventBus واقعات کو واقعات ہینڈلرز تک منتقل کرتا ہے۔
type EventBus struct {
// ...
کامل سورس کوڈ: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go
// ...
type EventBusConfig struct {
// ٹاپک کا نام بھیجنے کے لئے GeneratePublishTopic استعمال ہوتا ہے۔
GeneratePublishTopic GenerateEventPublishTopicFn
// بھیجنے سے پہلے یہ بلوک پیغام *message.Message کو موڈیفائی کر سکتا ہے۔
//
// یہ اختیار ضرو ضروری نہیں ہے۔
OnPublish OnEventSendFn
// مارشلر واقعات کو انکوڈنگ اور ڈیکوڈنگ کے لئے استعمال ہوتا ہے۔
// یہ ضروری ہے۔
Marshaler CommandEventMarshaler
// لاگر کا مثال۔ اگر فراہم نہ کیا گیا ہو، تو watermill.NopLogger استعمال ہوتا ہے۔
Logger watermill.LoggerAdapter
}
func (c *EventBusConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
واقعہ پروسیسر
کامل کوڈ: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go
// ...
// EventProcessor استعمال ہوتا ہے تاکہ تعین کیا جائے کہ واقعہ باس سے موصول ہونے والے واقعات کو کونسا ہینڈلر ہینڈل کرے۔
type EventProcessor struct {
// ...
کامل کوڈ: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go
// ...
type EventProcessorConfig struct {
// GenerateSubscribeTopic سبسکرائب کرنے کے لئے موضوع تخلیق کرنے کے لئے استعمال ہوتا ہے۔
// اگر واقعہ پروسیسر ہینڈلر گروپس استعمال کرتا ہے تو GenerateSubscribeTopic استعمال ہوتا ہے۔
GenerateSubscribeTopic EventProcessorGenerateSubscribeTopicFn
// SubscriberConstructor استعمال ہوتا ہے تاکہ ہینڈلر کے لئے ایک سبسکرائبر تخلیق کیا جائے۔
//
// اس فنکشن کو ہر ہینڈلر کی مثال کے لئے ایک بار بلایا جاتا ہے۔
// اگر آپ چاہتے ہیں کہ ایک سبسکرائبر کو مختلف ہینڈلرز کے لئے دوبارہ استعمال کیا جائے تو GroupEventProcessor استعمال کریں۔
SubscriberConstructor EventProcessorSubscriberConstructorFn
// OnHandle واقعہ ہینڈل کرنے سے پہلے بلایا جاتا ہے۔
// OnHandle میں مشابہ مڈلویئر کی طرح کام کرتا ہے: آپ واقعہ کو ہینڈل کرنے سے پہلے اور بعد میں اضافی منطق شامل کرسکتے ہیں۔
//
// لہذا، آپ کو واضح طور پر params.Handler.Handle() کو بلانا ہوگا تاکہ واقعہ کا آپ برخ کر سکیں۔
//
// func(params EventProcessorOnHandleParams) (err error) {
// // Handling سے پہلے منطق
// // (...)
// err := params.Handler.Handle(params.Message.Context(), params.Event)
//
// // Handling کے بعد منطق
// // (...)
// return err
// }
//
// یہ اختیار ضروری نہیں ہے۔
OnHandle EventProcessorOnHandleFn
// AckOnUnknownEvent استعمال ہوتا ہے تاکہ تصدیق کیا جائے کہ کیا پیغام کو تصدیق کیا جائے جب واقعہ کا ہینڈلر تعین نہ ہو۔
AckOnUnknownEvent bool
// Marshaler واقعات کو مارشل اور انمارشل کرنے کے لئے استعمال ہوتا ہے۔
// ضروری ہے۔
Marshaler CommandEventMarshaler
// تصویر گر براہ راستور کے لئے استعمال ہونے والا لاگر انسٹینس۔
// اگر فراہم نہ کیا گیا ہو تو watermill.NopLogger استعمال کیا جائے گا۔
Logger watermill.LoggerAdapter
// disableRouterAutoAddHandlers پس منظوری کو برقرار رکھنے کے لئے ہے۔
// جب EventProcessor کو NewEventProcessor کا استعمال کرتے ہوئے بنایا جائے گا تو اس قدم کی قدیم کمپیٹیبلیٹی میں استعمال ہوگا۔
// منسلک: NewEventProcessorWithConfig۔
disableRouterAutoAddHandlers bool
}
func (c *EventProcessorConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
واقعہ گروپ پروسیسر
کامل سورس کوڈ: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go
// ...
// EventGroupProcessor واقعہ بس سے موصول واقعات کو ہینڈل کرنے والے کون سا واقعہ پروسیسر ہینڈل کرے گا۔
// EventProcessor کے مقابلے میں، EventGroupProcessor اجازت دیتا ہے کہ متعدد پروسیسرز ایک ہی سبسکرائبر کو شئیر کریں۔
type EventGroupProcessor struct {
// ...
کامل سورس کوڈ: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go
// ...
type EventGroupProcessorConfig struct {
// GenerateSubscribeTopic گروپ واقعہ پروسیسرز کو سبسکرائب کرنے کے لئے موضوع تخلیق کرنے کے لئے استعمال ہوتا ہے۔
// یہ اختیار EventProcessor کے لئے ضروری ہے جب پروسیسر گروپس کا استعمال کیا جائے۔
GenerateSubscribeTopic EventGroupProcessorGenerateSubscribeTopicFn
// SubscriberConstructor گروپ ہینڈلر کے لئے سبسکرائبر تخلیق کرنے کے لئے استعمال ہوتا ہے۔
// یہ فعل ہر واقعہ گروپ کے لئے ایک بار بلایا جاتا ہے - جو ہر گروپ کے لئے ایک سبسکرپشن بنانے کی اجازت دیتا ہے۔
// جب ہم واقعات کو ترتیب سے ہینڈل کرنا چاہتے ہیں تو یہ بہت کارآمد ہے۔
SubscriberConstructor EventGroupProcessorSubscriberConstructorFn
// Handle سے پہلے واقعہ ہینڈل کرنے سے قبل بلایا جاتا ہے۔
// Handle middleware کی طرح ہے: آپ میں ضمنی منطق شامل کر سکتے ہیں واقعہ ہینڈل کرنے سے پہلے اور بعد میں۔
//
// لہذا، آپ کو صریحاً params.Handler.Handle() کو کال کرنا ہوگا تاکہ واقعہ ہینڈل کیا جائے۔
//
// func(params EventGroupProcessorOnHandleParams) (err error) {
// // ہینڈل کرنے سے پہلے منطق
// // (...)
//
// err := params.Handler.Handle(params.Message.Context(), params.Event)
//
// // ہینڈل کرنے کے بعد منطق
// // (...)
//
// return err
// }
//
// یہ اختیار ضروری نہیں ہے۔
OnHandle EventGroupProcessorOnHandleFn
// AckOnUnknownEvent واضح ہونے والے ہینڈلر کے موجود نہ ہونے کی صورت میں کیا تسلیم کرنا ہے، یہ موجود ہیں یا نہیں؟
AckOnUnknownEvent bool
// Marshaler واقعات کو کوڈ کرنے اور ان کا حل کرنے کے لئے استعمال ہوتا ہے۔
// یہ ضروری ہے۔
Marshaler CommandEventMarshaler
// استعمال ہونے والا لاگر وقت کی لاگ بکس کے لئے
// اگر فراہم نہ کیا گیا ہو تو، تو watermill.NopLogger استعمال ہوگا۔
Logger watermill.LoggerAdapter
}
func (c *EventGroupProcessorConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
واقعہ گروپ پروسیسر کے بارے میں مزید جانیں۔
واقعہ ہینڈلر
کامل سورس کوڈ: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go
// ...
// EventHandler نےو واقعات کو [نیا واقعہ] میں تعینات کیا ہے اور انہیں اس کے ہینڈل میثاق کا استعمال کرتے ہوئے ہینڈل کرتا ہے۔
// اگر DDD کا استعمال کیا جائے تو واقعہ ہینڈلر توڑا وضاحت دے سکتا ہے اور توہین یوں واقعے کو محفوظ کرسکتا ہے۔
// یہ ساگر، سگا یا بس صرف پڑھی ہینڈلرز کو بنانا اور فراہم کرنا بھی ہو سکتا ہیں۔
//
// کمانڈ ہینڈلر کے مخالفت میں، ہر واقعہ کے پاس مختلف واقعہ ہینڈلرز ہو سکتا ہیں۔
//
// میسیج ہینڈلنگ کے دوران، ایک ہینڈلر انسٹینس استعمال کریں۔
// جب متعدد واقعات کو ایک ساتھ پاس کیا جائے تو، ہینڈل میثاق کو متعدد مرتب کرنے کا طریقہ اختیار کیا جا سکتا ہے۔
// اسلئے، ہینڈل میثاق کو ٹھریڈ کے مطابق بنایا گیا ہوتا ہے!
type EventHandler interface {
// ...
کمانڈ
کمانڈ ایک سادہ ڈیٹا سٹرکچر ہوتا ہے جو کسی عمل کو انجام دینے کی درخواست کو ظاہر کرتا ہے۔
کمانڈ بس
کامل سورس کوڈ: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go
// ...
// CommandBus کمپوننٹ ہے جو کمانڈ ہینڈلر کو کمانڈ منتقل کرتا ہے۔
type CommandBus struct {
// ...
کامل سورس کوڈ: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go
// ...
type CommandBusConfig struct {
// GeneratePublishTopic کمانڈز کو منتشر کرنے کے لئے موضوع تخلیق کرنے کے لئے استعمال ہوتا ہے۔
GeneratePublishTopic CommandBusGeneratePublishTopicFn
// OnSend کمانڈ منتشر کرنے سے پہلے بلایا جاتا ہے۔
// *message.Message کو ترتیب دی جا سکتی ہے۔
//
// یہ اختیار ضروری نہیں ہے۔
OnSend CommandBusOnSendFn
// Marshaler کمانڈز کو سیریلائز اور ڈیسیریلائز کرنے کے لئے استعمال ہوتا ہے۔
// ضروری ہے۔
Marshaler CommandEventMarshaler
// تبادلہ کار کی اور گزار کی گئی کمانڈز کو منظور کرنے والا کمانڈ بس کے لئے استعمال کیا جاتا ہے۔
// اگر فراہم نہ کیا گیا ہو تو ، watermill.NopLogger استعمال کیا جائے گا۔
Logger watermill.LoggerAdapter
}
func (c *CommandBusConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
کمانڈ پروسیسر
کامل سورس کوڈ: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go
// ...
// کمانڈ ہینڈلر کیلئے مشترک سبسکرائبر بنانے کے لئے CommandProcessorSubscriberConstructorFn استعمال ہوتا ہے۔
// یہ آپ کو ہر کمانڈ ہینڈلر کے لئے علیحدہ کسٹم سبسکرائبر بنانے کی اجازت دیتا ہے۔
type CommandProcessorSubscriberConstructorFn func(CommandProcessorSubscriberConstructorParams) (message.Subscriber, error)
// ...
کامل سورس کوڈ: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go
// ...
type CommandProcessorConfig struct {
// GenerateSubscribeTopic کمانڈز کو سبسکرائب کرنے کے لئے موضوع تخلیق کرنے کے لئے استعمال ہوتا ہے۔
GenerateSubscribeTopic CommandProcessorGenerateSubscribeTopicFn
// کمانڈ ہینڈلر کے لئے سبسکرائبر بنانے کے لئے SubscriberConstructor استعمال ہوتا ہے۔
SubscriberConstructor CommandProcessorSubscriberConstructorFn
// OnHandle کمانڈ کو ہینڈل کرنے سے پہلے بلایا جاتا ہے۔
// OnHandle کام ہوتا ہے جیسے مِڈلویئر: آپ بیچ میں اضافی منطق داخل کر سکتے ہیں کمانڈ کو ہینڈل کرنے سے پہلے اور بعد ازاں۔
//
// اس وجہ سے آپ کو params.Handler.Handle() کو صریح طور پر کمانڈ کو ہینڈل کرنے کے لئے بلانا ہو گا۔
// func(params CommandProcessorOnHandleParams) (err error) {
// // logic before handling
// // (...)
//
// err := params.Handler.Handle(params.Message.Context(), params.Command)
//
// // logic after handling
// // (...)
//
// return err
// }
//
// یہ اوپشن لازمی نہیں ہے۔
OnHandle CommandProcessorOnHandleFn
// مارشل کے لئے استعمال ہوتا ہے سیریلائز اور ڈیسیریلائز کرنے کے لئے کمانڈ کا۔
// ضروری ہے۔
Marshaler CommandEventMarshaler
// لاگنگ کے لئے لاگنگ کا مانڈلر موجود ہے۔
// اگر فراہم نہ کیا گیا ہو تو ، watermill.NopLogger استعمال کیا جائے گا۔
Logger watermill.LoggerAdapter
// اگر سچا ہو ، تو کمانڈ پروسیسر منسلک دیکھیں جانے کے بعد بھی منظورایلاس کرے گا۔
// اگر RequestReplyBackend خالص ہو نہ ہو اور جواب بھیجنے میں ناکامی ہوتی ہے ، تو پیغام خلاص ہو جائے گا۔
//
// ہتا: RequestReply پوئنٹ پروسیسر استعمال کرتے وقت یہ اختیار استعمال کرنا منصوص نہیں ہے.
// جیسا کہ یہ جواب بھیجنے میں ناکامی کے وقت کمانڈ اختیارایررز کومنڈ کر سکتا ہے.
//
// جب آپ requestreply کا استعمال کر رہے ہوں تو آپ کو requestreply.PubSubBackendConfig.AckCommandErrors استعمال کرنا چاہئے۔
AckCommandHandlingErrors bool
// disableRouterAutoAddHandlers واپسی موافقت کیلئے استعمال کیا جاتا ہے۔
// یہ NewCommandProcessor کے ساتھ CommandProcessor کو بنانے کے دوران مثبت کر دیا جاتا ہے۔
// منسوخ: براہ کرم نیا CommandProcessorWithConfig میں ترقی کریں۔
disableRouterAutoAddHandlers bool
}
func (c *CommandProcessorConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
کمانڈ پروسیسر
مکمل سورس کوڈ: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go
// ...
// CommandHandler کمانڈ کو جو Handle میں استعمال کر کے منظور کرتا ہے جو NewCommand سے تعین کیا گیا ہے
// اگر DDD کا استعمال ہو رہا ہے تو CommandHandler ممکن ہے کہ مہار کو ترتیب دینے، تبدیل کرنے اور ذخیرہ کرے۔
//
// EventHandler کے برخلاف، ہر Command کے پاس صرف ایک CommandHandler ہوتا ہے۔
//
// پیغام ہینڈلنگ کے دوران، ایک CommandHandler کا استعمال کریں۔
// جب بھی متعدد کمانڈز ایک ساتھ منسلک ہوتے ہیں، تو Handle میتھڈ کو متعدد مرتبہ متزامن طور پر اجرا کیا جا سکتا ہے۔
// لہذا Handle میتھڈ میں تھریڈ کی حفاظت ہونی چاہئے!
type CommandHandler interface {
// ...
کمانڈ اور واقعہ مارشلر
مکمل سورس کوڈ: github.com/ThreeDotsLabs/watermill/components/cqrs/marshaler.go
// ...
// CommandEventMarshaler کمانڈز اور واقعات کو واٹرمل پیغاموں میں تبدیل کرتا ہے، اور الٹے میں۔
// کمانڈ کا مسلسل طور پر [] باۓس میں تبدیل کیا جانا ضروری ہوتا ہے۔
type CommandEventMarshaler interface {
// Marshal کمانڈ یا واقعہ کو واٹرمل پیغام میں تبدیل کرتا ہے۔
Marshal(v interface{}) (*message.Message, error)
// Unmarshal واٹرمل پیغام کو v کمانڈ یا واقعہ میں کوڈنگ کرتا ہے۔
Unmarshal(msg *message.Message, v interface{}) (err error)
// Name کمانڈ یا واقعے کا نام واپس کرتا ہے۔
// نام کو استعمال کیا جا سکتا ہے تاکہ ہم پرکار کرنا چاہتے ہیں وصول شدہ کمانڈ یا واقعہ پتہ چل سکے۔
Name(v interface{}) string
// NameFromMessage واٹرمل پیغام سے (مارشل کردہ کیا جانے والا) کمانڈ یا واقعے کا نام واپس کرتا ہے۔
//
// جب ہم کمانڈز یا واقعات واٹرمل پیغاموں میں مارشل کرتے ہیں تو Name کی بجائے NameFromMessage کا استعمال کریں۔
NameFromMessage(msg *message.Message) string
}
// ...
استعمال
مثال ڈومین
ہوٹل میں روم رزرویشن کا سبب ملازمہ ہونے والی ایک سادہ ڈومین کا استعمال کریں۔
ہمیں اس ڈومین کے ماڈل کا پیش کرنے کے لیے Event Storming کے علامات کا استعمال ہوگا۔
علامت کی رہنمائی:
- نیلا اسٹیکی نوٹز کمانڈز ہیں
- نارنجی اسٹیکی نوٹز واقعات ہیں
- سبز اسٹیکی نوٹز واقعات سے غیر تبدیل شدہ ماڈلز ہیں
- جامنی اسٹیکی نوٹز واقعات سے حاصل کردہ کمانڈز ہیں
- گلابی اسٹیکی نوٹز ہانٹسپاٹس ہیں؛ ہم ایسے علاقے نشان کرتے ہیں جو کامیابی سے کبھی کبھار متاثر ہوتے ہیں۔
ڈومین سادہ ہے:
- کسٹمرز روم بک کر سکتے ہیں۔
-
جب بھی روم بک ہوتا ہے، ہم کسٹمر کے لیے بوتل بیئر آرڈر کرتے ہیں (کیونکہ ہم اپنے مہمانوں سے محبت کرتے ہیں)۔
- ہمیں پتا ہے کہ کبھی بیئر ختم ہو جاتی ہے۔
- ہم رزرویشن کے بنیادی پر مالی رپورٹ بناتے ہیں۔
کمانڈز بھیجنا
پہلے، ہمیں کسٹمر ایکشن کی نمائش کرنی ہوگی۔
مکمل سورس کوڈ: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf
// ...
بکرروم کمانڈ := &BookRoom{
RoomId: fmt.Sprintf("%d", i),
GuestName: "جان",
StartDate: startDate,
EndDate: endDate,
}
if err := commandBus.Send(context.Background(), بکرروم کمانڈ); err != nil {
panic(err)
}
// ...
کمانڈ ہینڈلر
BookRoomHandler
ہمارے کمانڈز کو ہینڈل کرے گا۔
مکمل سورس کوڈ: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// BookRoomHandler کمانڈ ہینڈلر ہے جو BookRoom کمانڈ کو پروسیس کرے گا اور RoomBooked اونٹ کرے گا۔
//
// CQRS میں، کمانڈ کو ہینڈل کرنا ضروری ہے۔
// اس کمانڈ کو ہینڈل کرنے کے لئے اور ہینڈلر شامل کرنے کی صورت میں، ایک خطا واپس کیا جائے گا۔
type BookRoomHandler struct {
eventBus *cqrs.EventBus
}
func (b BookRoomHandler) HandlerName() string {
return "BookRoomHandler"
}
// NewCommand وہ نئی کمانڈ کی قسم واپس کرتا ہے جس کو یہ ہینڈلر پروسیس کرنا چاہئے۔ یہ ایک پوائنٹر ہونا ضروری ہے۔
func (b BookRoomHandler) NewCommand() interface{} {
return &BookRoom{}
}
func (b BookRoomHandler) Handle(ctx context.Context, c interface{}) error {
// c ہمیشہ NewCommand کے ذریعے واپس کی جانے والی قسم ہوتا ہے، لہذا قسم کیتاکید بے خطر ہوتا ہے
cmd := c.(*BookRoom)
// کچھ بے قاعدہ پرائسنگ، جو عینی تولید میں ایک معقول طریقے سے حساب کیا جا سکتا ہے
price := (rand.Int63n(40) + 1) * 10
log.Printf(
"Booked %s, from %s to %s",
cmd.RoomId,
cmd.GuestName,
time.Unix(cmd.StartDate.Seconds, int64(cmd.StartDate.Nanos)),
time.Unix(cmd.EndDate.Seconds, int64(cmd.EndDate.Nanos)),
)
// RoomBooked کو OrderBeerOnRoomBooked وینٹ ہینڈلر کی طرف سے سنبھالا جائے گا،
// اور مستقبل میں، RoomBooked کو مختلف وینٹ ہینڈلرز کی طرف سے سنبھالا جا سکتا ہے
if err := b.eventBus.Publish(ctx, &RoomBooked{
ReservationId: watermill.NewUUID(),
RoomId: cmd.RoomId,
GuestName: cmd.GuestName,
Price: price,
StartDate: cmd.StartDate,
EndDate: cmd.EndDate,
}); err != nil {
return err
}
return nil
}
// OrderBeerOnRoomBooked وہ وینٹ ہینڈلر ہے جو RoomBooked وینٹ کو پروسیس کرتا ہے اور OrderBeer کمانڈ کو ڈالتا ہے۔
// ...
وینٹ ہینڈلرز
جیسا کہ پہلے ذکر ہوا، ہم ہر بار جب کوئی کمرے کی بکنگ ہوتی ہے ("جب کمرے کی بکنگ ہوتی ہے" کے نام سے تشہیر کیا گیا ہے ) ہم OrderBeer
کمانڈ کا استعمال کرکے یہ حاصل کرتے ہیں۔
مکمل سورس کوڈ: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// OrderBeerOnRoomBooked وہ وینٹ ہینڈلر ہے جو RoomBooked وینٹ کو پروسیس کرتا ہے اور OrderBeer کمانڈ کو ڈالتا ہے۔
type OrderBeerOnRoomBooked struct {
commandBus *cqrs.CommandBus
}
func (o OrderBeerOnRoomBooked) HandlerName() string {
// یہ نام EventsSubscriberConstructor کو قیمتیں تیار کرنے کے لئے بھیجا جاتا ہے
return "OrderBeerOnRoomBooked"
}
func (OrderBeerOnRoomBooked) NewEvent() interface{} {
return &RoomBooked{}
}
func (o OrderBeerOnRoomBooked) Handle(ctx context.Context, e interface{}) error {
event := e.(*RoomBooked)
orderBeerCmd := &OrderBeer{
RoomId: event.RoomId,
Count: rand.Int63n(10) + 1,
}
return o.commandBus.Send(ctx, orderBeerCmd)
}
// OrderBeerHandler وہ کمانڈ ہینڈلر ہے جو OrderBeer کمانڈ کو پروسیس کرتا ہے اور BeerOrdered وینٹ کو ڈالتا ہے۔
// ...
OrderBeerHandler
BookRoomHandler
کے بہت مشابہ ہے۔ صرف یہ فرق ہے کہ کبھی کبھی کم بیئر کی کمی ہونے پر وہ خاموشی سے خاموش ہوجاتا ہے، جس کی وجہ سے کمانڈ کو دوبارہ جاری کیا جاتا ہے۔ آپ مکمل تصویر کوڈ میں تفصیلات کو یہاں سمجھ سکتے ہیں۔
ایونٹ ہینڈلر گروپس
پیش فالت، ہر ایونٹ ہینڈلر کے لئے الگ الگ سبسکرائبر انسٹنس ہوتا ہے۔ اگر صرف ایک ایونٹ کسی ٹاپک کو بھیجا جائے تو یہ ترتیب بہتر ہے۔
اگر ٹاپک پر مختلف ایونٹ کی قسمیں ہوں، تو دو انتخابات ہیں:
- آپ
EventConfig.AckOnUnknownEvent
کوtrue
پر سیٹ کرسکتے ہیں - یہ سب ایونٹس کو تسلیم کرے گا جو ہینڈلر کے ذریعے نہیں سنی۔ - آپ ایونٹ ہینڈلر گروپ کا نظام استعمال کرسکتے ہیں۔
ایونٹ گروپس استعمال کرنے کے لئے، آپ کو EventConfig
میں GenerateHandlerGroupSubscribeTopic
اور GroupSubscriberConstructor
اختیارات سیٹ کرنے ہوتے ہیں۔
پھر، آپ EventProcessor
پر AddHandlersGroup
استعمال کرسکتے ہیں۔
مکمل ماخذ کوڈ: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
err = eventProcessor.AddHandlersGroup(
"events",
OrderBeerOnRoomBooked{commandBus},
NewBookingsFinancialReport(),
cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
logger.Info("Beer ordered", watermill.LogFields{
"room_id": event.RoomId,
})
return nil
}),
)
if err != nil {
// ...
GenerateHandlerGroupSubscribeTopic
اور GroupSubscriberConstructor
دونوں گروپ نام کی معلومات فعلی پیرامیٹر کے طور پر وصول کرتے ہیں۔
عام ہینڈلرز
Watermill v1.3 سے شروع ہوتے ہوئے، عام ہینڈلرز کا استعمال کمانڈز اور ایونٹس کو ہینڈل کرنے کے لئے کیا جا سکتا ہے۔ جب آپ کے پاس بہت سارے کمانڈز/ایونٹس ہوں اور آپ ہر ایک کے لئے الگ الگ ہینڈلر نہیں بنانا چاہتے تو یہ کام کرتا ہے۔
پورا ماخذ کوڈ: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
logger.Info("Beer ordered", watermill.LogFields{
"room_id": event.RoomId,
})
return nil
}),
// ...
اس کے پیچھے، یہ ایک EventHandler یا CommandHandler کا اطلاق کرتا ہے۔ یہ تمام قسم کے ہینڈلرز کے لئے مناسب ہے۔
پورا ماخذ کوڈ: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go
// ...
// NewCommandHandler[Command any] فنکشن پیرامیٹرز سے حاصل کردہ کمانڈ کی قسم پر نئے کمانڈ ہینڈلر کا نظام بناتا ہے۔
func NewCommandHandler[Command any](
// ...
پورا ماخذ کوڈ: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go
// ...
// NewEventHandler[T any] فنکشن پیرامیٹرز سے حاصل کردہ ایونٹ کی قسم پر نئے ایونٹ ہینڈلر کا نظام بناتا ہے۔
func NewEventHandler[T any](
// ...
پورا ماخذ کوڈ: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go
// ...
// NewGroupEventHandler[T any] فنکشن پیرامیٹرز سے حاصل کردہ ایونٹ کی قسم پر نئے گروپ ایونٹ ہینڈلر کا نظام بناتا ہے۔
func NewGroupEventHandler[T any](handleFunc func(ctx context.Context, event *T) error) GroupEventHandler {
// ...
تقنی کوڈ (انتظار کر رہا ہے کہ ترجمے کا مواد آئے)
ترجمہ نتیجہ
ہر چیز کو منسلک کریں
ہمارے پاس پہلے ہی CQRS ایپلیکیشن بنانے کے لئے ضروری تمام جزو ہیں۔
ہم AMQP (RabbitMQ) کو اپنے پیغام بروکر کے طور پر استعمال کریں گے: AMQP.
CQRS تحت، واٹرمل کا پیغام راؤٹر استعمال ہوتا ہے۔ اگر آپ اس کے بارے میں واقف نہیں ہیں اور یہ جاننا چاہتے ہیں کہ یہ کیسے کام کرتا ہے، تو آپ کو گیٹنگ شروع ہونے والی ہدایت نامہ چیک کرنا چاہئے۔ یہ آپ کو ایسے معیاری پیغام بھیجنے کے پیٹرنز جیسے کہ میٹرکس، زہریلا پیغام قطار، شرح محدودیت، تعلق اور ہر ایک پیغام چلانے والے ایپلیکیشن کیلئے استعمال ہونے والے دیگر ٹولز کا استعمال کرنا سکھاے گا۔ یہ تمام ٹولز پہلے ہی واٹرمل میں شامل ہیں۔
چلیں واپس CQRS پر آتے ہیں۔ جیسا کہ آپ جانتے ہیں، CQRS کئی کمپوننٹس جیسے کہ کمانڈ یا واقعات کی بس، پروسیسرز وغیرہ سے مشتمل ہوتا ہے۔
مکمل سورس کوڈ: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
func main() {
logger := watermill.NewStdLogger(false, false)
cqrsMarshaler := cqrs.ProtobufMarshaler{}
// You can use any Pub/Sub implementation from here: https://watermill.io/pubsubs/
// Detailed RabbitMQ implementation: https://watermill.io/pubsubs/amqp/
// Commands will be send to queue, because they need to be consumed once.
commandsAMQPConfig := amqp.NewDurableQueueConfig(amqpAddress)
commandsPublisher, err := amqp.NewPublisher(commandsAMQPConfig, logger)
if err != nil {
panic(err)
}
commandsSubscriber, err := amqp.NewSubscriber(commandsAMQPConfig, logger)
if err != nil {
panic(err)
}
// Events will be published to PubSub configured Rabbit, because they may be consumed by multiple consumers.
// (in that case BookingsFinancialReport and OrderBeerOnRoomBooked).
eventsPublisher, err := amqp.NewPublisher(amqp.NewDurablePubSubConfig(amqpAddress, nil), logger)
if err != nil {
panic(err)
}
// CQRS is built on messages router. Detailed documentation: https://watermill.io/docs/messages-router/
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// Simple middleware which will recover panics from event or command handlers.
// More about router middlewares you can find in the documentation:
// https://watermill.io/docs/messages-router/#middleware
//
// List of available middlewares you can find in message/router/middleware.
router.AddMiddleware(middleware.Recoverer)
commandBus, err := cqrs.NewCommandBusWithConfig(commandsPublisher, cqrs.CommandBusConfig{
GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) {
// we are using queue RabbitMQ config, so we need to have topic per command type
return params.CommandName, nil
},
OnSend: func(params cqrs.CommandBusOnSendParams) error {
logger.Info("Sending command", watermill.LogFields{
"command_name": params.CommandName,
})
params.Message.Metadata.Set("sent_at", time.Now().String())
return nil
},
Marshaler: cqrsMarshaler,
Logger: logger,
})
if err != nil {
panic(err)
}
commandProcessor, err := cqrs.NewCommandProcessorWithConfig(
router,
cqrs.CommandProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) {
// we are using queue RabbitMQ config, so we need to have topic per command type
return params.CommandName, nil
},
SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) {
// we can reuse subscriber, because all commands have separated topics
return commandsSubscriber, nil
},
OnHandle: func(params cqrs.CommandProcessorOnHandleParams) error {
start := time.Now()
err := params.Handler.Handle(params.Message.Context(), params.Command)
logger.Info("Command handled", watermill.LogFields{
"command_name": params.CommandName,
"duration": time.Since(start),
"err": err,
})
return err
},
Marshaler: cqrsMarshaler,
Logger: logger,
},
)
if err != nil {
panic(err)
}
eventBus, err := cqrs.NewEventBusWithConfig(eventsPublisher, cqrs.EventBusConfig{
GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
// because we are using PubSub RabbitMQ config, we can use one topic for all events
return "events", nil
// we can also use topic per event type
// return params.EventName, nil
},
OnPublish: func(params cqrs.OnEventSendParams) error {
logger.Info("Publishing event", watermill.LogFields{
"event_name": params.EventName,
})
params.Message.Metadata.Set("published_at", time.Now().String())
return nil
},
Marshaler: cqrsMarshaler,
Logger: logger,
})
if err != nil {
panic(err)
}
eventProcessor, err := cqrs.NewEventGroupProcessorWithConfig(
router,
cqrs.EventGroupProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) {
return "events", nil
},
SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) {
config := amqp.NewDurablePubSubConfig(
amqpAddress,
amqp.GenerateQueueNameTopicNameWithSuffix(params.EventGroupName),
)
return amqp.NewSubscriber(config, logger)
},
OnHandle: func(params cqrs.EventGroupProcessorOnHandleParams) error {
start := time.Now()
err := params.Handler.Handle(params.Message.Context(), params.Event)
logger.Info("Event handled", watermill.LogFields{
"event_name": params.EventName,
"duration": time.Since(start),
"err": err,
})
return err
},
Marshaler: cqrsMarshaler,
Logger: logger,
},
)
if err != nil {
panic(err)
}
err = commandProcessor.AddHandlers(
BookRoomHandler{eventBus},
OrderBeerHandler{eventBus},
)
if err != nil {
panic(err)
}
err = eventProcessor.AddHandlersGroup(
"events",
OrderBeerOnRoomBooked{commandBus},
NewBookingsFinancialReport(),
cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
logger.Info("Beer ordered", watermill.LogFields{
"room_id": event.RoomId,
})
return nil
}),
)
if err != nil {
panic(err)
}
// publish BookRoom commands every second to simulate incoming traffic
go publishCommands(commandBus)
// processors are based on router, so they will work when router will start
if err := router.Run(context.Background()); err != nil {
panic(err)
}
}
// ...
یہ ہے۔ ہمارے پاس ایک قابل چلائی جانے والی CQRS ایپلیکیشن ہے۔