กลไก CQRS

CQRS ย่อมาจาก "Command Query Responsibility Segregation" ซึ่งแยกความรับผิดชอบของคำสั่ง (คำขอเขียน) และคิวรี่ (คำขออ่าน) โดยการจัดการคำขอเขียนและคำขออ่านโดยอ็อบเจกต์ที่แตกต่างกัน

นี่คือ CQRS และเราสามารถแยกการเก็บรักษาข้อมูลโดยมีการเก็บรักษาข้อมูลที่แตกต่างกันสำหรับการอ่านและเขียน หลังจากที่ทำการแยกแล้ว อาจมีการเก็บรักษาข้อมูลที่ใช้สำหรับการอ่านที่ถูกปรับปรุงเพื่อจัดการคิวรี่ประเภทต่าง ๆ หรือสามารถครอบคลุมขอบเขตที่แตกต่างกัน แม้ว่าการเก็บรักษาข้อมูลที่แยกกันสำหรับการอ่าน/เขียนจะเป็นหัวข้อที่นิพนธ์เกี่ยวกับ CQRS แต่มันไม่ใช่ CQRS เอง CQRS เป็นการแยกคำสั่งและคิวรี่เท่านั้น

แผนผังโครงสร้าง CQRS

คอมโพเนนต์ cqrs มีการให้หลายความสามรถที่สำคัญ ๆ สร้างขึ้นบน Pub/Sub และ Router เพื่อช่วยในการปรับใช้รูปแบบ CQRS

คุณไม่จำเป็นต้องทำการปรับใช้ CQRS ทั้งหมด โดยทั่วไปแล้ว เฉพาะส่วนเหตุการณ์ของคอมโพเนนต์จะถูกใช้เพื่อสร้างแอปพลิเคชันที่เน้นเหตุการณ์

บล็อกการสร้าง

เหตุการณ์ (Events)

เหตุการณ์แทนสิ่งที่เป็นไปแล้ว และเหตุการณ์มีความไม่เปลี่ยนแปลง

รถประทับ (Event Bus)

รหัสภายใต้: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go

// ...
// EventBus ขนส่งเหตุการณ์ไปยังผู้จัดการเหตุการณ์
type EventBus struct {
// ...

รหัสภายใต้: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go

// ...
// EventBusConfig มีการกำหนดค่าสำหรับการสร้างชื่อหัวข้อสำหรับการเผยแพร่เหตุการณ์
type EventBusConfig struct {
    // GeneratePublishTopic ถูกใช้ในการสร้างชื่อหัวข้อสำหรับการเผยแพร่เหตุการณ์
    GeneratePublishTopic GenerateEventPublishTopicFn

    // OnPublish จะถูกเรียกก่อนการส่งเหตุการณ์ มันสามารถแก้แขนง *message.Message
    //
    // อินสแตนซ์นี้ไม่จำเป็น
    OnPublish OnEventSendFn

    // Marshaler ถูกใช้สำหรับการเข้ารหัสและถอดรหัสเหตุการณ์
    // นี่จะเป็นสิ่งที่เข็มงัด
    Marshaler CommandEventMarshaler

    // อินสแตนซ์ของล็อกเกอร์สำหรับการบันทึก หากไม่ได้กำหนด, จะใช้ watermill.NopLogger
    Logger watermill.LoggerAdapter
}

func (c *EventBusConfig) setDefaults() {
    ถ้า c.Logger มีค่าเป็น nil {
        c.Logger = watermill.NopLogger{}
    }
}
// ...

ตัวประมวลผลเหตุการณ์

รหัสเต็ม: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go

// ...
// EventProcessor ใช้ในการกำหนด EventHandler ที่ควรจะจัดการกับเหตุการณ์ที่ได้รับจาก autobus เหตุการณ์
type EventProcessor struct {
// ...

รหัสเต็ม: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go

// ...
type EventProcessorConfig struct {
	// GenerateSubscribeTopic ใช้ในการสร้างหัวข้อสำหรับการสับซึค์เหตุการณ์
	// หากตัวประมวลผลเหตุการณ์ใช้กลุ่มผู้จัดการ จะใช้ GenerateSubscribeTopic
	GenerateSubscribeTopic EventProcessorGenerateSubscribeTopicFn

	// SubscriberConstructor ใช้ในการสร้างผู้สับซึกสำหรับ EventHandler
	//
	// ฟังก์ชันนี้จะถูกเรียกครั้งเดียวสำหรับแต่ละตัวอย่างของ EventHandler
	// หากคุณต้องการ reuse ผู้สับซึกสำหรับผู้จัดการหลายๆ คน ให้ใช้ GroupEventProcessor
	SubscriberConstructor EventProcessorSubscriberConstructorFn

	// OnHandle เรียกก่อนจัดการกับเหตุการณ์
	// OnHandle ทำงานแบบ middleware: คุณสามารถซึ้งตรรก ก่อน และหลังจากจัดการกับเหตุการณ์
	//
	// ดังนั้น คุณต้องเรียก params.Handler.Handle() อย่างชัดเจนเพื่อจัดการกับเหตุการณ์
	//
	//  func(params EventProcessorOnHandleParams) (err error) {
	//      // ตรรกก่อนจัดการ
	//      //  (...)

	//      err := params.Handler.Handle(params.Message.Context(), params.Event)
	//
	//      // ตรรกหลังจัดการ
	//      //  (...)

	//      return err
	//  }
	//
	// ตัวเลือกนี้ไม่ใช่บังคับ
	OnHandle EventProcessorOnHandleFn

	// AckOnUnknownEvent ใช้ในการกำหนดว่าข้อความควรรับทราบหรือไม่เมื่อเหตุการณ์ไม่มีผู้จัดการที่กำหนดไว้
	AckOnUnknownEvent bool

	// Marshaler ใช้ในการ marshal และ unmarshal เหตุการณ์
	// ต้องระบุ
	Marshaler CommandEventMarshaler

	// Logger instance เพื่อการบันทึก
	// หากไม่ได้รับการระบุ จะถูกใช้ watermill.NopLogger
	Logger watermill.LoggerAdapter

	// disableRouterAutoAddHandlers เพื่อการรักษาความเข้ากันได้กับเวอร์ชันย้อนหลัง
	// ค่านี้จะถูกตั้งค่าเมื่อสร้าง EventProcessor โดยใช้ NewEventProcessor
	// ถูกปลดการใช้: ย้ายไปที่ NewEventProcessorWithConfig เป็นการเด็ไรกับเวอร์ชัน
	disableRouterAutoAddHandlers bool
}

func (c *EventProcessorConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

ตัวประมวลผลกลุ่มเหตุการณ์

รหัสภายในเต็ม: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go

// ...
// EventGroupProcessor กำหนดว่าตัวประมวลผลเหตุการณ์ควรจะดำเนินการเหตุการณ์ที่ได้รับมาจากคิวเหตุการณ์อย่างไร
// เปรียบเทียบกับ EventProcessor ตัวประมวลผลกลุ่มเหตุการณ์ช่วยให้ตัวประมวลผลหลายตัวสามารถแชร์ตัวสับสนได้
type EventGroupProcessor struct {
// ...

รหัสภายในเต็ม: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go

// ...
type EventGroupProcessorConfig struct {
	// GenerateSubscribeTopic ใช้สำหรับสร้างหัวข้อสำหรับการสับสนกับตัวประมวลผลกลุ่มเหตุการณ์
	// ตัวเลือกนี้จำเป็นสำหรับ EventProcessor เมื่อใช้กลุ่มตัวประมวลผล
	GenerateSubscribeTopic EventGroupProcessorGenerateSubscribeTopicFn

	// SubscriberConstructor ใช้สำหรับสร้างผู้สับสนสำหรับ GroupEventHandler
	// ฟังก์ชันนี้จะถูกเรียกครั้งละครั้งต่อกลุ่มเหตุการณ์ - อนุญาตให้มีการสับสนต่อกลุ่มละหนึ่งครั้ง
	// มันเป็นการใช้สร้างแต่ละกลุ่มเมื่อต้องการจัดการเหตุการณ์จากสตรีมตามลำดับ
	SubscriberConstructor EventGroupProcessorSubscriberConstructorFn

	// OnHandle จะถูกเรียกก่อนการจัดการเหตุการณ์
	// OnHandle คล้ายกับ middleware: คุณสามารถฝังตัวติดต่อก่อนและหลังการจัดการเหตุการณ์ได้
	//
	// ดังนั้น คุณจำเป็นต้องเรียก params.Handler.Handle() เพื่อจัดการเหตุการณ์
	//
	// func(params EventGroupProcessorOnHandleParams) (err error) {
	//     // ตรรกะก่อนจัดการ
	//     //  (...)
	//
	//     err := params.Handler.Handle(params.Message.Context(), params.Event)
	//
	//     // ตรรกะหลังจัดการ
	//     //  (...)
	//
	//     return err
	// }
	//
	// ตัวเลือกนี้ไม่จำเป็น
	OnHandle EventGroupProcessorOnHandleFn

	// AckOnUnknownEvent ใช้สำหรับกำหนดว่าควรยอมรับหรือไม่ถ้าเหตุการณ์ไม่มีตัวจัดการที่กำหนด
	AckOnUnknownEvent bool

	// Marshaler ใช้สำหรับการเข้ารหัสและถอดรหัสเหตุการณ์
	// นี้จำเป็น
	Marshaler CommandEventMarshaler

	// Logger ภาคผู้ใช้ที่ใช้สำหรับการเข้าสับสน
	// หากไม่ได้รับการระวัง watermill.NopLogger จะถูกใช้
	Logger watermill.LoggerAdapter
}

func (c *EventGroupProcessorConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

เรียนรู้เพิ่มเติมเกี่ยวกับตัวประมวลผลกลุ่มเหตุการณ์

ตัวจัดการเหตุการณ์

รหัสภายในเต็ม: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go

// ...
// EventHandler รับเหตุการณ์ที่กำหนดโดย NewEvent และจัดการด้วยวิธีการจัดการของตัวเอง
// หากใช้ DDD ตัวจัดการเหตุการณ์สามารถปรับแต่งและบันทึกความหนา
// มันยังสามารถเรียกตัวจัดการกระบวนการหรือตำแหน่ง หรือแค่สร้างโมเดลที่อ่าน
//
// ไม่เหมือนกับตัวจัดการคำสั่งที่แต่ละเหตุการณ์สามารถมีตัวจัดการเหตุการณ์หลายตัว
//
// ระหว่างการจัดการข้อความใช้ประการตัวนึงของ EventHandler
// เมื่อส่งผ่านเหตุการณ์หลายรายการพร้อมกัน วิธีการจัดการสามารถทำงานซ้อนสอมากครั้งพร้อมกัน
// ดังนั้น วิธีการจัดการค่อนข้างต้องปลอดภัยต่อเส้น
type EventHandler interface {
// ...

คำสั่ง

คำสั่งคือโครงสร้างข้อมูลง่ายที่แสดงคำขอให้ดำเนินการบางการทำงาน

คำสั่งบัส

เรห็ง ซอร์สโค้ด: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go

// ...
// CommandBus คือคอมโพเนนต์ที่ขนส่งคำสั่งไปยังผู้จัดการคำสั่ง
type CommandBus struct {
// ...

เรห็ง ซอร์สโค้ด: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go

// ...
type CommandBusConfig struct {
	// GeneratePublishTopic ใช้สำหรับสร้างหัวข้อสำหรับการเผยแพร่คำสั่ง
	GeneratePublishTopic CommandBusGeneratePublishTopicFn

	// OnSend ถูกเรียกก่อนที่จะเผยแพร่คำสั่ง
	// *message.Message สามารถถูกแก้ไขได้
	//
	// ตัวเลือกนี้ไม่บังคับ
	OnSend CommandBusOnSendFn

	// Marshaler ใช้สำหรับการตั้งค่าและยอดการเคลียร์คำสั่ง
	// เรียกเป็นผลิตภัณฑ์
	// จำเป็น
	Marshaler CommandEventMarshaler

	// Logger ที่ใช้สำหรับการบันทึก
	// หากไม่มีการให้, จะใช้ watermill.NopLogger แทน
	Logger watermill.LoggerAdapter
}

func (c *CommandBusConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

โปรเซสเซอร์คำสั่ง

เรห็ง ซอร์สโค้ด: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go

// ...
// CommandProcessorSubscriberConstructorFn ใช้สำหรับสร้างผู้ติดตามสำหรับ CommandHandler
// มันช่วยให้คุณสร้างผู้ติดตามที่แตกต่างกันสำหรับทุก CommandHandler
type CommandProcessorSubscriberConstructorFn func(CommandProcessorSubscriberConstructorParams) (message.Subscriber, error)
// ...

เรห็ง ซอร์สโค้ด: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go

// ...
type CommandProcessorConfig struct {
	// GenerateSubscribeTopic ใช้สำหรับสร้างหัวข้อสำหรับการติดตามคำสั่ง
	GenerateSubscribeTopic CommandProcessorGenerateSubscribeTopicFn

	// SubscriberConstructor ใช้สำหรับสร้างผู้ติดตามสำหรับ CommandHandler
	SubscriberConstructor CommandProcessorSubscriberConstructorFn

	// OnHandle ถูกเรียกก่อนการจัดการคำสั่ง
	// OnHandle ทำงานเหมือน middleware: คุณสามารถฉีกเกินิกความ logic เพิ่มเติมก่อนและหลังจากการจัดการคำสั่ง
	//
	// เนื่องจากสิ่งนี้, คุณต้องเรียก params.Handler.Handle() โดยชัดเจนเพื่อจัดการคำสั่ง
	//  func(params CommandProcessorOnHandleParams) (err error) {
	//      // ตรรกก่อนการจัดการ
	//      // (...)
	//
	//      err := params.Handler.Handle(params.Message.Context(), params.Command)
	//
	//      // ตรรกหลังจากการจัดการ
	//      // (...)
	//
	//      return err
	//  }
	//
	// ตัวเลือกนี้ไม่บังคับ
	OnHandle CommandProcessorOnHandleFn

	// Marshaler ใช้สำหรับการตั้งค่าและยอดการเคลียร์คำสั่ง
	// จำเป็น
	Marshaler CommandEventMarshaler

	// Logger สำหรับบันทึก
	// หากไม่มีการให้, จะใช้ watermill.NopLogger แทน
	Logger watermill.LoggerAdapter

	// หากเป็นจริง, CommandProcessor จะ ack ข้อความ แม้ว่า CommandHandler จะส่งคืนข้อผิดพลาด
	// หาก RequestReplyBackend ไม่เป็นค่าว่างและการส่งตอบกลับผิดพลาด, ข้อความยังคงจะถูก nacked
	//
	// คำเตือน: ไม่แนะนำที่จะใช้ตัวเลือกนี้เมื่อใช้คำสั่ง requestreply (requestreply.NewCommandHandler หรือ requestreply.NewCommandHandlerWithResult),
	// เนื่องจากมันอาจ ack คำสั่งเมื่อการส่งตอบกลับผิดพลาด
	//
	// เมื่อใช้ requestreply, คุณควรใช้ requestreply.PubSubBackendConfig.AckCommandErrors
	AckCommandHandlingErrors bool

	// ปิดความสามารถในการเพิ่มตัวจัดการเองโดยอัตโนมัติ
	// ใช้สำหรับเลวทางถอยหลังเท่านั้น
	// ตั้งค่าเมื่อสร้าง CommandProcessor ด้วย NewCommandProcessor
	// เลิกใช้: โปรดย้ายไปที่ NewCommandProcessorWithConfig แทน
	disableRouterAutoAddHandlers bool
}

func (c *CommandProcessorConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

ตัวประมวลผลคำสั่ง

โค้ดซอร์สแบบเต็ม: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go

// ...
// CommandHandler รับคำสั่งที่กำหนดโดย NewCommand และจัดการกับมันโดยใช้เมธอด Handle
// หากใช้ DDD, CommandHandler อาจแก้ไขและบันทึกส่วนรวม
//
// ต่างจาก EventHandler, แต่ละ Command สามารถมีเพียง CommandHandler เดียวเท่านั้น
//
// ระหว่างการจัดการข้อมูล, ใช้ CommandHandler หนึ่งกรณี
// เมื่อมีการส่งมอบคำสั่งหลายรายการพร้อมกัน, เมธอด Handle อาจถูกทำงานพร้อมกันหลายครั้ง
// ดังนั้น, เมธอด Handle จำเป็นต้องประสิทธิภาพเป็นสาย!
type CommandHandler interface {
// ...

ตัวแปลงคำสั่งและเหตุการณ์

โค้ดซอร์สแบบเต็ม: github.com/ThreeDotsLabs/watermill/components/cqrs/marshaler.go

// ...
// CommandEventMarshaler แปลงคำสั่งและเหตุการณ์เป็นข้อความ Watermill และในทางกลับกัน
// พล็อตไพษของคำสั่งจำเป็นต้องถูกแปลงเป็น []bytes
type CommandEventMarshaler interface {
	// Marshal แปลงคำสั่งหรือเหตุการณ์เป็นข้อความ Watermill
	Marshal(v interface{}) (*message.Message, error)

	// Unmarshal ถอดรหัสข้อความ Watermill เป็นคำสั่งหรือเหตุการณ์ v
	Unmarshal(msg *message.Message, v interface{}) (err error)

	// Name คืนชื่อของคำสั่งหรือเหตุการณ์
	// ชื่อนี้สามารถใช้เพื่อกำหนดว่าคำสั่งหรือเหตุการณ์ที่ได้รับเป็นที่เราต้องการประมวลผลหรือไม่
	Name(v interface{}) string

	// NameFromMessage คืนชื่อของคำสั่งหรือเหตุการณ์จากข้อความ Watermill (ที่สร้างโดย Marshal)
	//
	// เมื่อเรามีคำสั่งหรือเหตุการณ์ที่ได้แปลงเป็นข้อความ Watermill, เราควรใช้ NameFromMessage แทน Name เพื่อหลีกเลี่ยงการถอดรหัสที่ไม่จำเป็น
	NameFromMessage(msg *message.Message) string
}
// ...

การใช้งาน

ตัวอย่างโดเมน

การใช้โดเมนที่เรียกว่าการจัดการการจองห้องพักในโรงแรม

เราจะใช้สัญลักษณ์ Event Storming เพื่อแสดงโมเดลของโดเมนนี้

สัญลักษณ์ตัวช่วย:

  • สติกกี้โน๊ต ส้ม คือคำสั่ง
  • สติกกี้โน๊ต ส้มสีเขียว คือเหตุการณ์
  • สติกกี้โน๊ต ส้มสีฟ้า คือแบบจำลองการอ่านที่สร้างออกมาอย่างแบบไม่เกี่ยวข้องกับเหตุการณ์
  • สติกกี้โน๊ต ส้มสีม่วง คือนโยบายที่ถูกเรียกด้วยเหตุการณ์และสร้างคำสั่ง
  • สติกกี้โน๊ต ส้มสีชมพู คือจุดร้อน; เราเคารพพื้นที่ที่เจอปัญหาบ่อยครั้ง

CQRS Event Storming

โดเมนง่าย:

  • ลูกค้าสามารถ จองห้องพัก
  • ทุกครั้งที่มีการจองห้องพัก, เราจะสั่งหมดปากชายสำหรับลูกค้า (เพราะเรารักลูกค้าของเรา)
    • เรารู้ว่าบางครั้ง เบียร์จะหมด
  • เราสร้าง รายงานการเงิน โดยอ้างอิงการจอง

การส่งคำสั่ง

ก่อนอื่น, เราต้องจำลองการกระทำของลูกค้า

โค้ดซอร์สแบบเต็ม: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf

// ...
		bookRoomCmd := &BookRoom{
			RoomId:    fmt.Sprintf("%d", i),
			GuestName: "John",
			StartDate: startDate,
			EndDate:   endDate,
		}
		if err := commandBus.Send(context.Background(), bookRoomCmd); err != nil {
			panic(err)
		}
// ...

ตัวจัดการคำสั่ง

BookRoomHandler จะเป็นผู้จัดการคำสั่งของเรา

โค้ดภายใน: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// BookRoomHandler เป็นตัวจัดการคำสั่งที่ประมวลผลคำสั่ง BookRoom และส่งออกเหตุการณ์ RoomBooked
//
// ใน CQRS คำสั่งจะต้องถูกประมวลผลโดยตัวจัดการ
// เมื่อเพิ่มตัวจัดการอีกตัวหนึ่งที่จะจัดการคำสั่งนี้ จะมีข้อผิดพลาดที่ถูกส่งกลับ
type BookRoomHandler struct {
	eventBus *cqrs.EventBus
}

func (b BookRoomHandler) HandlerName() string {
	return "BookRoomHandler"
}

// NewCommand จะคืนชนิดของคำสั่งที่ตัวจัดการควรประมวลผล มันต้องเป็นตัวแสดง
func (b BookRoomHandler) NewCommand() interface{} {
	return &BookRoom{}
}

func (b BookRoomHandler) Handle(ctx context.Context, c interface{}) error {
	// c เป็นชนิดที่ถูกคืนโดย `NewCommand` ดังนั้นการยืนยันชนิดจะคุ้มครองเสมอ
	cmd := c.(*BookRoom)

	// คำนวณราคาสุ่มซึ่งอาจจะถูกคำนวณในทางที่มีเหตุผลมากขึ้นในการผลิตจริง
	price := (rand.Int63n(40) + 1) * 10

	log.Printf(
		"จอง %s, จาก %s ถึง %s",
		cmd.RoomId,
		cmd.GuestName,
		time.Unix(cmd.StartDate.Seconds, int64(cmd.StartDate.Nanos)),
		time.Unix(cmd.EndDate.Seconds, int64(cmd.EndDate.Nanos)),
	)

	// RoomBooked จะถูกจัดการโดยตัวจัดการเหตุการณ์ OrderBeerOnRoomBooked
	// และในอนาคต RoomBooked สามารถถูกจัดการโดยตัวจัดการเหตุการณ์หลายตัว
	if err := b.eventBus.Publish(ctx, &RoomBooked{
		ReservationId: watermill.NewUUID(),
		RoomId:        cmd.RoomId,
		GuestName:     cmd.GuestName,
		Price:         price,
		StartDate:     cmd.StartDate,
		EndDate:       cmd.EndDate,
	}); err != nil {
		return err
	}

	return nil
}

// OrderBeerOnRoomBooked เป็นตัวจัดการเหตุการณ์ที่ประมวลผลเหตุการณ์ RoomBooked และส่งออกคำสั่ง OrderBeer
// ...

ตัวจัดการเหตุการณ์

ตามที่กล่าวไว้ก่อนหน้านี้ เราต้องการสั่งขวดเบียร์ทุกครั้งที่จองห้อง (ที่แสดงว่า "เมื่อห้องถูกจอง") เราทำได้โดยใช้คำสั่ง OrderBeer

โค้ดภายใน: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// OrderBeerOnRoomBooked เป็นตัวจัดการเหตุการณ์ที่ประมวลผลเหตุการณ์ RoomBooked และส่งออกคำสั่ง OrderBeer
type OrderBeerOnRoomBooked struct {
	commandBus *cqrs.CommandBus
}

func (o OrderBeerOnRoomBooked) HandlerName() string {
	// ชื่อนี้ถูกส่งไปยัง EventsSubscriberConstructor เพื่อสร้างชื่อคิว
	return "OrderBeerOnRoomBooked"
}

func (OrderBeerOnRoomBooked) NewEvent() interface{} {
	return &RoomBooked{}
}

func (o OrderBeerOnRoomBooked) Handle(ctx context.Context, e interface{}) error {
	event := e.(*RoomBooked)

	orderBeerCmd := &OrderBeer{
		RoomId: event.RoomId,
		Count:  rand.Int63n(10) + 1,
	}

	return o.commandBus.Send(ctx, orderBeerCmd)
}

// OrderBeerHandler เป็นตัวจัดการคำสั่งที่ประมวลผลคำสั่ง OrderBeer และส่งออกเหตุการณ์ BeerOrdered
// ...

OrderBeerHandler เหมือนกับ BookRoomHandler มาก ความแตกต่างเดียวคือ มันอาจคืนข้อผิดพลาดเมื่อไม่มีเบียร์เพียงพอ ทำให้ต้องออกคำสั่งใหม่ คุณสามารถค้นพบการปฏิบัติที่สมบูรณ์ใน โค้ดตัวอย่าง

กลุ่มผู้จัดการเหตุการณ์ (Event Handler Groups)

ตามค่าเริ่มต้น มีตัวรับสมาชิกแยกต่างหากสำหรับแต่ละตัวจัดการเหตุการณ์ วิธีการนี้สามารถทำงานได้ดีถ้ามีเพียงเพียงประเภทเหตุการณ์เดียวที่ถูกส่งไปยังหัวข้อ (topic) นั่นเอง

ในกรณีที่มีหลายประเภทของเหตุการณ์บนหัวข้อ มีทางเลือกสองทางดังนี้:

  1. คุณสามารถตั้งค่า EventConfig.AckOnUnknownEvent เป็น true - นี้จะอนุมัติเหตุการณ์ทั้งหมดที่ไม่ได้รับการจัดการโดยตัวจัดการ.
  2. คุณสามารถใช้กลไกของกลุ่มผู้จัดการเหตุการณ์ (event handler group mechanism)

ในการใช้กลุ่มเหตุการณ์ คุณต้องตั้งค่า GenerateHandlerGroupSubscribeTopic และ GroupSubscriberConstructor ในตัวเลือกของ EventConfig

จากนั้นคุณสามารถใช้ AddHandlersGroup บน EventProcessor

รหัสภายใน: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
	err = eventProcessor.AddHandlersGroup(
		"events",
		OrderBeerOnRoomBooked{commandBus},

		NewBookingsFinancialReport(),

		cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
			logger.Info("การสั่งเบียร์", watermill.LogFields{
				"room_id": event.RoomId,
			})
			return nil
		}),
	)
	if err != nil {
// ...

ทั้ง GenerateHandlerGroupSubscribeTopic และ GroupSubscriberConstructor จะรับข้อมูลเกี่ยวกับชื่อกลุ่มเป็นพารามิเตอร์ของฟังก์ชัน

ตัวจัดการทั่วไป (Generic Handlers)

ตั้งแต่ Watermill เวอร์ชัน 1.3, ตัวจัดการทั่วไปสามารถใช้ในการจัดการคำสั่งและเหตุการณ์ได้ มีประโยชน์มากเมื่อคุณมีจำนวนคำสั่ง/เหตุการณ์มากและไม่ต้องการสร้างตัวจัดการสำหรับแต่ละคำสั่ง/เหตุการณ์

รหัสภายใน: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
		cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
			logger.Info("การสั่งเบียร์", watermill.LogFields{
				"room_id": event.RoomId,
			})
			return nil
		}),
// ...

เบื้องหลังแล้ว มันสร้างมาจากการสร้างตัวจัดการเหตุการณ์หรือตัวจัดการคำสั่ง มันเหมาะสำหรับทุกประเภทของตัวจัดการ

รหัสภายใน: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go

// ...
// NewCommandHandler สร้างการดำเนินการคำสั่งใหม่โดยใช้ฟังก์ชันที่ให้ไว้ และประเภทคำสั่งที่ได้รับมาจากพารามิเตอร์ของฟังก์ชัน
func NewCommandHandler[Command any](
// ...

รหัสภายใน: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go

// ...
// NewEventHandler สร้างการดำเนินการเหตุการณ์ใหม่โดยใช้ฟังก์ชันที่ให้ไว้ และประเภทเหตุการณ์ที่ได้รับมาจากพารามิเตอร์ของฟังก์ชัน
func NewEventHandler[T any](
// ...

รหัสภายใน: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go

// ...
// NewGroupEventHandler สร้างการดำเนินการกลุ่มเหตุการณ์ใหม่โดยใช้ฟังก์ชันที่ให้ไว้ และประเภทเหตุการณ์ที่ได้รับมาจากพารามิเตอร์ของฟังก์ชัน
func NewGroupEventHandler[T any](handleFunc func(ctx context.Context, event *T) error) GroupEventHandler {
// ...

สร้างโมเดลที่ใช้จากการใช้งานเหตุการณ์

โค้ดที่สมบูรณ์: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// BookingsFinancialReport เป็นโมเดลที่ใช้สำหรับคำนวณว่าเราสามารถรับรายได้เท่าไหร่จากการจองห้องพัก
// มันจะฟังเหตุการณ์ RoomBooked เมื่อเกิดเหตุการณ์
//
// การประมวลผลนี้เพียงแค่เขียนลงในหน่วยความจำ ในสภาพแวดล้อมของการผลิต คุณอาจใช้รูปแบบการเก็บรักษาที่มีความต่อเนื่อง
type BookingsFinancialReport struct {
	handledBookings map[string]struct{}
	totalCharge     int64
	lock            sync.Mutex
}

func NewBookingsFinancialReport() *BookingsFinancialReport {
	return &BookingsFinancialReport{handledBookings: map[string]struct{}{}}
}

func (b BookingsFinancialReport) HandlerName() string {
	// ชื่อนี้ถูกส่งให้กับ EventsSubscriberConstructor และถูกใช้เพื่อสร้างชื่อคิว
	return "BookingsFinancialReport"
}

func (BookingsFinancialReport) NewEvent() interface{} {
	return &RoomBooked{}
}

func (b *BookingsFinancialReport) Handle(ctx context.Context, e interface{}) error {
	// การจัดการอาจถูกเรียกพร้อมกัน ดังนั้นต้องมีความปลอดภัยในเรื่องของเธรด
	b.lock.Lock()
	defer b.lock.Unlock()

	event := e.(*RoomBooked)

	// เมื่อใช้ Pub/Sub ซึ่งไม่ให้การส่งสารที่มีเพียงครั้งเดียวอย่างแน่นอน จำเป็นต้องทำการถอดความซ้ำกันของข้อความ
	// GoChannel Pub/Sub ให้การส่งสารที่มีเพียงครั้งเดียว,
	// แต่เพื่อเตรียมตัวสำหรับตัวอย่างนี้เล้วิต  อย่างอื่น ๆ จำเป็นต้องทำการคัดคำซ้ำด้วย
	if _, ok := b.handledBookings[event.ReservationId]; ok {
		return nil
	}
	b.handledBookings[event.ReservationId] = struct{}{}

	b.totalCharge += event.Price

	fmt.Printf(">>> จองห้องพักให้เป็น $%d\n", b.totalCharge)
	return nil
}

var amqpAddress = "amqp://guest:guest@rabbitmq:5672/"

func main() {
// ...

เชื่อมต่อทุกอย่าง

เรามีสิ่งที่จำเป็นทั้งหมดในการสร้างแอปพลิเคชัน CQRS แล้ว

เราจะใช้ AMQP (RabbitMQ) เป็น message broker ของเรา: AMQP.

ในพื้นที่ภายใน CQRS ใช้ Watermill's message router ซึ่งถ้าคุณยังไม่คุ้นเคยและต้องการเข้าใจว่ามันทำงานอย่างไร คุณควรตรวจสอบไกด์เริ่มต้น มันยังจะแสดงให้คุณเห็นว่าจะใช้รูปแบบการส่งข้อความมาตรฐานบางประการ เช่น การเก็บวัดผล คิวข้อความพิษ การจำกัดอัตรา, ความสัมพันธ์ และเครื่องมืออื่นที่ใช้โดยแอปพลิเคชันแบบส่งข้อความทุกๆ ประการ ๆ นี้ถูกสร้างใน Watermill แล้ว

มากลับสู่ CQRS กันบ้าง เช่นที่คุณทราบอยู่แล้ว CQRS ประกอบด้วยส่วนประกอบหลาย ๆ อย่าง เช่น command หรือ event buses, processors, และอื่น ๆ

รหัสภายใน: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
func main() {
	logger := watermill.NewStdLogger(false, false)
	cqrsMarshaler := cqrs.ProtobufMarshaler{}

	// You can use any Pub/Sub implementation from here: https://watermill.io/pubsubs/
	// Detailed RabbitMQ implementation: https://watermill.io/pubsubs/amqp/
	// Commands will be send to queue, because they need to be consumed once.
	commandsAMQPConfig := amqp.NewDurableQueueConfig(amqpAddress)
	commandsPublisher, err := amqp.NewPublisher(commandsAMQPConfig, logger)
	if err != nil {
		panic(err)
	}
	commandsSubscriber, err := amqp.NewSubscriber(commandsAMQPConfig, logger)
	if err != nil {
		panic(err)
	}

	// Events will be published to PubSub configured Rabbit, because they may be consumed by multiple consumers.
	// (in that case BookingsFinancialReport and OrderBeerOnRoomBooked).
	eventsPublisher, err := amqp.NewPublisher(amqp.NewDurablePubSubConfig(amqpAddress, nil), logger)
	if err != nil {
		panic(err)
	}

	// CQRS is built on messages router. Detailed documentation: https://watermill.io/docs/messages-router/
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// Simple middleware which will recover panics from event or command handlers.
	// More about router middlewares you can find in the documentation:
	// https://watermill.io/docs/messages-router/#middleware
	//
	// List of available middlewares you can find in message/router/middleware.
	router.AddMiddleware(middleware.Recoverer)

	commandBus, err := cqrs.NewCommandBusWithConfig(commandsPublisher, cqrs.CommandBusConfig{
		GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) {
			// we are using queue RabbitMQ config, so we need to have topic per command type
			return params.CommandName, nil
		},
		OnSend: func(params cqrs.CommandBusOnSendParams) error {
			logger.Info("Sending command", watermill.LogFields{
				"command_name": params.CommandName,
			})

			params.Message.Metadata.Set("sent_at", time.Now().String())

			return nil
		},
		Marshaler: cqrsMarshaler,
		Logger:    logger,
	})
	if err != nil {
		panic(err)
	}

	commandProcessor, err := cqrs.NewCommandProcessorWithConfig(
		router,
		cqrs.CommandProcessorConfig{
			GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) {
				// we are using queue RabbitMQ config, so we need to have topic per command type
				return params.CommandName, nil
			},
			SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) {
				// we can reuse subscriber, because all commands have separated topics
				return commandsSubscriber, nil
			},
			OnHandle: func(params cqrs.CommandProcessorOnHandleParams) error {
				start := time.Now()

				err := params.Handler.Handle(params.Message.Context(), params.Command)

				logger.Info("Command handled", watermill.LogFields{
					"command_name": params.CommandName,
					"duration":     time.Since(start),
					"err":          err,
				})

				return err
			},
			Marshaler: cqrsMarshaler,
			Logger:    logger,
		},
	)
	if err != nil {
		panic(err)
	}

	eventBus, err := cqrs.NewEventBusWithConfig(eventsPublisher, cqrs.EventBusConfig{
		GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
			// because we are using PubSub RabbitMQ config, we can use one topic for all events
			return "events", nil

			// we can also use topic per event type
			// return params.EventName, nil
		},

		OnPublish: func(params cqrs.OnEventSendParams) error {
			logger.Info("Publishing event", watermill.LogFields{
				"event_name": params.EventName,
			})

			params.Message.Metadata.Set("published_at", time.Now().String())

			return nil
		},

		Marshaler: cqrsMarshaler,
		Logger:    logger,
	})
	if err != nil {
		panic(err)
	}

	eventProcessor, err := cqrs.NewEventGroupProcessorWithConfig(
		router,
		cqrs.EventGroupProcessorConfig{
			GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) {
				return "events", nil
			},
			SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) {
				config := amqp.NewDurablePubSubConfig(
					amqpAddress,
					amqp.GenerateQueueNameTopicNameWithSuffix(params.EventGroupName),
				)

				return amqp.NewSubscriber(config, logger)
			},

			OnHandle: func(params cqrs.EventGroupProcessorOnHandleParams) error {
				start := time.Now()

				err := params.Handler.Handle(params.Message.Context(), params.Event)

				logger.Info("Event handled", watermill.LogFields{
					"event_name": params.EventName,
					"duration":   time.Since(start),
					"err":        err,
				})

				return err
			},

			Marshaler: cqrsMarshaler,
			Logger:    logger,
		},
	)
	if err != nil {
		panic(err)
	}

	err = commandProcessor.AddHandlers(
		BookRoomHandler{eventBus},
		OrderBeerHandler{eventBus},
	)
	if err != nil {
		panic(err)
	}

	err = eventProcessor.AddHandlersGroup(
		"events",
		OrderBeerOnRoomBooked{commandBus},

		NewBookingsFinancialReport(),

		cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
			logger.Info("Beer ordered", watermill.LogFields{
				"room_id": event.RoomId,
			})
			return nil
		}),
	)
	if err != nil {
		panic(err)
	}

	// publish BookRoom commands every second to simulate incoming traffic
	go publishCommands(commandBus)

	// processors are based on router, so they will work when router will start
	if err := router.Run(context.Background()); err != nil {
		panic(err)
	}
}
// ...

นั้นคือ พวกเรามีแอปพลิเคชัน CQRS ที่สามารถทำงานได้แล้วครับ