مکانیزم CQRS

CQRS مخفف عبارت "Command Query Responsibility Segregation" است. این مکانیزم مسئولیت دستور (درخواست‌های نوشتن) و پرس‌وجو (درخواست‌های خواندن) را از یکدیگر جدا می‌کند. درخواست‌های نوشتن و خواندن توسط اشیاء متفاوتی اداره می‌شوند.

این است CQRS. می‌توانیم مخزن داده را نیز جدا کنیم و مخزن‌های جداگانه برای خواندن و نوشتن داشته باشیم. بعد از انجام این کار، ممکن است چندین مخزن خواندنی بهینه شده برای اداره انواع مختلف پرس‌وجوها یا بازه‌های متمایز شدهٔ محدود. اگرچه جدا کردن مخازن خواندن/نوشتن موضوع بحث‌های مرتبط با CQRS است، اما خود CQRS نیست. CQRS فقط اولین جداسازی دستور و پرس‌وجو است.

نمودار معماری CQRS

مؤلفه cqrs فراهم می‌کند که برخی انتزاعات مفید را فراهم می‌سازد که بر پایهٔ Pub/Sub و Router ساخته شده‌اند تا به اجرای الگوی CQRS کمک کنند.

شما نیازی ندارید که کل CQRS را اجرا کنید. به طور معمول، تنها بخش رویداد از مؤلفه برای ساخت برنامه‌های مبتنی بر رویداد استفاده می‌شود.

بلوک‌های ساختاری

رویدادها

رویدادها نشان‌دهندهٔ چیزی هستند که قبلاً رخ داده است. رویدادها قابل تغییر نیستند.

اتوبوس رویداد

کد منبع کامل: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go

// ...
// اتوبوس رویداد، رویدادها را به دستگاه‌های رویداد منتقل می‌کند.
type EventBus struct {
// ...

کد منبع کامل: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go

// ...
type EventBusConfig struct {
    // GeneratePublishTopic برای تولید نام موضوع برای انتشار رویدادها استفاده می‌شود.
    GeneratePublishTopic GenerateEventPublishTopicFn

    // OnPublish قبل از ارسال رویداد فراخوانی می‌شود. می‌تواند *message.Message را اصلاح کند.
    //
    // این گزینه اجباری نیست.
    OnPublish OnEventSendFn

    // Marshaler برای رمزگذاری و رمزگشایی رویدادها استفاده می‌شود.
    // این گزینه اجباری است.
    Marshaler CommandEventMarshaler

    // نمونه ثبت برای ورود به سیستم. اگر ارائه نشود، watermill.NopLogger استفاده می‌شود.
    Logger watermill.LoggerAdapter
}

func (c *EventBusConfig) setDefaults() {
    if c.Logger == nil {
        c.Logger = watermill.NopLogger{}
    }
}
// ...

پردازشگر رویداد

کد کامل: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go

// ...
// EventProcessor برای تعیین EventHandler که باید رویدادهای دریافت شده از اتوبوس رویداد را پردازش کند استفاده می‌شود.
نوع EventProcessor struct {
// ...

کد کامل: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go

// ...
نوع EventProcessorConfig struct {
	// GenerateSubscribeTopic برای تولید موضوع برای مشترک شدن در رویدادها استفاده می‌شود.
	// اگر پردازشگر رویداد از گروه های دستگاه استفاده می‌کند، آنگاه GenerateSubscribeTopic استفاده می‌شود.
	GenerateSubscribeTopic EventProcessorGenerateSubscribeTopicFn

	// SubscriberConstructor برای ایجاد یک مشترک برای EventHandler استفاده می‌شود.
	//
	// این تابع برای هر نمونه EventHandler یکبار فراخوانی می‌شود.
	// اگر می‌خواهید یک مشترک را برای چندین دستگاه استفاده کنید، از GroupEventProcessor استفاده کنید.
	SubscriberConstructor EventProcessorSubscriberConstructorFn

	// OnHandle قبل از پردازش رویداد فراخوانی می‌شود.
	// OnHandle به صورتی مشابه middleware عمل می‌کند: می‌توانید منطق اضافی قبل و بعد از پردازش رویداد را درج کنید.
	//
	// بنابراین، باید به طور صریح params.Handler.Handle() را برای پردازش رویداد فراخوانی کنید.
	//
	//  func(params EventProcessorOnHandleParams) (err error) {
	//      // منطق قبل از پردازش
	//      //  (...)

	//      err := params.Handler.Handle(params.Message.Context(), params.Event)
	//
	//      // منطق بعد از پردازش
	//      //  (...)

	//      return err
	//  }
	//
	// این گزینه اجباری نیست.
	OnHandle EventProcessorOnHandleFn

	// AckOnUnknownEvent برای تعیین استفاده می‌شود که آیا پیام باید تأیید شود یا خیر زمانی که رویداد دستگاه تعریف شده ندارد.
	AckOnUnknownEvent bool

	// Marshaler برای marshal و unmarshal رویدادها استفاده میشود.
	// مورد نیاز است.
	Marshaler CommandEventMarshaler

	// Logger نمونه برای ورود اطلاعات.
	// اگر ارائه نشده باشد، watermill.NopLogger استفاده می‌شود.
	Logger watermill.LoggerAdapter

	// disableRouterAutoAddHandlers برای حفظ سازگاری به سمت عقب استفاده می‌شود.
	// این مقدار وقتی که با استفاده از NewEventProcessor EventProcessor ایجاد می‌شود قرار داده می‌شود.
	// قدیمی شده: به NewEventProcessorWithConfig مهاجرت کنید.
	disableRouterAutoAddHandlers bool
}

func (c *EventProcessorConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

پردازشگر گروه رویداد

کد منبع کامل: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go

// ...
// EventGroupProcessor تعیین می‌کند که کدام پردازش‌گر رویداد باید رویدادهای دریافتی از اتوبوس رویداد را پردازش کند.
// در مقایسه با EventProcessor، EventGroupProcessor به چندین پردازش‌گر اجازه می‌دهد که نمونه اشتراک‌گیرنده مشترک را به اشتراک بگذارند.
type EventGroupProcessor struct {
// ...

کد منبع کامل: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go

// ...
type EventGroupProcessorConfig struct {
	// GenerateSubscribeTopic برای تولید موضوع برای اشتراک گرفتن از پردازش‌گر گروه رویدادها استفاده می‌شود.
	// این گزینه برای EventProcessor هنگام استفاده از گروه‌های پردازش‌گر الزامی است.
	GenerateSubscribeTopic EventGroupProcessorGenerateSubscribeTopicFn

	// SubscriberConstructor برای ایجاد یک مشترک برای GroupEventHandler استفاده می‌شود.
	// این تابع یک بار برای هر گروه رویداد فراخوانی می‌شود - اجازه می‌دهد که یک اشتراک برای هر گروه ایجاد شود.
	// این بسیار مفید است زمانی که می‌خواهیم رویدادها از یک جریان به ترتیب را پردازش کنیم.
	SubscriberConstructor EventGroupProcessorSubscriberConstructorFn

	// OnHandle قبل از پردازش رویداد فراخوانی می‌شود.
	// OnHandle مانند میان‌افزار (middleware) است: می‌توانید منطق اضافی را قبل و بعد از پردازش رویداد درج کنید.
	//
	// بنابراین شما باید به طور صریح params.Handler.Handle() را فراخوانی کنید تا رویداد پردازش شود.
	//
	// func(params EventGroupProcessorOnHandleParams) (err error) {
	//     // منطق قبل از پردازش
	//     //  (...)
	//
	//     err := params.Handler.Handle(params.Message.Context(), params.Event)
	//
	//     // منطق بعد از پردازش
	//     //  (...)
	//
	//     return err
	// }
	//
	// این گزینه الزامی نیست.
	OnHandle EventGroupProcessorOnHandleFn

	// AckOnUnknownEvent برای تعیین اینکه آیا باید تایید کنیم که رویدادی که پردازش‌گر تعریف شده ندارد یا خیر استفاده می‌شود.
	AckOnUnknownEvent bool

	// Marshaler برای رمزکردن و رمزگشایی رویدادها استفاده می‌شود.
	// این الزامی است.
	Marshaler CommandEventMarshaler

	// نمونه Logger برای وقفه بر اساس ورودی استفاده می‌شود.
	// اگر ارائه نشود، watermill.NopLogger استفاده خواهد شد.
	Logger watermill.LoggerAdapter
}

func (c *EventGroupProcessorConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

اطلاعات بیشتر درباره پردازشگر گروه رویداد را بیاموزید.

دستگیره رویداد

کد منبع کامل: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go

// ...
// EventHandler رویدادهای تعریف شده توسط NewEvent را دریافت می‌کند و آنها را با استفاده از متد Handle خود پردازش می‌کند.
// اگر از DDD استفاده می‌شود، دستگیره رویداد می‌تواند پیچیده و پایاشی را تغییر دهد و حفظ کند.
// همچنین می‌تواند مدیران فرآیند، سگاها یا فقط مدل‌های خواندنی ایجاد کند.
//
// برخلاف دستگیره دستور، هر رویداد می‌تواند دارای چندین دستگیره رویداد باشد.
//
// هنگام رسیدگی به پیام، از یک نمونه EventHandler استفاده کنید.
// هنگام انتقال چند رویداد به طور همزمان، می‌توان متد Handle را چندین بار هم‌زمان اجرا کرد.
// بنابراین، متد Handle باید ایمن از نظر ریسه باشد!
type EventHandler interface {
// ...

دستور

یک دستور یک ساختار داده‌ای ساده است که درخواست انجام یک عملیات را نمایان می‌کند.

اتوبوس دستور

کد منبع کامل: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go

// ...
// CommandBus یک جزء است که دستورها را به دستگیرندگان دستور منتقل می‌کند.
type CommandBus struct {
// ...

کد منبع کامل: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go

// ...
type CommandBusConfig struct {
	// GeneratePublishTopic برای تولید موضوع انتشار دستورها استفاده می‌شود.
	GeneratePublishTopic CommandBusGeneratePublishTopicFn

	// OnSend قبل از انتشار یک دستور فراخوانی می‌شود.
	// *message.Message قابل تغییر است.
	//
	// این گزینه اجباری نیست.
	OnSend CommandBusOnSendFn

	// Marshaler برای سریالیزه و دیسریالیزه کردن دستورها استفاده می‌شود.
	// ضروری است.
	Marshaler CommandEventMarshaler

	// Logger نمونه‌ای است که برای وقایع نوبت‌بندی استفاده می‌شود.
	// اگر فراهم نشود، watermill.NopLogger استفاده خواهد شد.
	Logger watermill.LoggerAdapter
}

func (c *CommandBusConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

پردازش‌گر دستور

کد منبع کامل: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go

// ...
// CommandProcessorSubscriberConstructorFn برای ایجاد مشترک برای CommandHandler استفاده می‌شود.
// این امکان را به شما می‌دهد که برای هر دستگیرنده دستور سازنده مشترک سفارشی جداگانه ایجاد کنید.
type CommandProcessorSubscriberConstructorFn func(CommandProcessorSubscriberConstructorParams) (message.Subscriber, error)
// ...

کد منبع کامل: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go

// ...
type CommandProcessorConfig struct {
	// GenerateSubscribeTopic برای تولید موضوع اشتراک گذاری برای دستورها استفاده می‌شود.
	GenerateSubscribeTopic CommandProcessorGenerateSubscribeTopicFn

	// SubscriberConstructor برای ایجاد مشترک برای CommandHandler استفاده می‌شود.
	SubscriberConstructor CommandProcessorSubscriberConstructorFn

	// OnHandle قبل از بررسی دستور فراخوانی می‌شود.
	// OnHandle مانند یک میان‌افزار عمل می‌کند: می‌توانید منطق اضافی را قبل و بعد از بررسی دستور درج کنید.
	//
	// به همین دلیل، شما باید به طور صریح params.Handler.Handle() را برای بررسی دستور فراخوانی کنید.
	//  func(params CommandProcessorOnHandleParams) (err error) {
	//      // منطق قبل از بررسی
	//      // (...)
	//
	//      err := params.Handler.Handle(params.Message.Context(), params.Command)
	//
	//      // منطق بعد از بررسی
	//      // (...)
	//
	//      return err
	//  }
	//
	// این گزینه اجباری نیست.
	OnHandle CommandProcessorOnHandleFn

	// Marshaler برای سریالیزه و دیسریالیزه کردن دستورها استفاده می‌شود.
	// ضروری است.
	Marshaler CommandEventMarshaler

	// Logger نمونه برای وقایع نوبت‌بندی استفاده می‌شود.
	// اگر فراهم نشود، watermill.NopLogger استفاده خواهد شد.
	Logger watermill.LoggerAdapter

	// اگر صحیح باشد، CommandProcessor حتی اگر CommandHandler خطا برگرداند، پیام‌ها را تصدیق خواهد کرد.
	// اگر RequestReplyBackend خالی نباشد و ارسال پاسخ شکست بخورد، پیام همچنان عدم تصدیق خواهد شد.
	//
	// اخطار: استفاده از این گزینه وقتی از جزیره درخواست و پاسخ استفاده می‌شود (requestreply.NewCommandHandler یا requestreply.NewCommandHandlerWithResult)
	// توصیه نمی‌شودزیرا ممکن است دستور را تصدیق کند وقتی ارسال پاسخ شکست بخورد.
	//
	// زمانی که از جزیره درخواست و پاسخ استفاده می‌شود، باید requestreply.PubSubBackendConfig.AckCommandErrors را استفاده کنید.
	AckCommandHandlingErrors bool

	// disableRouterAutoAddHandlers برای سازگاری به‌سمت عقب استفاده می‌شود.
	// هنگام ایجاد یک CommandProcessor با NewCommandProcessor تنظیم می‌شود.
	// منسوخ شده: لطفاً به NewCommandProcessorWithConfig مهاجرت کنید.
	disableRouterAutoAddHandlers bool
}

func (c *CommandProcessorConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

پردازشگر دستور

کد منبع کامل: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go

// ...
// CommandHandler دستور تعریف شده توسط NewCommand را دریافت می‌کند و آن را با استفاده از متد Handle اجرا می‌کند.
// اگر از DDD استفاده می‌شود، CommandHandler ممکن است aggregate ها را تغییر دهد و آنها را ذخیره کند.
//
// برخلاف EventHandler، هر Command تنها می‌تواند یک CommandHandler داشته باشد.
//
// در طول پردازش پیام، از یک نمونه از CommandHandler استفاده کنید.
// زمانی که چندین دستور به صورت همزمان تحویل داده شود، ممکن است متد Handle چند بار به صورت همزمان اجرا شود.
// بنابراین، متد Handle باید thread-safe باشد!
type CommandHandler interface {
// ...

ترجمه‌کننده دستور و رویداد

کد منبع کامل: github.com/ThreeDotsLabs/watermill/components/cqrs/marshaler.go

// ...
// CommandEventMarshaler دستورها و رویدادها را به پیام‌های Watermill تبدیل کرده و برعکس.
// بار مفتول دستور نیاز به تبدیل پیام  باید به []bytes شود.
type CommandEventMarshaler interface {
	// Marshal دستور یا رویداد را به یک پیام Watermill تبدیل می‌کند.
	Marshal(v interface{}) (*message.Message, error)

	// Unmarshal پیام Watermill را به v دستور یا رویداد کدگشایی می‌کند.
	Unmarshal(msg *message.Message, v interface{}) (err error)

	// Name نام دستور یا رویداد را برمی‌گرداند.
	// نام می‌تواند برای تعیین این استفاده شود که آیا دستور یا رویداد دریافتی آن چیزی است که می‌خواهیم پردازش کنیم.
	Name(v interface{}) string

	// NameFromMessage نام دستور یا رویداد را از پیام Watermill (تولید شده توسط Marshal) برمی‌گرداند.
	//
	// هنگامی که دستورها یا رویدادها به پیام‌های Watermill مفتول می‌شوند، باید از NameFromMessage به جای Name استفاده کنیم تا از کدگشایی بی‌فایده جلوگیری کنیم.
	NameFromMessage(msg *message.Message) string
}
// ...

استفاده

دامنه نمونه

استفاده از یک دامنه ساده که مسئول رزرو اتاق‌ها در یک هتل است.

ما از نمادهای Event Storming برای نمایش مدل این دامنه استفاده خواهیم کرد.

رمزنگاری نماد:

  • یادداشت‌های چسبان آبی دستورات را نشان می‌دهند
  • یادداشت‌های چسبان نارنجی رویدادها را نشان می‌دهند
  • یادداشت‌های چسبان سبز مدل‌های خواندنی را به صورت ناهمزمان از رویدادها تولید می‌کنند
  • یادداشت‌های چسبان بنفش سیاست‌ها را که توسط رویدادها پیش‌بینی می‌شوند و دستورات را تولید می‌کنند نشان می‌دهند
  • یادداشت‌های چسبان صورتی نقاط داغ هستند؛ ما بخش‌هایی که اغلب با مشکل روبرو می‌شوند علامت‌گذاری می‌کنیم

CQRS Event Storming

دامنه ساده است:

  • مشتریان می‌توانند اتاق‌ها را رزرو کنند.
  • هرگاه یک اتاق رزرو شود، برای مشتری یک بطری آبجو سفارش می‌دهیم (زیرا ما مهمانانمان را دوست داریم).
    • ما می‌دانیم که بعضی وقت‌ها آبجو تمام می‌شود.
  • ما یک گزارش مالی بر اساس رزرو تولید می‌کنیم.

ارسال دستورات

اولین کاری که باید انجام دهیم، شبیه‌سازی اقدامات مشتریان است.

کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf

// ...
		bookRoomCmd := &BookRoom{
			RoomId:    fmt.Sprintf("%d", i),
			GuestName: "John",
			StartDate: startDate,
			EndDate:   endDate,
		}
		if err := commandBus.Send(context.Background(), bookRoomCmd); err != nil {
			panic(err)
		}
// ...

دستورالعمل هندلر

هندلر BookRoomHandler دستورات ما را کنترل می‌کند.

کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// BookRoomHandler یک هندلر دستور است که دستور BookRoom را پردازش می‌کند و رویداد RoomBooked را انتشار می‌دهد.
//
// در CQRS ، یک دستور باید توسط یک هندلر پردازش شود.
// هنگام اضافه کردن یک هندلر دیگر برای پردازش این دستور ، یک خطا برگردانده می‌شود.
type BookRoomHandler struct {
	eventBus *cqrs.EventBus
}

func (b BookRoomHandler) HandlerName() string {
	return "BookRoomHandler"
}

// NewCommand نوع دستوری را که این هندلر باید آن را پردازش کند برمی‌گرداند. باید یک متغیر نوع پوینتر باشد.
func (b BookRoomHandler) NewCommand() interface{} {
	return &BookRoom{}
}

func (b BookRoomHandler) Handle(ctx context.Context, c interface{}) error {
	// c همیشه نوعی است که توسط `NewCommand` برگردانده شود، بنابراین تأیید نوع همیشه ایمن است
	cmd := c.(*BookRoom)

	// قیمت تصادفی، که ممکن است در واقعیت تولیدترین‌تر محاسبه شود
	price := (rand.Int63n(40) + 1) * 10

	log.Printf(
		"Booked %s, from %s to %s",
		cmd.RoomId,
		cmd.GuestName,
		time.Unix(cmd.StartDate.Seconds, int64(cmd.StartDate.Nanos)),
		time.Unix(cmd.EndDate.Seconds, int64(cmd.EndDate.Nanos)),
	)

	// رویداد RoomBooked توسط هندلر رویداد OrderBeerOnRoomBooked پردازش می‌شود،
	// و در آینده، این رویداد می‌تواند توسط چندین هندلر رویداد پردازش شود
	if err := b.eventBus.Publish(ctx, &RoomBooked{
		ReservationId: watermill.NewUUID(),
		RoomId:        cmd.RoomId,
		GuestName:     cmd.GuestName,
		Price:         price,
		StartDate:     cmd.StartDate,
		EndDate:       cmd.EndDate,
	}); err != nil {
		return err
	}

	return nil
}

// OrderBeerOnRoomBooked یک هندلر رویداد است که رویداد RoomBooked را پردازش کرده و دستور OrderBeer را انتشار می‌دهد.
// ...

هندلر‌های رویداد

همانطور که قبلاً گفته شد، ما می‌خواهیم هر بار یک اتاق رزرو شود (با برچسب "هنگامی که اتاق رزرو شود")، یک بطری آبجو سفارش دهیم. برای این کار از دستور OrderBeer استفاده می‌کنیم.

کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// OrderBeerOnRoomBooked یک هندلر رویداد است که رویداد RoomBooked را پردازش کرده و دستور OrderBeer را انتشار می‌دهد.
type OrderBeerOnRoomBooked struct {
	commandBus *cqrs.CommandBus
}

func (o OrderBeerOnRoomBooked) HandlerName() string {
	// این نام به EventsSubscriberConstructor منتقل شده و برای تولید نام صف استفاده می‌شود
	return "OrderBeerOnRoomBooked"
}

func (OrderBeerOnRoomBooked) NewEvent() interface{} {
	return &RoomBooked{}
}

func (o OrderBeerOnRoomBooked) Handle(ctx context.Context, e interface{}) error {
	event := e.(*RoomBooked)

	orderBeerCmd := &OrderBeer{
		RoomId: event.RoomId,
		Count:  rand.Int63n(10) + 1,
	}

	return o.commandBus.Send(ctx, orderBeerCmd)
}

// OrderBeerHandler همانند BookRoomHandler بسیار شبیه به هم است. تنها تفاوت این است که بعضی اوقات خطا برمی‌گرداند زمانی که موجودی کافی برای آبجو وجود نداشته باشد، که باعث می‌شود تا دستور مجدداً صادر شود. شما می‌توانید پیاده‌سازی کامل را در [کد منبع مثال](https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/basic/5-cqrs-protobuf/?utm_source=cqrs_doc) پیدا کنید.

گروه‌های کننده رویداد

به طور پیش‌فرض، هر کننده رویداد دارای یک نمونه مشترک جداگانه است. این رویکرد مناسب است اگر تنها یک نوع رویداد به موضوع ارسال شود.

در صورت بروز چندین نوع رویداد بر روی موضوع، دو گزینه وجود دارد:

  1. می‌توانید EventConfig.AckOnUnknownEvent را به true تنظیم کنید - این کار باعث تأیید تمام رویدادهایی می‌شود که توسط کنندگان پردازش نشوند.
  2. می‌توانید از مکانیزم گروه کننده رویداد استفاده کنید.

برای استفاده از گروه‌های رویداد، باید گزینه‌های GenerateHandlerGroupSubscribeTopic و GroupSubscriberConstructor را در EventConfig تنظیم کنید.

سپس، می‌توانید از AddHandlersGroup بر روی EventProcessor استفاده کنید.

کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
	err = eventProcessor.AddHandlersGroup(
		"events",
		OrderBeerOnRoomBooked{commandBus},

		NewBookingsFinancialReport(),

		cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
			logger.Info("Beer ordered", watermill.LogFields{
				"room_id": event.RoomId,
			})
			return nil
		}),
	)
	if err != nil {
// ...

هر دو GenerateHandlerGroupSubscribeTopic و GroupSubscriberConstructor اطلاعاتی در مورد نام گروه را به عنوان پارامترهای تابع دریافت می‌کنند.

کنندگان عمومی

از نسخه 1.3 Watermill به بعد، می‌توان از کنندگان عمومی برای کنترل دستورات و رویدادها استفاده کرد. این بسیار مفید است زمانی که تعداد زیادی از دستورات/رویدادها دارید و نمی‌خواهید برای هرکدام یک کننده ایجاد کنید.

کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
		cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
			logger.Info("Beer ordered", watermill.LogFields{
				"room_id": event.RoomId,
			})
			return nil
		}),
// ...

پشت صحنه، یک پیاده‌سازی کننده رویداد یا کننده دستور ایجاد می‌شود. این مناسب برای تمام انواع کننده‌هاست.

کد منبع کامل: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go

// ...
// NewCommandHandler یک پیاده‌سازی جدید از CommandHandler بر اساس تابع ارائه شده و نوع دستور مستند شده از پارامترهای تابع ایجاد می‌کند.
func NewCommandHandler[Command any](
// ...

کد منبع کامل: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go

// ...
// NewEventHandler یک پیاده‌سازی جدید از EventHandler بر اساس تابع ارائه شده و نوع رویدادی مستند شده از پارامترهای تابع ایجاد می‌کند.
func NewEventHandler[T any](
// ...

کد منبع کامل: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go

// ...
// NewGroupEventHandler یک پیاده‌سازی جدید از GroupEventHandler بر اساس تابع ارائه شده و نوع رویدادی مستند شده از پارامترهای تابع ایجاد می‌کند.
func NewGroupEventHandler[T any](handleFunc func(ctx context.Context, event *T) error) GroupEventHandler {
// ...

ساخت یک مدل خواندنی با استفاده از دستگیره‌های رویداد

کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// BookingsFinancialReport یک مدل خواندنی است که محاسبه می‌کند چه‌قدر پول می‌توانیم از رزروها به دست بیاوریم.
// وقتی رویدادهای RoomBooked رخ می‌دهد، گوش می‌دهد.
//
// این پیاده‌سازی به سادگی در حافظه مینویسد. در محیط تولید، ممکن است از یک نوع ذخیره سازی پایدار استفاده کنید.
type BookingsFinancialReport struct {
	handledBookings map[string]struct{}
	totalCharge     int64
	lock            sync.Mutex
}

func NewBookingsFinancialReport() *BookingsFinancialReport {
	return &BookingsFinancialReport{handledBookings: map[string]struct{}{}}
}

func (b BookingsFinancialReport) HandlerName() string {
	// این نام به EventsSubscriberConstructor منتقل می‌شود و برای تولید نام صف استفاده می‌شود
	return "BookingsFinancialReport"
}

func (BookingsFinancialReport) NewEvent() interface{} {
	return &RoomBooked{}
}

func (b *BookingsFinancialReport) Handle(ctx context.Context, e interface{}) error {
	// ممکن است Handle به طور همروند فراخوانی شود، بنابراین امنیت رشته لازم است.
	b.lock.Lock()
	defer b.lock.Unlock()

	event := e.(*RoomBooked)

	// زمانی که از Pub/Sub استفاده می‌شود که تضمین تحویل دقیق یکبار را فراهم نمی‌کند، نیاز به از بین بردن تکرار پیام‌ها است.
	// GoChannel Pub/Sub تحویل دقیق یکبار را فراهم می‌کند،
	// اما بیایید این مثال را برای امکان اجرای دیگر پیاده‌سازی‌های Pub/Sub آماده کنیم.
	if _, ok := b.handledBookings[event.ReservationId]; ok {
		return nil
	}
	b.handledBookings[event.ReservationId] = struct{}{}

	b.totalCharge += event.Price

	fmt.Printf(">>> اتاق برای $%d رزرو شده است\n", b.totalCharge)
	return nil
}

var amqpAddress = "amqp://guest:guest@rabbitmq:5672/"

func main() {
// ...

اتصال همه چیز

ما در حال حاضر تمام اجزاء مورد نیاز برای ساخت یک برنامه CQRS را داریم.

ما از AMQP (RabbitMQ) به عنوان بروکر پیام خود استفاده خواهیم کرد: AMQP.

در پایین‌دست، CQRS از مسیریاب پیام Watermill استفاده می‌کند. اگر شما با این آشنا نیستید و می‌خواهید بفهمید چگونه کار می‌کند، باید راهنمای شروع کردن را بررسی کنید. این همچنین به شما نشان می‌دهد که چگونه از الگوهای استاندارد پیام‌دهی مانند معیارها، صف‌های پیام‌های مسموم، محدودیت نرخ، همبندی و دیگر ابزارهای مورد استفاده توسط هر برنامه محور پیام استفاده کنید. این ابزارها از پیش به Watermill ساخته شده‌اند.

بیایید به CQRS بازگردیم. همانطور که قبلاً می‌دانید، CQRS از اجزاء چندگانه‌ای مانند اتوبوس فرمان یا رویداد، پردازنده‌ها و غیره تشکیل شده است.

کد منبع کامل: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
func main() {
	logger := watermill.NewStdLogger(false, false)
	cqrsMarshaler := cqrs.ProtobufMarshaler{}

	// You can use any Pub/Sub implementation from here: https://watermill.io/pubsubs/
	// Detailed RabbitMQ implementation: https://watermill.io/pubsubs/amqp/
	// Commands will be send to queue, because they need to be consumed once.
	commandsAMQPConfig := amqp.NewDurableQueueConfig(amqpAddress)
	commandsPublisher, err := amqp.NewPublisher(commandsAMQPConfig, logger)
	if err != nil {
		panic(err)
	}
	commandsSubscriber, err := amqp.NewSubscriber(commandsAMQPConfig, logger)
	if err != nil {
		panic(err)
	}

	// Events will be published to PubSub configured Rabbit, because they may be consumed by multiple consumers.
	// (in that case BookingsFinancialReport and OrderBeerOnRoomBooked).
	eventsPublisher, err := amqp.NewPublisher(amqp.NewDurablePubSubConfig(amqpAddress, nil), logger)
	if err != nil {
		panic(err)
	}

	// CQRS is built on messages router. Detailed documentation: https://watermill.io/docs/messages-router/
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// Simple middleware which will recover panics from event or command handlers.
	// More about router middlewares you can find in the documentation:
	// https://watermill.io/docs/messages-router/#middleware
	//
	// List of available middlewares you can find in message/router/middleware.
	router.AddMiddleware(middleware.Recoverer)

	commandBus, err := cqrs.NewCommandBusWithConfig(commandsPublisher, cqrs.CommandBusConfig{
		GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) {
			// we are using queue RabbitMQ config, so we need to have topic per command type
			return params.CommandName, nil
		},
		OnSend: func(params cqrs.CommandBusOnSendParams) error {
			logger.Info("Sending command", watermill.LogFields{
				"command_name": params.CommandName,
			})

			params.Message.Metadata.Set("sent_at", time.Now().String())

			return nil
		},
		Marshaler: cqrsMarshaler,
		Logger:    logger,
	})
	if err != nil {
		panic(err)
	}

	commandProcessor, err := cqrs.NewCommandProcessorWithConfig(
		router,
		cqrs.CommandProcessorConfig{
			GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) {
				// we are using queue RabbitMQ config, so we need to have topic per command type
				return params.CommandName, nil
			},
			SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) {
				// we can reuse subscriber, because all commands have separated topics
				return commandsSubscriber, nil
			},
			OnHandle: func(params cqrs.CommandProcessorOnHandleParams) error {
				start := time.Now()

				err := params.Handler.Handle(params.Message.Context(), params.Command)

				logger.Info("Command handled", watermill.LogFields{
					"command_name": params.CommandName,
					"duration":     time.Since(start),
					"err":          err,
				})

				return err
			},
			Marshaler: cqrsMarshaler,
			Logger:    logger,
		},
	)
	if err != nil {
		panic(err)
	}

	eventBus, err := cqrs.NewEventBusWithConfig(eventsPublisher, cqrs.EventBusConfig{
		GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
			// because we are using PubSub RabbitMQ config, we can use one topic for all events
			return "events", nil

			// we can also use topic per event type
			// return params.EventName, nil
		},

		OnPublish: func(params cqrs.OnEventSendParams) error {
			logger.Info("Publishing event", watermill.LogFields{
				"event_name": params.EventName,
			})

			params.Message.Metadata.Set("published_at", time.Now().String())

			return nil
		},

		Marshaler: cqrsMarshaler,
		Logger:    logger,
	})
	if err != nil {
		panic(err)
	}

	eventProcessor, err := cqrs.NewEventGroupProcessorWithConfig(
		router,
		cqrs.EventGroupProcessorConfig{
			GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) {
				return "events", nil
			},
			SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) {
				config := amqp.NewDurablePubSubConfig(
					amqpAddress,
					amqp.GenerateQueueNameTopicNameWithSuffix(params.EventGroupName),
				)

				return amqp.NewSubscriber(config, logger)
			},

			OnHandle: func(params cqrs.EventGroupProcessorOnHandleParams) error {
				start := time.Now()

				err := params.Handler.Handle(params.Message.Context(), params.Event)

				logger.Info("Event handled", watermill.LogFields{
					"event_name": params.EventName,
					"duration":   time.Since(start),
					"err":        err,
				})

				return err
			},

			Marshaler: cqrsMarshaler,
			Logger:    logger,
		},
	)
	if err != nil {
		panic(err)
	}

	err = commandProcessor.AddHandlers(
		BookRoomHandler{eventBus},
		OrderBeerHandler{eventBus},
	)
	if err != nil {
		panic(err)
	}

	err = eventProcessor.AddHandlersGroup(
		"events",
		OrderBeerOnRoomBooked{commandBus},

		NewBookingsFinancialReport(),

		cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
			logger.Info("Beer ordered", watermill.LogFields{
				"room_id": event.RoomId,
			})
			return nil
		}),
	)
	if err != nil {
		panic(err)
	}

	// publish BookRoom commands every second to simulate incoming traffic
	go publishCommands(commandBus)

	// processors are based on router, so they will work when router will start
	if err := router.Run(context.Background()); err != nil {
		panic(err)
	}
}
// ...

این همه است. ما یک برنامه قابل اجرای CQRS داریم.