آلية CQRS

تعني CQRS "تفريق مسؤولية الأمر واستعلام الفصل". يفصل مسؤولية الأمر (طلبات الكتابة) والاستعلام (طلبات القراءة). يتم التعامل مع طلبات الكتابة والقراءة بواسطة كائنات مختلفة.

هذه هي CQRS. يمكننا فصل تخزين البيانات بشكل أعمق، والحصول على تخزين قراءة وكتابة منفصل. بمجرد القيام بذلك، قد يكون هناك العديد من تخازين القراءة المحسنة لمعالجة أنواع مختلفة من الاستعلامات أو تشمل عدة سياقات محددة. على الرغم من أن تخزين القراءة/الكتابة المنفصل غالبًا ما يكون موضوعًا للنقاش المتعلق بـ CQRS، إلا أنه ليس CQRS بحد ذاته. يعد CQRS فقط التفريق الأول للأمر والاستعلام.

مخطط بنية CQRS

المكون cqrs يوفر بعض التجريدات المفيدة، التي تعتمد على نظام النشر/الاشتراك والتوجيه، للمساعدة في تنفيذ نمط CQRS.

ليس من الضروري تنفيذ CQRS بالكامل. عادةً ما يُستخدم جزء المكون المتعلق بالحدث فقط لبناء تطبيقات معتمدة على الأحداث.

الكتل البنائية

الأحداث

الأحداث تمثل شيئًا قد حدث بالفعل. الأحداث لا يمكن تغييرها.

حافلة الأحداث

الشفرة المصدرية الكاملة: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go

// ...
// حافلة الأحداث نقل الأحداث إلى المعالجين الخاصين بالأحداث.
type EventBus struct {
// ...

الشفرة المصدرية الكاملة: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go

// ...
type EventBusConfig struct {
    // يُستخدم GeneratePublishTopic لتوليف اسم الموضوع لنشر الأحداث.
    GeneratePublishTopic GenerateEventPublishTopicFn

    // يُستدعى OnPublish قبل إرسال الحدث. يمكنه تعديل *message.Message.
    //
    // هذا الخيار ليس إجباريًا.
    OnPublish OnEventSendFn

    // يُستخدم Marshaler لترميز وفك ترميز الأحداث.
    // هذا إجباري.
    Marshaler CommandEventMarshaler

    // مثيل Logger لتسجيل الأحداث. إذا لم يتم توفيره، يُستخدم watermill.NopLogger.
    Logger watermill.LoggerAdapter
}

func (c *EventBusConfig) setDefaults() {
    if c.Logger == nil {
        c.Logger = watermill.NopLogger{}
    }
}
// ...

معالج الحدث

الرمز الكامل: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go

// ...
// تُستخدم معالج الحدث لتحديد معالج الحدث الذي يجب أن يتعامل مع الأحداث المستلمة من حافلة الأحداث.
type EventProcessor struct {
// ...

الرمز الكامل: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go

// ...
type EventProcessorConfig struct {
	// GenerateSubscribeTopic تُستخدم لإنشاء موضوع الاشتراك في الأحداث.
	// إذا استخدم معالج الحدث مجموعات المعالجين، فإنه ستُستخدم GenerateSubscribeTopic.
	GenerateSubscribeTopic EventProcessorGenerateSubscribeTopicFn

	// SubscriberConstructor يُستخدم لإنشاء مشترك لمعالج الحدث.
	//
	// يتم استدعاء هذه الوظيفة مرة واحدة لكل مثيل من معالج الحدث.
	// إذا كنت ترغب في إعادة استخدام مشترك لعدة معالجين، استخدم GroupEventProcessor.
	SubscriberConstructor EventProcessorSubscriberConstructorFn

	// OnHandle يُستدعى قبل التعامل مع الحدث.
	// OnHandle يعمل بشكل مماثل للوسيط: يمكنك حقن منطق إضافي قبل وبعد التعامل مع الحدث.
	//
	// لذلك، يجب عليك استدعاء بشكل صريح params.Handler.Handle() للتعامل مع الحدث.
	//
	//  func(params EventProcessorOnHandleParams) (err error) {
	//      // منطق قبل التعامل
	//      //  (...)

	//      err := params.Handler.Handle(params.Message.Context(), params.Event)
	//
	//      // منطق بعد التعامل
	//      //  (...)

	//      return err
	//  }
	//
	// هذا الخيار ليس إلزاميًا.
	OnHandle EventProcessorOnHandleFn

	// AckOnUnknownEvent يُستخدم لتحديد ما إذا كان يجب إقرار الرسالة عندما لا يكون هناك معالج محدد للحدث.
	AckOnUnknownEvent bool

	// Marshaler تُستخدم لتسوية وفك تسوية الأحداث.
	// مطلوب.
	Marshaler CommandEventMarshaler

	// مثيل Logger لتسجيل الأحداث.
	// إذا لم يتم توفيره، سيتم استخدام watermill.NopLogger.
	Logger watermill.LoggerAdapter

	// disableRouterAutoAddHandlers للحفاظ على التوافق مع الإصدارات السابقة.
	// سيتم تعيين هذا القيمة عند إنشاء معالج الحدث باستخدام NewEventProcessor.
	// مهجور: قم بالانتقال إلى NewEventProcessorWithConfig.
	disableRouterAutoAddHandlers bool
}

func (c *EventProcessorConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

معالج مجموعة الأحداث

الشيفرة الكاملة: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go

// ...
// معالج مجموعة الأحداث يقرر أي معالج أحداث يجب أن يتعامل مع الأحداث التي تم استلامها من حافلة الأحداث.
// بالمقارنة مع معالج الأحداث، يسمح معالج مجموعة الأحداث لعدة معالجين بمشاركة نفس نسخة المشترك.
type EventGroupProcessor struct {
// ...

الشيفرة الكاملة: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go

// ...
type EventGroupProcessorConfig struct {
	// GenerateSubscribeTopic يُستخدم لإنشاء الموضوع للاشتراك في مجموعة معالجي الأحداث.
	// هذا الخيار مطلوب لمعالج الأحداث عند استخدام مجموعات المعالجين.
	GenerateSubscribeTopic EventGroupProcessorGenerateSubscribeTopicFn

	// يُستخدم مُنشئ المشترك لإنشاء مشترك لمعالج الحدث الجماعي.
	// يُستدعى هذا الدالة مرة واحدة لكل مجموعة أحداث - مما يسمح بإنشاء اشتراك لكل مجموعة.
	// وهو مفيد جدًا عندما نريد معالجة الأحداث من تيار بترتيب.
	SubscriberConstructor EventGroupProcessorSubscriberConstructorFn

	// يتم استدعاء OnHandle قبل التعامل مع الحدث.
	// يشبه OnHandle التوسط: يمكنك حقن منطق إضافي قبل وبعد التعامل مع الحدث.
	//
	// وبالتالي، يجب عليك الاتصال بوضع params.Handler.Handle() بوضوح للتعامل مع الحدث.
	//
	// func(params EventGroupProcessorOnHandleParams) (err error) {
	//     // منطق قبل التعامل
	//     //  (...)
	//
	//     err := params.Handler.Handle(params.Message.Context(), params.Event)
	//
	//     // منطق بعد التعامل
	//     //  (...)
	//
	//     return err
	// }
	//
	// هذا الخيار غير مطلوب.
	OnHandle EventGroupProcessorOnHandleFn

	// AckOnUnknownEvent يُستخدم لتحديد ما إذا كان يجب الاعتراف إذا كان الحدث ليس له معالج محدد.
	AckOnUnknownEvent bool

	// يُستخدم Marshaler لترميز وفك ترميز الأحداث.
	// هذا مطلوب.
	Marshaler CommandEventMarshaler

	// مثيل سجل يُستخدم لتسجيل الأحداث.
	// إذا لم يتم توفيره، سيتم استخدام watermill.NopLogger.
	Logger watermill.LoggerAdapter
}

func (c *EventGroupProcessorConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

تعرف أكثر عن معالج مجموعة الأحداث.

معالج الأحداث

الشيفرة الكاملة: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go

// ...
// يتلقى معالج الأحداث الأحداث المحددة بواسطة NewEvent ويقوم بالتعامل معها باستخدام طريقته Handle.
// إذا كنت تستخدم البرمجة الدفعية المحددة (DDD)، يمكن لمعالج الأحداث تعديل والاحتفاظ بالمجاميع.
// كما يمكنه استدعاء مديري العمليات، أو الملحنات (ساجا)، أو بناء نماذج القراءة فقط.
//
// على عكس معالجي الأوامر، يمكن أن يكون لكل حدث عدة معالجي أحداث.
//
// خلال معالجة الرسالة، استخدم مثيلًا واحدًا من معالج الأحداث.
// عند تمرير العديد من الأحداث في نفس الوقت، يمكن تنفيذ طريقة التعامل Handle مرارًا وتكرارًا بشكل متزامن.
// لذلك، تحتاج طريقة التعامل Handle أن تكون آمنة متزامنة!
type EventHandler interface {
// ...

الأمر

الأمر هو هيكل بيانات بسيط يمثل طلبًا لأداء بعض العملية.

حافلة الأوامر

الشيفرة البرمجية الكاملة: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go

// ...
// حافلة الأوامر هي المكوّن الذي يقوم بنقل الأوامر إلى معالجي الأوامر.
type CommandBus struct {
// ...

الشيفرة البرمجية الكاملة: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go

// ...
type CommandBusConfig struct {
	// GeneratePublishTopic يُستخدم لتوليد الموضوع لنشر الأوامر.
	GeneratePublishTopic CommandBusGeneratePublishTopicFn

	// OnSend يُستدعى قبل نشر أمر.
	// يمكن تعديل *message.Message.
	//
	// هذا الخيار غير إلزامي.
	OnSend CommandBusOnSendFn

	// تُستخدم Marshaler لتسلسل وفك تسلسل الأوامر.
	// مطلوب.
	Marshaler CommandEventMarshaler

	// مثيل Logger مُستخدم لتسجيل الأحداث.
	// إذا لم يُقدم، سيتم استخدام watermill.NopLogger.
	Logger watermill.LoggerAdapter
}

func (c *CommandBusConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

معالج الأوامر

الشيفرة البرمجية الكاملة: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go

// ...
// تُستخدم CommandProcessorSubscriberConstructorFn لإنشاء مشترك لمعالج الأوامر.
// يسمح لك بإنشاء مشترك مخصص منفصل لكل معالج أمر.
type CommandProcessorSubscriberConstructorFn func(CommandProcessorSubscriberConstructorParams) (message.Subscriber, error)
// ...

الشيفرة البرمجية الكاملة: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go

// ...
type CommandProcessorConfig struct {
	// GenerateSubscribeTopic تُستخدم لتوليد الموضوع للاشتراك في الأوامر.
	GenerateSubscribeTopic CommandProcessorGenerateSubscribeTopicFn

	// تُستخدم SubscriberConstructor لإنشاء مشترك لمعالج الأوامر.
	SubscriberConstructor CommandProcessorSubscriberConstructorFn

	// OnHandle يُستدعى قبل معالجة الأمر.
	// يعمل OnHandle مثل وسيط: يمكنك حقن منطق إضافي قبل وبعد معالجة الأمر.
	//
	// لذا، تحتاج إلى استدعاء params.Handler.Handle() صراحة لمعالجة الأمر.
	//  func(params CommandProcessorOnHandleParams) (err error) {
	//      // المنطق قبل المعالجة
	//      // (...)
	//
	//      err := params.Handler.Handle(params.Message.Context(), params.Command)
	//
	//      // البعد عن المعالجة
	//      // (...)
	//
	//      return err
	//  }
	//
	// هذا الخيار غير مطلوب.
	OnHandle CommandProcessorOnHandleFn

	// تُستخدم Marshaler لتسلسل وفك تسلسل الأوامر.
	// مطلوب.
	Marshaler CommandEventMarshaler

	// مثيل Logger لتسجيل الأحداث.
	// إذا لم يُقدم، سيتم استخدام watermill.NopLogger.
	Logger watermill.LoggerAdapter

	// إذا صح، سيؤكد CommandProcessor الرسائل حتى لو أرجع CommandHandler خطأ.
	// إذا كان RequestReplyBackend ليس فارغًا وفشل إرسال الرد، سيتم إلغاء الرسالة على أي حال.
	//
	// تحذير: ليس مُستحسن استخدام هذا الخيار عند استخدام المكون requestreply (requestreply.NewCommandHandler أو requestreply.NewCommandHandlerWithResult)،
	// حيث قد يؤكد الأمر في حال فشل إرسال الرد.
	//
	// عند استخدام requestreply، يجب استخدام requestreply.PubSubBackendConfig.AckCommandErrors.
	AckCommandHandlingErrors bool

	// disableRouterAutoAddHandlers تُستخدم للتوافق مع الإصدارات السابقة.
	// يتم تعيينه عند إنشاء CommandProcessor بـ NewCommandProcessor.
	// تم إهماله: يرجى الانتقال إلى NewCommandProcessorWithConfig.
	disableRouterAutoAddHandlers bool
}

func (c *CommandProcessorConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

معالج الأوامر

الشفرة الكاملة: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go

// ...
// CommandHandler يستقبل الأمر المحدد بواسطة NewCommand ويعالجه باستخدام الأسلوب Handle.
// إذا كنت تستخدم DDD، فقد يقوم CommandHandler بتعديل والاحتفاظ بالمجاميع.
//
// على عكس EventHandler، يمكن أن يحتوي كل CommandHandler على معالج أمر واحد فقط.
//
// أثناء معالجة الرسائل، استخدم مثيل واحد من CommandHandler.
// عند تسليم أمر عدة مرات في نفس الوقت، قد يتم تنفيذ الأسلوب Handle عدة مرات بشكل متزامن.
// لذا، يجب أن يكون الأسلوب Handle آمنًا من الناحية التقنية!
type CommandHandler interface {
// ...

محول الأمر والحدث

الشفرة الكاملة: github.com/ThreeDotsLabs/watermill/components/cqrs/marshaler.go

// ...
// CommandEventMarshaler يحول الأوامر والأحداث إلى رسائل Watermill، والعكس صحيح.
// يجب تحويل حمولة الأمر إلى []bytes.
type CommandEventMarshaler interface {
	// Marshal يحول الأمر أو الحدث إلى رسالة Watermill.
	Marshal(v interface{}) (*message.Message, error)

	// Unmarshal يفك تشفير رسالة Watermill إلى الأمر أو الحدث v.
	Unmarshal(msg *message.Message, v interface{}) (err error)

	// Name يعيد اسم الأمر أو الحدث.
	// يمكن استخدام الاسم لتحديد ما إذا كان الأمر أو الحدث الذي تم استلامه هو الذي نريد معالجته.
	Name(v interface{}) string

	// NameFromMessage يعيد اسم الأمر أو الحدث من رسالة Watermill (تم إنشاؤها بواسطة Marshal).
	//
	// عندما يكون لدينا أوامر أو أحداث تم تحويلها إلى رسائل Watermill، يجب علينا استخدام NameFromMessage بدلاً من Name لتجنب الفك تشفير غير الضروري.
	NameFromMessage(msg *message.Message) string
}
// ...

الاستخدام

مثال المجال

استخدام مجال بسيط مسئول عن معالجة حجوزات الغرف في فندق.

سوف نستخدم رموز Event Storming لعرض نموذج هذا المجال.

أسطورة الرموز:

  • تذاكر لاصقة زرقاء تمثل الأوامر
  • تذاكر لاصقة برتقالية تمثل الأحداث
  • تذاكر لاصقة خضراء تمثل النماذج القراءة المُنشئة بشكل غير متزامن من الأحداث
  • تذاكر لاصقة بنفسجية تمثل السياسات المُفعلة بواسطة الأحداث وتوجيه الأوامر
  • تذاكر لاصقة وردية تمثل نقاط ساخنة؛ نقوم بتحديد المناطق التي تواجه مشاكل فيها بشكل متكرر

CQRS Event Storming

المجال بسيط:

  • يمكن للعملاء حجز الغرف.
  • كلما تم حجز غرفة، فإننا نطلب زجاجة من البيرة للعميل (لأننا نحب ضيوفنا).
    • نحن نعلم أنه في بعض الأحيان تنفد البيرة.
  • نقوم بتوليد تقرير مالي استنادًا إلى الحجز.

إرسال الأوامر

أولاً، نحتاج إلى محاكاة إجراءات العملاء.

الشفرة الكاملة: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf

// ...
		bookRoomCmd := &BookRoom{
			RoomId:    fmt.Sprintf("%d", i),
			GuestName: "John",
			StartDate: startDate,
			EndDate:   endDate,
		}
		if err := commandBus.Send(context.Background(), bookRoomCmd); err != nil {
			panic(err)
		}
// ...

منفذ الأوامر

سيدير BookRoomHandler أوامرنا.

كود المصدر الكامل: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// BookRoomHandler هو منفذ أمر يقوم بمعالجة أمر BookRoom ويصدر حدث RoomBooked.
//
// في CQRS ، يجب أن يتم معالجة الأمر بواسطة منفذ.
// عند إضافة منفذ آخر لمعالجة هذا الأمر ، سيتم إرجاع خطأ.
type BookRoomHandler struct {
	eventBus *cqrs.EventBus
}

func (b BookRoomHandler) HandlerName() string {
	return "BookRoomHandler"
}

// NewCommand يرجع نوع الأمر الذي يجب على هذا المنفذ معالجته. يجب أن يكون مؤشرًا.
func (b BookRoomHandler) NewCommand() interface{} {
	return &BookRoom{}
}

func (b BookRoomHandler) Handle(ctx context.Context, c interface{}) error {
	// c هو دائمًا نوع الذي يُرجع بواسطة `NewCommand`، لذا فإن التأكيد على النوع دائمًا آمن
	cmd := c.(*BookRoom)

	// سعر عشوائي ، الذي يمكن أن يتم حسابه بطريقة أكثر معقولية في الإنتاج الفعلي
	price := (rand.Int63n(40) + 1) * 10

	log.Printf(
		"تم حجز %s، من %s إلى %s",
		cmd.RoomId,
		cmd.GuestName,
		time.Unix(cmd.StartDate.Seconds, int64(cmd.StartDate.Nanos)),
		time.Unix(cmd.EndDate.Seconds, int64(cmd.EndDate.Nanos)),
	)

	// سيتم معالجة RoomBooked بواسطة منفذ حدث OrderBeerOnRoomBooked ،
	// وفي المستقبل ، يمكن معالجة RoomBooked بواسطة عدة منافذ حدث
	if err := b.eventBus.Publish(ctx, &RoomBooked{
		ReservationId: watermill.NewUUID(),
		RoomId:        cmd.RoomId,
		GuestName:     cmd.GuestName,
		Price:         price,
		StartDate:     cmd.StartDate,
		EndDate:       cmd.EndDate,
	}); err != nil {
		return err
	}

	return nil
}

// OrderBeerOnRoomBooked هو منفذ حدث يعالج الحدث RoomBooked ويصدر أمر OrderBeer.
// ...

منافذ الأحداث

كما ذُكر سابقًا ، نريد طلب زجاجة من البيرة في كل مرة يتم فيها حجز غرفة (مسمى بـ "عند حجز الغرفة") باستخدام أمر OrderBeer.

كود المصدر الكامل: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// OrderBeerOnRoomBooked هو منفذ حدث يعالج حدث RoomBooked ويصدر أمر OrderBeer.
type OrderBeerOnRoomBooked struct {
	commandBus *cqrs.CommandBus
}

func (o OrderBeerOnRoomBooked) HandlerName() string {
	// يتم تمرير هذا الاسم إلى EventsSubscriberConstructor لتوليد اسم طابور الانتظار
	return "OrderBeerOnRoomBooked"
}

func (OrderBeerOnRoomBooked) NewEvent() interface{} {
	return &RoomBooked{}
}

func (o OrderBeerOnRoomBooked) Handle(ctx context.Context, e interface{}) error {
	event := e.(*RoomBooked)

	orderBeerCmd := &OrderBeer{
		RoomId: event.RoomId,
		Count:  rand.Int63n(10) + 1,
	}

	return o.commandBus.Send(ctx, orderBeerCmd)
}

// منفذ OrderBeerHandler مشابه للغاية لـ BookRoomHandler. الفرق الوحيد هو أنه في بعض الأحيان يُرجع خطأ عند عدم وجود ما يكفي من البيرة ، مما يتسبب في إعادة إصدار الأمر. يمكنك العثور على التنفيذ الكامل في [شفرة المصدر للمثال](https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/basic/5-cqrs-protobuf/?utm_source=cqrs_doc).

مجموعات معالجي الأحداث

بشكل افتراضي، يحتوي كل معالج حدث على نسخة منفصلة للمشترك. يعمل هذا النهج بشكل جيد إذا تم إرسال نوع حدث واحد فقط إلى الموضوع.

في حالة وجود عدة أنواع من الأحداث على الموضوع، هناك خياران:

  1. يمكنك ضبط EventConfig.AckOnUnknownEvent على true - وسيتم تأكيد استلام جميع الأحداث التي لم يتم معالجتها بواسطة المعالجين.
  2. يمكنك استخدام آلية مجموعات معالجي الأحداث.

لاستخدام مجموعات الأحداث، يجب ضبط خيارات GenerateHandlerGroupSubscribeTopic و GroupSubscriberConstructor في EventConfig.

ثم يمكنك استخدام AddHandlersGroup على EventProcessor. الشيفرة الكاملة

// ...
	err = eventProcessor.AddHandlersGroup(
		"events",
		OrderBeerOnRoomBooked{commandBus},

		NewBookingsFinancialReport(),

		cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
			logger.Info("تم طلب البيرة", watermill.LogFields{
				"room_id": event.RoomId,
			})
			return nil
		}),
	)
	if err != nil {
// ...

كل من GenerateHandlerGroupSubscribeTopic و GroupSubscriberConstructor يتلقيان معلومات حول اسم المجموعة كمعاملات وظيفة.

المعالجات العامة

ابتداءً من Watermill v1.3، يمكن استخدام المعالجات العامة لمعالجة الأوامر والأحداث. هذا مفيد جدًا عندما يكون لديك عدد كبير من الأوامر/الأحداث ولا تريد إنشاء معالج لكل واحد.

الشيفرة الكاملة

// ...
		cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
			logger.Info("تم طلب البيرة", watermill.LogFields{
				"room_id": event.RoomId,
			})
			return nil
		}),
// ...

خلف الكواليس، يقوم بإنشاء تنفيذ لمعالج الأحداث أو معالج الأوامر. وهو مناسب لجميع أنواع المعالجين.

الشيفرة الكاملة

// ...
// NewCommandHandler creates a new CommandHandler implementation based on the provided function and the inferred command type from the function parameters.
func NewCommandHandler[Command any](
// ...

الشيفرة الكاملة

// ...
// NewEventHandler creates a new EventHandler implementation based on the provided function and the inferred event type from the function parameters.
func NewEventHandler[T any](
// ...

الشيفرة الكاملة

// ...
// NewGroupEventHandler creates a new GroupEventHandler implementation based on the provided function and the inferred event type from the function parameters.
func NewGroupEventHandler[T any](handleFunc func(ctx context.Context, event *T) error) GroupEventHandler {
// ...

بناء نموذج قراءة باستخدام مناولات الأحداث

الشيفرة النصية الكاملة: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// BookingsFinancialReport هو نموذج قراءة يحسب كمية الأموال التي يمكننا كسبها من الحجوزات.
// إنه يستمع لأحداث RoomBooked عند حدوثها.
//
// تقوم هذه الطريقة المبسطة ببساطة بالكتابة إلى الذاكرة. في بيئة الإنتاج، قد تستخدم نوعًا ما من التخزين الدائم.
type BookingsFinancialReport struct {
	handledBookings map[string]struct{}
	totalCharge     int64
	lock            sync.Mutex
}

func NewBookingsFinancialReport() *BookingsFinancialReport {
	return &BookingsFinancialReport{handledBookings: map[string]struct{}{}}
}

func (b BookingsFinancialReport) HandlerName() string {
	// يتم تمرير هذا الاسم إلى EventsSubscriberConstructor ويُستخدم لتوليد اسم الطابور
	return "BookingsFinancialReport"
}

func (BookingsFinancialReport) NewEvent() interface{} {
	return &RoomBooked{}
}

func (b *BookingsFinancialReport) Handle(ctx context.Context, e interface{}) error {
	// قد يتم استدعاء Handle بتنافسية، لذا يجب أن يكون هناك توافق في الخيوط.
	b.lock.Lock()
	defer b.lock.Unlock()

	event := e.(*RoomBooked)

	// عند استخدام Pub/Sub الذي لا يوفر نصوصًا يسلم مرة واحدة بالضبط، نحتاج إلى استبعاد الرسائل.
	// GoChannel Pub/Sub يوفر التسليم مرة واحدة بالضبط،
	// ولكن دعونا نعد هذا المثال لتنفيذات Pub/Sub الأخرى.
	if _, ok := b.handledBookings[event.ReservationId]; ok {
		return nil
	}
	b.handledBookings[event.ReservationId] = struct{}{}

	b.totalCharge += event.Price

	fmt.Printf(">>> تم حجز غرفة بقيمة %d$\n", b.totalCharge)
	return nil
}

var amqpAddress = "amqp://guest:guest@rabbitmq:5672/"

func main() {
// ...

ربط كل شيء

لدينا بالفعل جميع المكونات اللازمة لبناء تطبيق CQRS.

سنستخدم AMQP (RabbitMQ) كوسيط رسائل: AMQP.

تحت الغطاء ، يستخدم CQRS موجِّه الرسائل في Watermill. إذا كنت غير ملم بذلك وترغب في فهم كيف يعمل ، فيجب عليك التحقق من دليل البدء. كما سيُظهر لك كيفية استخدام بعض أنماط الرسائل القياسية مثل القياسات وقوائم الرسائل السامة وتحديد معدل الحد الأقصى والارتباط وغيرها من الأدوات المستخدمة من قِبل كل تطبيق مدفوع بالرسائل. تم بالفعل بناء هذه الأدوات في Watermill.

لنعود إلى CQRS. كما تعلم بالفعل ، يتكون CQRS من عدة مكونات مثل حافلات الأوامر أو الأحداث ، والمعالجين ، وهلم جرا.

الشيفرة المصدرية الكاملة: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
func main() {
	logger := watermill.NewStdLogger(false, false)
	cqrsMarshaler := cqrs.ProtobufMarshaler{}

	// You can use any Pub/Sub implementation from here: https://watermill.io/pubsubs/
	// Detailed RabbitMQ implementation: https://watermill.io/pubsubs/amqp/
	// Commands will be send to queue, because they need to be consumed once.
	commandsAMQPConfig := amqp.NewDurableQueueConfig(amqpAddress)
	commandsPublisher, err := amqp.NewPublisher(commandsAMQPConfig, logger)
	if err != nil {
		panic(err)
	}
	commandsSubscriber, err := amqp.NewSubscriber(commandsAMQPConfig, logger)
	if err != nil {
		panic(err)
	}

	// Events will be published to PubSub configured Rabbit, because they may be consumed by multiple consumers.
	// (in that case BookingsFinancialReport and OrderBeerOnRoomBooked).
	eventsPublisher, err := amqp.NewPublisher(amqp.NewDurablePubSubConfig(amqpAddress, nil), logger)
	if err != nil {
		panic(err)
	}

	// CQRS is built on messages router. Detailed documentation: https://watermill.io/docs/messages-router/
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// Simple middleware which will recover panics from event or command handlers.
	// More about router middlewares you can find in the documentation:
	// https://watermill.io/docs/messages-router/#middleware
	//
	// List of available middlewares you can find in message/router/middleware.
	router.AddMiddleware(middleware.Recoverer)

	commandBus, err := cqrs.NewCommandBusWithConfig(commandsPublisher, cqrs.CommandBusConfig{
		GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) {
			// we are using queue RabbitMQ config, so we need to have topic per command type
			return params.CommandName, nil
		},
		OnSend: func(params cqrs.CommandBusOnSendParams) error {
			logger.Info("Sending command", watermill.LogFields{
				"command_name": params.CommandName,
			})

			params.Message.Metadata.Set("sent_at", time.Now().String())

			return nil
		},
		Marshaler: cqrsMarshaler,
		Logger:    logger,
	})
	if err != nil {
		panic(err)
	}

	commandProcessor, err := cqrs.NewCommandProcessorWithConfig(
		router,
		cqrs.CommandProcessorConfig{
			GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) {
				// we are using queue RabbitMQ config, so we need to have topic per command type
				return params.CommandName, nil
			},
			SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) {
				// we can reuse subscriber, because all commands have separated topics
				return commandsSubscriber, nil
			},
			OnHandle: func(params cqrs.CommandProcessorOnHandleParams) error {
				start := time.Now()

				err := params.Handler.Handle(params.Message.Context(), params.Command)

				logger.Info("Command handled", watermill.LogFields{
					"command_name": params.CommandName,
					"duration":     time.Since(start),
					"err":          err,
				})

				return err
			},
			Marshaler: cqrsMarshaler,
			Logger:    logger,
		},
	)
	if err != nil {
		panic(err)
	}

	eventBus, err := cqrs.NewEventBusWithConfig(eventsPublisher, cqrs.EventBusConfig{
		GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
			// because we are using PubSub RabbitMQ config, we can use one topic for all events
			return "events", nil

			// we can also use topic per event type
			// return params.EventName, nil
		},

		OnPublish: func(params cqrs.OnEventSendParams) error {
			logger.Info("Publishing event", watermill.LogFields{
				"event_name": params.EventName,
			})

			params.Message.Metadata.Set("published_at", time.Now().String())

			return nil
		},

		Marshaler: cqrsMarshaler,
		Logger:    logger,
	})
	if err != nil {
		panic(err)
	}

	eventProcessor, err := cqrs.NewEventGroupProcessorWithConfig(
		router,
		cqrs.EventGroupProcessorConfig{
			GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) {
				return "events", nil
			},
			SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) {
				config := amqp.NewDurablePubSubConfig(
					amqpAddress,
					amqp.GenerateQueueNameTopicNameWithSuffix(params.EventGroupName),
				)

				return amqp.NewSubscriber(config, logger)
			},

			OnHandle: func(params cqrs.EventGroupProcessorOnHandleParams) error {
				start := time.Now()

				err := params.Handler.Handle(params.Message.Context(), params.Event)

				logger.Info("Event handled", watermill.LogFields{
					"event_name": params.EventName,
					"duration":   time.Since(start),
					"err":        err,
				})

				return err
			},

			Marshaler: cqrsMarshaler,
			Logger:    logger,
		},
	)
	if err != nil {
		panic(err)
	}

	err = commandProcessor.AddHandlers(
		BookRoomHandler{eventBus},
		OrderBeerHandler{eventBus},
	)
	if err != nil {
		panic(err)
	}

	err = eventProcessor.AddHandlersGroup(
		"events",
		OrderBeerOnRoomBooked{commandBus},

		NewBookingsFinancialReport(),

		cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
			logger.Info("Beer ordered", watermill.LogFields{
				"room_id": event.RoomId,
			})
			return nil
		}),
	)
	if err != nil {
		panic(err)
	}

	// publish BookRoom commands every second to simulate incoming traffic
	go publishCommands(commandBus)

	// processors are based on router, so they will work when router will start
	if err := router.Run(context.Background()); err != nil {
		panic(err)
	}
}
// ...

هذا كل شيء. لدينا تطبيق CQRS قابل للتشغيل.