مکانیزم CQRS
CQRS مخفف عبارت "Command Query Responsibility Segregation" است. این مکانیزم مسئولیت دستور (درخواستهای نوشتن) و پرسوجو (درخواستهای خواندن) را از یکدیگر جدا میکند. درخواستهای نوشتن و خواندن توسط اشیاء متفاوتی اداره میشوند.
این است CQRS. میتوانیم مخزن داده را نیز جدا کنیم و مخزنهای جداگانه برای خواندن و نوشتن داشته باشیم. بعد از انجام این کار، ممکن است چندین مخزن خواندنی بهینه شده برای اداره انواع مختلف پرسوجوها یا بازههای متمایز شدهٔ محدود. اگرچه جدا کردن مخازن خواندن/نوشتن موضوع بحثهای مرتبط با CQRS است، اما خود CQRS نیست. CQRS فقط اولین جداسازی دستور و پرسوجو است.
مؤلفه cqrs
فراهم میکند که برخی انتزاعات مفید را فراهم میسازد که بر پایهٔ Pub/Sub و Router ساخته شدهاند تا به اجرای الگوی CQRS کمک کنند.
شما نیازی ندارید که کل CQRS را اجرا کنید. به طور معمول، تنها بخش رویداد از مؤلفه برای ساخت برنامههای مبتنی بر رویداد استفاده میشود.
بلوکهای ساختاری
رویدادها
رویدادها نشاندهندهٔ چیزی هستند که قبلاً رخ داده است. رویدادها قابل تغییر نیستند.
اتوبوس رویداد
کد منبع کامل: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go
// ...
// اتوبوس رویداد، رویدادها را به دستگاههای رویداد منتقل میکند.
type EventBus struct {
// ...
کد منبع کامل: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go
// ...
type EventBusConfig struct {
// GeneratePublishTopic برای تولید نام موضوع برای انتشار رویدادها استفاده میشود.
GeneratePublishTopic GenerateEventPublishTopicFn
// OnPublish قبل از ارسال رویداد فراخوانی میشود. میتواند *message.Message را اصلاح کند.
//
// این گزینه اجباری نیست.
OnPublish OnEventSendFn
// Marshaler برای رمزگذاری و رمزگشایی رویدادها استفاده میشود.
// این گزینه اجباری است.
Marshaler CommandEventMarshaler
// نمونه ثبت برای ورود به سیستم. اگر ارائه نشود، watermill.NopLogger استفاده میشود.
Logger watermill.LoggerAdapter
}
func (c *EventBusConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
پردازشگر رویداد
کد کامل: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go
// ...
// EventProcessor برای تعیین EventHandler که باید رویدادهای دریافت شده از اتوبوس رویداد را پردازش کند استفاده میشود.
نوع EventProcessor struct {
// ...
کد کامل: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go
// ...
نوع EventProcessorConfig struct {
// GenerateSubscribeTopic برای تولید موضوع برای مشترک شدن در رویدادها استفاده میشود.
// اگر پردازشگر رویداد از گروه های دستگاه استفاده میکند، آنگاه GenerateSubscribeTopic استفاده میشود.
GenerateSubscribeTopic EventProcessorGenerateSubscribeTopicFn
// SubscriberConstructor برای ایجاد یک مشترک برای EventHandler استفاده میشود.
//
// این تابع برای هر نمونه EventHandler یکبار فراخوانی میشود.
// اگر میخواهید یک مشترک را برای چندین دستگاه استفاده کنید، از GroupEventProcessor استفاده کنید.
SubscriberConstructor EventProcessorSubscriberConstructorFn
// OnHandle قبل از پردازش رویداد فراخوانی میشود.
// OnHandle به صورتی مشابه middleware عمل میکند: میتوانید منطق اضافی قبل و بعد از پردازش رویداد را درج کنید.
//
// بنابراین، باید به طور صریح params.Handler.Handle() را برای پردازش رویداد فراخوانی کنید.
//
// func(params EventProcessorOnHandleParams) (err error) {
// // منطق قبل از پردازش
// // (...)
// err := params.Handler.Handle(params.Message.Context(), params.Event)
//
// // منطق بعد از پردازش
// // (...)
// return err
// }
//
// این گزینه اجباری نیست.
OnHandle EventProcessorOnHandleFn
// AckOnUnknownEvent برای تعیین استفاده میشود که آیا پیام باید تأیید شود یا خیر زمانی که رویداد دستگاه تعریف شده ندارد.
AckOnUnknownEvent bool
// Marshaler برای marshal و unmarshal رویدادها استفاده میشود.
// مورد نیاز است.
Marshaler CommandEventMarshaler
// Logger نمونه برای ورود اطلاعات.
// اگر ارائه نشده باشد، watermill.NopLogger استفاده میشود.
Logger watermill.LoggerAdapter
// disableRouterAutoAddHandlers برای حفظ سازگاری به سمت عقب استفاده میشود.
// این مقدار وقتی که با استفاده از NewEventProcessor EventProcessor ایجاد میشود قرار داده میشود.
// قدیمی شده: به NewEventProcessorWithConfig مهاجرت کنید.
disableRouterAutoAddHandlers bool
}
func (c *EventProcessorConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
پردازشگر گروه رویداد
کد منبع کامل: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go
// ...
// EventGroupProcessor تعیین میکند که کدام پردازشگر رویداد باید رویدادهای دریافتی از اتوبوس رویداد را پردازش کند.
// در مقایسه با EventProcessor، EventGroupProcessor به چندین پردازشگر اجازه میدهد که نمونه اشتراکگیرنده مشترک را به اشتراک بگذارند.
type EventGroupProcessor struct {
// ...
کد منبع کامل: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go
// ...
type EventGroupProcessorConfig struct {
// GenerateSubscribeTopic برای تولید موضوع برای اشتراک گرفتن از پردازشگر گروه رویدادها استفاده میشود.
// این گزینه برای EventProcessor هنگام استفاده از گروههای پردازشگر الزامی است.
GenerateSubscribeTopic EventGroupProcessorGenerateSubscribeTopicFn
// SubscriberConstructor برای ایجاد یک مشترک برای GroupEventHandler استفاده میشود.
// این تابع یک بار برای هر گروه رویداد فراخوانی میشود - اجازه میدهد که یک اشتراک برای هر گروه ایجاد شود.
// این بسیار مفید است زمانی که میخواهیم رویدادها از یک جریان به ترتیب را پردازش کنیم.
SubscriberConstructor EventGroupProcessorSubscriberConstructorFn
// OnHandle قبل از پردازش رویداد فراخوانی میشود.
// OnHandle مانند میانافزار (middleware) است: میتوانید منطق اضافی را قبل و بعد از پردازش رویداد درج کنید.
//
// بنابراین شما باید به طور صریح params.Handler.Handle() را فراخوانی کنید تا رویداد پردازش شود.
//
// func(params EventGroupProcessorOnHandleParams) (err error) {
// // منطق قبل از پردازش
// // (...)
//
// err := params.Handler.Handle(params.Message.Context(), params.Event)
//
// // منطق بعد از پردازش
// // (...)
//
// return err
// }
//
// این گزینه الزامی نیست.
OnHandle EventGroupProcessorOnHandleFn
// AckOnUnknownEvent برای تعیین اینکه آیا باید تایید کنیم که رویدادی که پردازشگر تعریف شده ندارد یا خیر استفاده میشود.
AckOnUnknownEvent bool
// Marshaler برای رمزکردن و رمزگشایی رویدادها استفاده میشود.
// این الزامی است.
Marshaler CommandEventMarshaler
// نمونه Logger برای وقفه بر اساس ورودی استفاده میشود.
// اگر ارائه نشود، watermill.NopLogger استفاده خواهد شد.
Logger watermill.LoggerAdapter
}
func (c *EventGroupProcessorConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
اطلاعات بیشتر درباره پردازشگر گروه رویداد را بیاموزید.
دستگیره رویداد
کد منبع کامل: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go
// ...
// EventHandler رویدادهای تعریف شده توسط NewEvent را دریافت میکند و آنها را با استفاده از متد Handle خود پردازش میکند.
// اگر از DDD استفاده میشود، دستگیره رویداد میتواند پیچیده و پایاشی را تغییر دهد و حفظ کند.
// همچنین میتواند مدیران فرآیند، سگاها یا فقط مدلهای خواندنی ایجاد کند.
//
// برخلاف دستگیره دستور، هر رویداد میتواند دارای چندین دستگیره رویداد باشد.
//
// هنگام رسیدگی به پیام، از یک نمونه EventHandler استفاده کنید.
// هنگام انتقال چند رویداد به طور همزمان، میتوان متد Handle را چندین بار همزمان اجرا کرد.
// بنابراین، متد Handle باید ایمن از نظر ریسه باشد!
type EventHandler interface {
// ...
دستور
یک دستور یک ساختار دادهای ساده است که درخواست انجام یک عملیات را نمایان میکند.
اتوبوس دستور
کد منبع کامل: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go
// ...
// CommandBus یک جزء است که دستورها را به دستگیرندگان دستور منتقل میکند.
type CommandBus struct {
// ...
کد منبع کامل: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go
// ...
type CommandBusConfig struct {
// GeneratePublishTopic برای تولید موضوع انتشار دستورها استفاده میشود.
GeneratePublishTopic CommandBusGeneratePublishTopicFn
// OnSend قبل از انتشار یک دستور فراخوانی میشود.
// *message.Message قابل تغییر است.
//
// این گزینه اجباری نیست.
OnSend CommandBusOnSendFn
// Marshaler برای سریالیزه و دیسریالیزه کردن دستورها استفاده میشود.
// ضروری است.
Marshaler CommandEventMarshaler
// Logger نمونهای است که برای وقایع نوبتبندی استفاده میشود.
// اگر فراهم نشود، watermill.NopLogger استفاده خواهد شد.
Logger watermill.LoggerAdapter
}
func (c *CommandBusConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
پردازشگر دستور
کد منبع کامل: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go
// ...
// CommandProcessorSubscriberConstructorFn برای ایجاد مشترک برای CommandHandler استفاده میشود.
// این امکان را به شما میدهد که برای هر دستگیرنده دستور سازنده مشترک سفارشی جداگانه ایجاد کنید.
type CommandProcessorSubscriberConstructorFn func(CommandProcessorSubscriberConstructorParams) (message.Subscriber, error)
// ...
کد منبع کامل: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go
// ...
type CommandProcessorConfig struct {
// GenerateSubscribeTopic برای تولید موضوع اشتراک گذاری برای دستورها استفاده میشود.
GenerateSubscribeTopic CommandProcessorGenerateSubscribeTopicFn
// SubscriberConstructor برای ایجاد مشترک برای CommandHandler استفاده میشود.
SubscriberConstructor CommandProcessorSubscriberConstructorFn
// OnHandle قبل از بررسی دستور فراخوانی میشود.
// OnHandle مانند یک میانافزار عمل میکند: میتوانید منطق اضافی را قبل و بعد از بررسی دستور درج کنید.
//
// به همین دلیل، شما باید به طور صریح params.Handler.Handle() را برای بررسی دستور فراخوانی کنید.
// func(params CommandProcessorOnHandleParams) (err error) {
// // منطق قبل از بررسی
// // (...)
//
// err := params.Handler.Handle(params.Message.Context(), params.Command)
//
// // منطق بعد از بررسی
// // (...)
//
// return err
// }
//
// این گزینه اجباری نیست.
OnHandle CommandProcessorOnHandleFn
// Marshaler برای سریالیزه و دیسریالیزه کردن دستورها استفاده میشود.
// ضروری است.
Marshaler CommandEventMarshaler
// Logger نمونه برای وقایع نوبتبندی استفاده میشود.
// اگر فراهم نشود، watermill.NopLogger استفاده خواهد شد.
Logger watermill.LoggerAdapter
// اگر صحیح باشد، CommandProcessor حتی اگر CommandHandler خطا برگرداند، پیامها را تصدیق خواهد کرد.
// اگر RequestReplyBackend خالی نباشد و ارسال پاسخ شکست بخورد، پیام همچنان عدم تصدیق خواهد شد.
//
// اخطار: استفاده از این گزینه وقتی از جزیره درخواست و پاسخ استفاده میشود (requestreply.NewCommandHandler یا requestreply.NewCommandHandlerWithResult)
// توصیه نمیشودزیرا ممکن است دستور را تصدیق کند وقتی ارسال پاسخ شکست بخورد.
//
// زمانی که از جزیره درخواست و پاسخ استفاده میشود، باید requestreply.PubSubBackendConfig.AckCommandErrors را استفاده کنید.
AckCommandHandlingErrors bool
// disableRouterAutoAddHandlers برای سازگاری بهسمت عقب استفاده میشود.
// هنگام ایجاد یک CommandProcessor با NewCommandProcessor تنظیم میشود.
// منسوخ شده: لطفاً به NewCommandProcessorWithConfig مهاجرت کنید.
disableRouterAutoAddHandlers bool
}
func (c *CommandProcessorConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
پردازشگر دستور
کد منبع کامل: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go
// ...
// CommandHandler دستور تعریف شده توسط NewCommand را دریافت میکند و آن را با استفاده از متد Handle اجرا میکند.
// اگر از DDD استفاده میشود، CommandHandler ممکن است aggregate ها را تغییر دهد و آنها را ذخیره کند.
//
// برخلاف EventHandler، هر Command تنها میتواند یک CommandHandler داشته باشد.
//
// در طول پردازش پیام، از یک نمونه از CommandHandler استفاده کنید.
// زمانی که چندین دستور به صورت همزمان تحویل داده شود، ممکن است متد Handle چند بار به صورت همزمان اجرا شود.
// بنابراین، متد Handle باید thread-safe باشد!
type CommandHandler interface {
// ...
ترجمهکننده دستور و رویداد
کد منبع کامل: github.com/ThreeDotsLabs/watermill/components/cqrs/marshaler.go
// ...
// CommandEventMarshaler دستورها و رویدادها را به پیامهای Watermill تبدیل کرده و برعکس.
// بار مفتول دستور نیاز به تبدیل پیام باید به []bytes شود.
type CommandEventMarshaler interface {
// Marshal دستور یا رویداد را به یک پیام Watermill تبدیل میکند.
Marshal(v interface{}) (*message.Message, error)
// Unmarshal پیام Watermill را به v دستور یا رویداد کدگشایی میکند.
Unmarshal(msg *message.Message, v interface{}) (err error)
// Name نام دستور یا رویداد را برمیگرداند.
// نام میتواند برای تعیین این استفاده شود که آیا دستور یا رویداد دریافتی آن چیزی است که میخواهیم پردازش کنیم.
Name(v interface{}) string
// NameFromMessage نام دستور یا رویداد را از پیام Watermill (تولید شده توسط Marshal) برمیگرداند.
//
// هنگامی که دستورها یا رویدادها به پیامهای Watermill مفتول میشوند، باید از NameFromMessage به جای Name استفاده کنیم تا از کدگشایی بیفایده جلوگیری کنیم.
NameFromMessage(msg *message.Message) string
}
// ...
استفاده
دامنه نمونه
استفاده از یک دامنه ساده که مسئول رزرو اتاقها در یک هتل است.
ما از نمادهای Event Storming برای نمایش مدل این دامنه استفاده خواهیم کرد.
رمزنگاری نماد:
- یادداشتهای چسبان آبی دستورات را نشان میدهند
- یادداشتهای چسبان نارنجی رویدادها را نشان میدهند
- یادداشتهای چسبان سبز مدلهای خواندنی را به صورت ناهمزمان از رویدادها تولید میکنند
- یادداشتهای چسبان بنفش سیاستها را که توسط رویدادها پیشبینی میشوند و دستورات را تولید میکنند نشان میدهند
- یادداشتهای چسبان صورتی نقاط داغ هستند؛ ما بخشهایی که اغلب با مشکل روبرو میشوند علامتگذاری میکنیم
دامنه ساده است:
- مشتریان میتوانند اتاقها را رزرو کنند.
-
هرگاه یک اتاق رزرو شود، برای مشتری یک بطری آبجو سفارش میدهیم (زیرا ما مهمانانمان را دوست داریم).
- ما میدانیم که بعضی وقتها آبجو تمام میشود.
- ما یک گزارش مالی بر اساس رزرو تولید میکنیم.
ارسال دستورات
اولین کاری که باید انجام دهیم، شبیهسازی اقدامات مشتریان است.
کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf
// ...
bookRoomCmd := &BookRoom{
RoomId: fmt.Sprintf("%d", i),
GuestName: "John",
StartDate: startDate,
EndDate: endDate,
}
if err := commandBus.Send(context.Background(), bookRoomCmd); err != nil {
panic(err)
}
// ...
دستورالعمل هندلر
هندلر BookRoomHandler
دستورات ما را کنترل میکند.
کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// BookRoomHandler یک هندلر دستور است که دستور BookRoom را پردازش میکند و رویداد RoomBooked را انتشار میدهد.
//
// در CQRS ، یک دستور باید توسط یک هندلر پردازش شود.
// هنگام اضافه کردن یک هندلر دیگر برای پردازش این دستور ، یک خطا برگردانده میشود.
type BookRoomHandler struct {
eventBus *cqrs.EventBus
}
func (b BookRoomHandler) HandlerName() string {
return "BookRoomHandler"
}
// NewCommand نوع دستوری را که این هندلر باید آن را پردازش کند برمیگرداند. باید یک متغیر نوع پوینتر باشد.
func (b BookRoomHandler) NewCommand() interface{} {
return &BookRoom{}
}
func (b BookRoomHandler) Handle(ctx context.Context, c interface{}) error {
// c همیشه نوعی است که توسط `NewCommand` برگردانده شود، بنابراین تأیید نوع همیشه ایمن است
cmd := c.(*BookRoom)
// قیمت تصادفی، که ممکن است در واقعیت تولیدترینتر محاسبه شود
price := (rand.Int63n(40) + 1) * 10
log.Printf(
"Booked %s, from %s to %s",
cmd.RoomId,
cmd.GuestName,
time.Unix(cmd.StartDate.Seconds, int64(cmd.StartDate.Nanos)),
time.Unix(cmd.EndDate.Seconds, int64(cmd.EndDate.Nanos)),
)
// رویداد RoomBooked توسط هندلر رویداد OrderBeerOnRoomBooked پردازش میشود،
// و در آینده، این رویداد میتواند توسط چندین هندلر رویداد پردازش شود
if err := b.eventBus.Publish(ctx, &RoomBooked{
ReservationId: watermill.NewUUID(),
RoomId: cmd.RoomId,
GuestName: cmd.GuestName,
Price: price,
StartDate: cmd.StartDate,
EndDate: cmd.EndDate,
}); err != nil {
return err
}
return nil
}
// OrderBeerOnRoomBooked یک هندلر رویداد است که رویداد RoomBooked را پردازش کرده و دستور OrderBeer را انتشار میدهد.
// ...
هندلرهای رویداد
همانطور که قبلاً گفته شد، ما میخواهیم هر بار یک اتاق رزرو شود (با برچسب "هنگامی که اتاق رزرو شود")، یک بطری آبجو سفارش دهیم. برای این کار از دستور OrderBeer
استفاده میکنیم.
کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// OrderBeerOnRoomBooked یک هندلر رویداد است که رویداد RoomBooked را پردازش کرده و دستور OrderBeer را انتشار میدهد.
type OrderBeerOnRoomBooked struct {
commandBus *cqrs.CommandBus
}
func (o OrderBeerOnRoomBooked) HandlerName() string {
// این نام به EventsSubscriberConstructor منتقل شده و برای تولید نام صف استفاده میشود
return "OrderBeerOnRoomBooked"
}
func (OrderBeerOnRoomBooked) NewEvent() interface{} {
return &RoomBooked{}
}
func (o OrderBeerOnRoomBooked) Handle(ctx context.Context, e interface{}) error {
event := e.(*RoomBooked)
orderBeerCmd := &OrderBeer{
RoomId: event.RoomId,
Count: rand.Int63n(10) + 1,
}
return o.commandBus.Send(ctx, orderBeerCmd)
}
// OrderBeerHandler همانند BookRoomHandler بسیار شبیه به هم است. تنها تفاوت این است که بعضی اوقات خطا برمیگرداند زمانی که موجودی کافی برای آبجو وجود نداشته باشد، که باعث میشود تا دستور مجدداً صادر شود. شما میتوانید پیادهسازی کامل را در [کد منبع مثال](https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/basic/5-cqrs-protobuf/?utm_source=cqrs_doc) پیدا کنید.
گروههای کننده رویداد
به طور پیشفرض، هر کننده رویداد دارای یک نمونه مشترک جداگانه است. این رویکرد مناسب است اگر تنها یک نوع رویداد به موضوع ارسال شود.
در صورت بروز چندین نوع رویداد بر روی موضوع، دو گزینه وجود دارد:
- میتوانید
EventConfig.AckOnUnknownEvent
را بهtrue
تنظیم کنید - این کار باعث تأیید تمام رویدادهایی میشود که توسط کنندگان پردازش نشوند. - میتوانید از مکانیزم گروه کننده رویداد استفاده کنید.
برای استفاده از گروههای رویداد، باید گزینههای GenerateHandlerGroupSubscribeTopic
و GroupSubscriberConstructor
را در EventConfig
تنظیم کنید.
سپس، میتوانید از AddHandlersGroup
بر روی EventProcessor
استفاده کنید.
کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
err = eventProcessor.AddHandlersGroup(
"events",
OrderBeerOnRoomBooked{commandBus},
NewBookingsFinancialReport(),
cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
logger.Info("Beer ordered", watermill.LogFields{
"room_id": event.RoomId,
})
return nil
}),
)
if err != nil {
// ...
هر دو GenerateHandlerGroupSubscribeTopic
و GroupSubscriberConstructor
اطلاعاتی در مورد نام گروه را به عنوان پارامترهای تابع دریافت میکنند.
کنندگان عمومی
از نسخه 1.3 Watermill به بعد، میتوان از کنندگان عمومی برای کنترل دستورات و رویدادها استفاده کرد. این بسیار مفید است زمانی که تعداد زیادی از دستورات/رویدادها دارید و نمیخواهید برای هرکدام یک کننده ایجاد کنید.
کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
logger.Info("Beer ordered", watermill.LogFields{
"room_id": event.RoomId,
})
return nil
}),
// ...
پشت صحنه، یک پیادهسازی کننده رویداد یا کننده دستور ایجاد میشود. این مناسب برای تمام انواع کنندههاست.
کد منبع کامل: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go
// ...
// NewCommandHandler یک پیادهسازی جدید از CommandHandler بر اساس تابع ارائه شده و نوع دستور مستند شده از پارامترهای تابع ایجاد میکند.
func NewCommandHandler[Command any](
// ...
کد منبع کامل: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go
// ...
// NewEventHandler یک پیادهسازی جدید از EventHandler بر اساس تابع ارائه شده و نوع رویدادی مستند شده از پارامترهای تابع ایجاد میکند.
func NewEventHandler[T any](
// ...
کد منبع کامل: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go
// ...
// NewGroupEventHandler یک پیادهسازی جدید از GroupEventHandler بر اساس تابع ارائه شده و نوع رویدادی مستند شده از پارامترهای تابع ایجاد میکند.
func NewGroupEventHandler[T any](handleFunc func(ctx context.Context, event *T) error) GroupEventHandler {
// ...
ساخت یک مدل خواندنی با استفاده از دستگیرههای رویداد
کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// BookingsFinancialReport یک مدل خواندنی است که محاسبه میکند چهقدر پول میتوانیم از رزروها به دست بیاوریم.
// وقتی رویدادهای RoomBooked رخ میدهد، گوش میدهد.
//
// این پیادهسازی به سادگی در حافظه مینویسد. در محیط تولید، ممکن است از یک نوع ذخیره سازی پایدار استفاده کنید.
type BookingsFinancialReport struct {
handledBookings map[string]struct{}
totalCharge int64
lock sync.Mutex
}
func NewBookingsFinancialReport() *BookingsFinancialReport {
return &BookingsFinancialReport{handledBookings: map[string]struct{}{}}
}
func (b BookingsFinancialReport) HandlerName() string {
// این نام به EventsSubscriberConstructor منتقل میشود و برای تولید نام صف استفاده میشود
return "BookingsFinancialReport"
}
func (BookingsFinancialReport) NewEvent() interface{} {
return &RoomBooked{}
}
func (b *BookingsFinancialReport) Handle(ctx context.Context, e interface{}) error {
// ممکن است Handle به طور همروند فراخوانی شود، بنابراین امنیت رشته لازم است.
b.lock.Lock()
defer b.lock.Unlock()
event := e.(*RoomBooked)
// زمانی که از Pub/Sub استفاده میشود که تضمین تحویل دقیق یکبار را فراهم نمیکند، نیاز به از بین بردن تکرار پیامها است.
// GoChannel Pub/Sub تحویل دقیق یکبار را فراهم میکند،
// اما بیایید این مثال را برای امکان اجرای دیگر پیادهسازیهای Pub/Sub آماده کنیم.
if _, ok := b.handledBookings[event.ReservationId]; ok {
return nil
}
b.handledBookings[event.ReservationId] = struct{}{}
b.totalCharge += event.Price
fmt.Printf(">>> اتاق برای $%d رزرو شده است\n", b.totalCharge)
return nil
}
var amqpAddress = "amqp://guest:guest@rabbitmq:5672/"
func main() {
// ...
اتصال همه چیز
ما در حال حاضر تمام اجزاء مورد نیاز برای ساخت یک برنامه CQRS را داریم.
ما از AMQP (RabbitMQ) به عنوان بروکر پیام خود استفاده خواهیم کرد: AMQP.
در پاییندست، CQRS از مسیریاب پیام Watermill استفاده میکند. اگر شما با این آشنا نیستید و میخواهید بفهمید چگونه کار میکند، باید راهنمای شروع کردن را بررسی کنید. این همچنین به شما نشان میدهد که چگونه از الگوهای استاندارد پیامدهی مانند معیارها، صفهای پیامهای مسموم، محدودیت نرخ، همبندی و دیگر ابزارهای مورد استفاده توسط هر برنامه محور پیام استفاده کنید. این ابزارها از پیش به Watermill ساخته شدهاند.
بیایید به CQRS بازگردیم. همانطور که قبلاً میدانید، CQRS از اجزاء چندگانهای مانند اتوبوس فرمان یا رویداد، پردازندهها و غیره تشکیل شده است.
کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
func main() {
logger := watermill.NewStdLogger(false, false)
cqrsMarshaler := cqrs.ProtobufMarshaler{}
// You can use any Pub/Sub implementation from here: https://watermill.io/pubsubs/
// Detailed RabbitMQ implementation: https://watermill.io/pubsubs/amqp/
// Commands will be send to queue, because they need to be consumed once.
commandsAMQPConfig := amqp.NewDurableQueueConfig(amqpAddress)
commandsPublisher, err := amqp.NewPublisher(commandsAMQPConfig, logger)
if err != nil {
panic(err)
}
commandsSubscriber, err := amqp.NewSubscriber(commandsAMQPConfig, logger)
if err != nil {
panic(err)
}
// Events will be published to PubSub configured Rabbit, because they may be consumed by multiple consumers.
// (in that case BookingsFinancialReport and OrderBeerOnRoomBooked).
eventsPublisher, err := amqp.NewPublisher(amqp.NewDurablePubSubConfig(amqpAddress, nil), logger)
if err != nil {
panic(err)
}
// CQRS is built on messages router. Detailed documentation: https://watermill.io/docs/messages-router/
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// Simple middleware which will recover panics from event or command handlers.
// More about router middlewares you can find in the documentation:
// https://watermill.io/docs/messages-router/#middleware
//
// List of available middlewares you can find in message/router/middleware.
router.AddMiddleware(middleware.Recoverer)
commandBus, err := cqrs.NewCommandBusWithConfig(commandsPublisher, cqrs.CommandBusConfig{
GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) {
// we are using queue RabbitMQ config, so we need to have topic per command type
return params.CommandName, nil
},
OnSend: func(params cqrs.CommandBusOnSendParams) error {
logger.Info("Sending command", watermill.LogFields{
"command_name": params.CommandName,
})
params.Message.Metadata.Set("sent_at", time.Now().String())
return nil
},
Marshaler: cqrsMarshaler,
Logger: logger,
})
if err != nil {
panic(err)
}
commandProcessor, err := cqrs.NewCommandProcessorWithConfig(
router,
cqrs.CommandProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) {
// we are using queue RabbitMQ config, so we need to have topic per command type
return params.CommandName, nil
},
SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) {
// we can reuse subscriber, because all commands have separated topics
return commandsSubscriber, nil
},
OnHandle: func(params cqrs.CommandProcessorOnHandleParams) error {
start := time.Now()
err := params.Handler.Handle(params.Message.Context(), params.Command)
logger.Info("Command handled", watermill.LogFields{
"command_name": params.CommandName,
"duration": time.Since(start),
"err": err,
})
return err
},
Marshaler: cqrsMarshaler,
Logger: logger,
},
)
if err != nil {
panic(err)
}
eventBus, err := cqrs.NewEventBusWithConfig(eventsPublisher, cqrs.EventBusConfig{
GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
// because we are using PubSub RabbitMQ config, we can use one topic for all events
return "events", nil
// we can also use topic per event type
// return params.EventName, nil
},
OnPublish: func(params cqrs.OnEventSendParams) error {
logger.Info("Publishing event", watermill.LogFields{
"event_name": params.EventName,
})
params.Message.Metadata.Set("published_at", time.Now().String())
return nil
},
Marshaler: cqrsMarshaler,
Logger: logger,
})
if err != nil {
panic(err)
}
eventProcessor, err := cqrs.NewEventGroupProcessorWithConfig(
router,
cqrs.EventGroupProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) {
return "events", nil
},
SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) {
config := amqp.NewDurablePubSubConfig(
amqpAddress,
amqp.GenerateQueueNameTopicNameWithSuffix(params.EventGroupName),
)
return amqp.NewSubscriber(config, logger)
},
OnHandle: func(params cqrs.EventGroupProcessorOnHandleParams) error {
start := time.Now()
err := params.Handler.Handle(params.Message.Context(), params.Event)
logger.Info("Event handled", watermill.LogFields{
"event_name": params.EventName,
"duration": time.Since(start),
"err": err,
})
return err
},
Marshaler: cqrsMarshaler,
Logger: logger,
},
)
if err != nil {
panic(err)
}
err = commandProcessor.AddHandlers(
BookRoomHandler{eventBus},
OrderBeerHandler{eventBus},
)
if err != nil {
panic(err)
}
err = eventProcessor.AddHandlersGroup(
"events",
OrderBeerOnRoomBooked{commandBus},
NewBookingsFinancialReport(),
cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
logger.Info("Beer ordered", watermill.LogFields{
"room_id": event.RoomId,
})
return nil
}),
)
if err != nil {
panic(err)
}
// publish BookRoom commands every second to simulate incoming traffic
go publishCommands(commandBus)
// processors are based on router, so they will work when router will start
if err := router.Run(context.Background()); err != nil {
panic(err)
}
}
// ...
این همه است. ما یک برنامه قابل اجرای CQRS داریم.