กลไก CQRS
CQRS ย่อมาจาก "Command Query Responsibility Segregation" ซึ่งแยกความรับผิดชอบของคำสั่ง (คำขอเขียน) และคิวรี่ (คำขออ่าน) โดยการจัดการคำขอเขียนและคำขออ่านโดยอ็อบเจกต์ที่แตกต่างกัน
นี่คือ CQRS และเราสามารถแยกการเก็บรักษาข้อมูลโดยมีการเก็บรักษาข้อมูลที่แตกต่างกันสำหรับการอ่านและเขียน หลังจากที่ทำการแยกแล้ว อาจมีการเก็บรักษาข้อมูลที่ใช้สำหรับการอ่านที่ถูกปรับปรุงเพื่อจัดการคิวรี่ประเภทต่าง ๆ หรือสามารถครอบคลุมขอบเขตที่แตกต่างกัน แม้ว่าการเก็บรักษาข้อมูลที่แยกกันสำหรับการอ่าน/เขียนจะเป็นหัวข้อที่นิพนธ์เกี่ยวกับ CQRS แต่มันไม่ใช่ CQRS เอง CQRS เป็นการแยกคำสั่งและคิวรี่เท่านั้น
คอมโพเนนต์ cqrs
มีการให้หลายความสามรถที่สำคัญ ๆ สร้างขึ้นบน Pub/Sub และ Router เพื่อช่วยในการปรับใช้รูปแบบ CQRS
คุณไม่จำเป็นต้องทำการปรับใช้ CQRS ทั้งหมด โดยทั่วไปแล้ว เฉพาะส่วนเหตุการณ์ของคอมโพเนนต์จะถูกใช้เพื่อสร้างแอปพลิเคชันที่เน้นเหตุการณ์
บล็อกการสร้าง
เหตุการณ์ (Events)
เหตุการณ์แทนสิ่งที่เป็นไปแล้ว และเหตุการณ์มีความไม่เปลี่ยนแปลง
รถประทับ (Event Bus)
รหัสภายใต้: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go
// ...
// EventBus ขนส่งเหตุการณ์ไปยังผู้จัดการเหตุการณ์
type EventBus struct {
// ...
รหัสภายใต้: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go
// ...
// EventBusConfig มีการกำหนดค่าสำหรับการสร้างชื่อหัวข้อสำหรับการเผยแพร่เหตุการณ์
type EventBusConfig struct {
// GeneratePublishTopic ถูกใช้ในการสร้างชื่อหัวข้อสำหรับการเผยแพร่เหตุการณ์
GeneratePublishTopic GenerateEventPublishTopicFn
// OnPublish จะถูกเรียกก่อนการส่งเหตุการณ์ มันสามารถแก้แขนง *message.Message
//
// อินสแตนซ์นี้ไม่จำเป็น
OnPublish OnEventSendFn
// Marshaler ถูกใช้สำหรับการเข้ารหัสและถอดรหัสเหตุการณ์
// นี่จะเป็นสิ่งที่เข็มงัด
Marshaler CommandEventMarshaler
// อินสแตนซ์ของล็อกเกอร์สำหรับการบันทึก หากไม่ได้กำหนด, จะใช้ watermill.NopLogger
Logger watermill.LoggerAdapter
}
func (c *EventBusConfig) setDefaults() {
ถ้า c.Logger มีค่าเป็น nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
ตัวประมวลผลเหตุการณ์
รหัสเต็ม: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go
// ...
// EventProcessor ใช้ในการกำหนด EventHandler ที่ควรจะจัดการกับเหตุการณ์ที่ได้รับจาก autobus เหตุการณ์
type EventProcessor struct {
// ...
รหัสเต็ม: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go
// ...
type EventProcessorConfig struct {
// GenerateSubscribeTopic ใช้ในการสร้างหัวข้อสำหรับการสับซึค์เหตุการณ์
// หากตัวประมวลผลเหตุการณ์ใช้กลุ่มผู้จัดการ จะใช้ GenerateSubscribeTopic
GenerateSubscribeTopic EventProcessorGenerateSubscribeTopicFn
// SubscriberConstructor ใช้ในการสร้างผู้สับซึกสำหรับ EventHandler
//
// ฟังก์ชันนี้จะถูกเรียกครั้งเดียวสำหรับแต่ละตัวอย่างของ EventHandler
// หากคุณต้องการ reuse ผู้สับซึกสำหรับผู้จัดการหลายๆ คน ให้ใช้ GroupEventProcessor
SubscriberConstructor EventProcessorSubscriberConstructorFn
// OnHandle เรียกก่อนจัดการกับเหตุการณ์
// OnHandle ทำงานแบบ middleware: คุณสามารถซึ้งตรรก ก่อน และหลังจากจัดการกับเหตุการณ์
//
// ดังนั้น คุณต้องเรียก params.Handler.Handle() อย่างชัดเจนเพื่อจัดการกับเหตุการณ์
//
// func(params EventProcessorOnHandleParams) (err error) {
// // ตรรกก่อนจัดการ
// // (...)
// err := params.Handler.Handle(params.Message.Context(), params.Event)
//
// // ตรรกหลังจัดการ
// // (...)
// return err
// }
//
// ตัวเลือกนี้ไม่ใช่บังคับ
OnHandle EventProcessorOnHandleFn
// AckOnUnknownEvent ใช้ในการกำหนดว่าข้อความควรรับทราบหรือไม่เมื่อเหตุการณ์ไม่มีผู้จัดการที่กำหนดไว้
AckOnUnknownEvent bool
// Marshaler ใช้ในการ marshal และ unmarshal เหตุการณ์
// ต้องระบุ
Marshaler CommandEventMarshaler
// Logger instance เพื่อการบันทึก
// หากไม่ได้รับการระบุ จะถูกใช้ watermill.NopLogger
Logger watermill.LoggerAdapter
// disableRouterAutoAddHandlers เพื่อการรักษาความเข้ากันได้กับเวอร์ชันย้อนหลัง
// ค่านี้จะถูกตั้งค่าเมื่อสร้าง EventProcessor โดยใช้ NewEventProcessor
// ถูกปลดการใช้: ย้ายไปที่ NewEventProcessorWithConfig เป็นการเด็ไรกับเวอร์ชัน
disableRouterAutoAddHandlers bool
}
func (c *EventProcessorConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
ตัวประมวลผลกลุ่มเหตุการณ์
รหัสภายในเต็ม: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go
// ...
// EventGroupProcessor กำหนดว่าตัวประมวลผลเหตุการณ์ควรจะดำเนินการเหตุการณ์ที่ได้รับมาจากคิวเหตุการณ์อย่างไร
// เปรียบเทียบกับ EventProcessor ตัวประมวลผลกลุ่มเหตุการณ์ช่วยให้ตัวประมวลผลหลายตัวสามารถแชร์ตัวสับสนได้
type EventGroupProcessor struct {
// ...
รหัสภายในเต็ม: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go
// ...
type EventGroupProcessorConfig struct {
// GenerateSubscribeTopic ใช้สำหรับสร้างหัวข้อสำหรับการสับสนกับตัวประมวลผลกลุ่มเหตุการณ์
// ตัวเลือกนี้จำเป็นสำหรับ EventProcessor เมื่อใช้กลุ่มตัวประมวลผล
GenerateSubscribeTopic EventGroupProcessorGenerateSubscribeTopicFn
// SubscriberConstructor ใช้สำหรับสร้างผู้สับสนสำหรับ GroupEventHandler
// ฟังก์ชันนี้จะถูกเรียกครั้งละครั้งต่อกลุ่มเหตุการณ์ - อนุญาตให้มีการสับสนต่อกลุ่มละหนึ่งครั้ง
// มันเป็นการใช้สร้างแต่ละกลุ่มเมื่อต้องการจัดการเหตุการณ์จากสตรีมตามลำดับ
SubscriberConstructor EventGroupProcessorSubscriberConstructorFn
// OnHandle จะถูกเรียกก่อนการจัดการเหตุการณ์
// OnHandle คล้ายกับ middleware: คุณสามารถฝังตัวติดต่อก่อนและหลังการจัดการเหตุการณ์ได้
//
// ดังนั้น คุณจำเป็นต้องเรียก params.Handler.Handle() เพื่อจัดการเหตุการณ์
//
// func(params EventGroupProcessorOnHandleParams) (err error) {
// // ตรรกะก่อนจัดการ
// // (...)
//
// err := params.Handler.Handle(params.Message.Context(), params.Event)
//
// // ตรรกะหลังจัดการ
// // (...)
//
// return err
// }
//
// ตัวเลือกนี้ไม่จำเป็น
OnHandle EventGroupProcessorOnHandleFn
// AckOnUnknownEvent ใช้สำหรับกำหนดว่าควรยอมรับหรือไม่ถ้าเหตุการณ์ไม่มีตัวจัดการที่กำหนด
AckOnUnknownEvent bool
// Marshaler ใช้สำหรับการเข้ารหัสและถอดรหัสเหตุการณ์
// นี้จำเป็น
Marshaler CommandEventMarshaler
// Logger ภาคผู้ใช้ที่ใช้สำหรับการเข้าสับสน
// หากไม่ได้รับการระวัง watermill.NopLogger จะถูกใช้
Logger watermill.LoggerAdapter
}
func (c *EventGroupProcessorConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
เรียนรู้เพิ่มเติมเกี่ยวกับตัวประมวลผลกลุ่มเหตุการณ์
ตัวจัดการเหตุการณ์
รหัสภายในเต็ม: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go
// ...
// EventHandler รับเหตุการณ์ที่กำหนดโดย NewEvent และจัดการด้วยวิธีการจัดการของตัวเอง
// หากใช้ DDD ตัวจัดการเหตุการณ์สามารถปรับแต่งและบันทึกความหนา
// มันยังสามารถเรียกตัวจัดการกระบวนการหรือตำแหน่ง หรือแค่สร้างโมเดลที่อ่าน
//
// ไม่เหมือนกับตัวจัดการคำสั่งที่แต่ละเหตุการณ์สามารถมีตัวจัดการเหตุการณ์หลายตัว
//
// ระหว่างการจัดการข้อความใช้ประการตัวนึงของ EventHandler
// เมื่อส่งผ่านเหตุการณ์หลายรายการพร้อมกัน วิธีการจัดการสามารถทำงานซ้อนสอมากครั้งพร้อมกัน
// ดังนั้น วิธีการจัดการค่อนข้างต้องปลอดภัยต่อเส้น
type EventHandler interface {
// ...
คำสั่ง
คำสั่งคือโครงสร้างข้อมูลง่ายที่แสดงคำขอให้ดำเนินการบางการทำงาน
คำสั่งบัส
เรห็ง ซอร์สโค้ด: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go
// ...
// CommandBus คือคอมโพเนนต์ที่ขนส่งคำสั่งไปยังผู้จัดการคำสั่ง
type CommandBus struct {
// ...
เรห็ง ซอร์สโค้ด: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go
// ...
type CommandBusConfig struct {
// GeneratePublishTopic ใช้สำหรับสร้างหัวข้อสำหรับการเผยแพร่คำสั่ง
GeneratePublishTopic CommandBusGeneratePublishTopicFn
// OnSend ถูกเรียกก่อนที่จะเผยแพร่คำสั่ง
// *message.Message สามารถถูกแก้ไขได้
//
// ตัวเลือกนี้ไม่บังคับ
OnSend CommandBusOnSendFn
// Marshaler ใช้สำหรับการตั้งค่าและยอดการเคลียร์คำสั่ง
// เรียกเป็นผลิตภัณฑ์
// จำเป็น
Marshaler CommandEventMarshaler
// Logger ที่ใช้สำหรับการบันทึก
// หากไม่มีการให้, จะใช้ watermill.NopLogger แทน
Logger watermill.LoggerAdapter
}
func (c *CommandBusConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
โปรเซสเซอร์คำสั่ง
เรห็ง ซอร์สโค้ด: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go
// ...
// CommandProcessorSubscriberConstructorFn ใช้สำหรับสร้างผู้ติดตามสำหรับ CommandHandler
// มันช่วยให้คุณสร้างผู้ติดตามที่แตกต่างกันสำหรับทุก CommandHandler
type CommandProcessorSubscriberConstructorFn func(CommandProcessorSubscriberConstructorParams) (message.Subscriber, error)
// ...
เรห็ง ซอร์สโค้ด: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go
// ...
type CommandProcessorConfig struct {
// GenerateSubscribeTopic ใช้สำหรับสร้างหัวข้อสำหรับการติดตามคำสั่ง
GenerateSubscribeTopic CommandProcessorGenerateSubscribeTopicFn
// SubscriberConstructor ใช้สำหรับสร้างผู้ติดตามสำหรับ CommandHandler
SubscriberConstructor CommandProcessorSubscriberConstructorFn
// OnHandle ถูกเรียกก่อนการจัดการคำสั่ง
// OnHandle ทำงานเหมือน middleware: คุณสามารถฉีกเกินิกความ logic เพิ่มเติมก่อนและหลังจากการจัดการคำสั่ง
//
// เนื่องจากสิ่งนี้, คุณต้องเรียก params.Handler.Handle() โดยชัดเจนเพื่อจัดการคำสั่ง
// func(params CommandProcessorOnHandleParams) (err error) {
// // ตรรกก่อนการจัดการ
// // (...)
//
// err := params.Handler.Handle(params.Message.Context(), params.Command)
//
// // ตรรกหลังจากการจัดการ
// // (...)
//
// return err
// }
//
// ตัวเลือกนี้ไม่บังคับ
OnHandle CommandProcessorOnHandleFn
// Marshaler ใช้สำหรับการตั้งค่าและยอดการเคลียร์คำสั่ง
// จำเป็น
Marshaler CommandEventMarshaler
// Logger สำหรับบันทึก
// หากไม่มีการให้, จะใช้ watermill.NopLogger แทน
Logger watermill.LoggerAdapter
// หากเป็นจริง, CommandProcessor จะ ack ข้อความ แม้ว่า CommandHandler จะส่งคืนข้อผิดพลาด
// หาก RequestReplyBackend ไม่เป็นค่าว่างและการส่งตอบกลับผิดพลาด, ข้อความยังคงจะถูก nacked
//
// คำเตือน: ไม่แนะนำที่จะใช้ตัวเลือกนี้เมื่อใช้คำสั่ง requestreply (requestreply.NewCommandHandler หรือ requestreply.NewCommandHandlerWithResult),
// เนื่องจากมันอาจ ack คำสั่งเมื่อการส่งตอบกลับผิดพลาด
//
// เมื่อใช้ requestreply, คุณควรใช้ requestreply.PubSubBackendConfig.AckCommandErrors
AckCommandHandlingErrors bool
// ปิดความสามารถในการเพิ่มตัวจัดการเองโดยอัตโนมัติ
// ใช้สำหรับเลวทางถอยหลังเท่านั้น
// ตั้งค่าเมื่อสร้าง CommandProcessor ด้วย NewCommandProcessor
// เลิกใช้: โปรดย้ายไปที่ NewCommandProcessorWithConfig แทน
disableRouterAutoAddHandlers bool
}
func (c *CommandProcessorConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
ตัวประมวลผลคำสั่ง
โค้ดซอร์สแบบเต็ม: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go
// ...
// CommandHandler รับคำสั่งที่กำหนดโดย NewCommand และจัดการกับมันโดยใช้เมธอด Handle
// หากใช้ DDD, CommandHandler อาจแก้ไขและบันทึกส่วนรวม
//
// ต่างจาก EventHandler, แต่ละ Command สามารถมีเพียง CommandHandler เดียวเท่านั้น
//
// ระหว่างการจัดการข้อมูล, ใช้ CommandHandler หนึ่งกรณี
// เมื่อมีการส่งมอบคำสั่งหลายรายการพร้อมกัน, เมธอด Handle อาจถูกทำงานพร้อมกันหลายครั้ง
// ดังนั้น, เมธอด Handle จำเป็นต้องประสิทธิภาพเป็นสาย!
type CommandHandler interface {
// ...
ตัวแปลงคำสั่งและเหตุการณ์
โค้ดซอร์สแบบเต็ม: github.com/ThreeDotsLabs/watermill/components/cqrs/marshaler.go
// ...
// CommandEventMarshaler แปลงคำสั่งและเหตุการณ์เป็นข้อความ Watermill และในทางกลับกัน
// พล็อตไพษของคำสั่งจำเป็นต้องถูกแปลงเป็น []bytes
type CommandEventMarshaler interface {
// Marshal แปลงคำสั่งหรือเหตุการณ์เป็นข้อความ Watermill
Marshal(v interface{}) (*message.Message, error)
// Unmarshal ถอดรหัสข้อความ Watermill เป็นคำสั่งหรือเหตุการณ์ v
Unmarshal(msg *message.Message, v interface{}) (err error)
// Name คืนชื่อของคำสั่งหรือเหตุการณ์
// ชื่อนี้สามารถใช้เพื่อกำหนดว่าคำสั่งหรือเหตุการณ์ที่ได้รับเป็นที่เราต้องการประมวลผลหรือไม่
Name(v interface{}) string
// NameFromMessage คืนชื่อของคำสั่งหรือเหตุการณ์จากข้อความ Watermill (ที่สร้างโดย Marshal)
//
// เมื่อเรามีคำสั่งหรือเหตุการณ์ที่ได้แปลงเป็นข้อความ Watermill, เราควรใช้ NameFromMessage แทน Name เพื่อหลีกเลี่ยงการถอดรหัสที่ไม่จำเป็น
NameFromMessage(msg *message.Message) string
}
// ...
การใช้งาน
ตัวอย่างโดเมน
การใช้โดเมนที่เรียกว่าการจัดการการจองห้องพักในโรงแรม
เราจะใช้สัญลักษณ์ Event Storming เพื่อแสดงโมเดลของโดเมนนี้
สัญลักษณ์ตัวช่วย:
- สติกกี้โน๊ต ส้ม คือคำสั่ง
- สติกกี้โน๊ต ส้มสีเขียว คือเหตุการณ์
- สติกกี้โน๊ต ส้มสีฟ้า คือแบบจำลองการอ่านที่สร้างออกมาอย่างแบบไม่เกี่ยวข้องกับเหตุการณ์
- สติกกี้โน๊ต ส้มสีม่วง คือนโยบายที่ถูกเรียกด้วยเหตุการณ์และสร้างคำสั่ง
- สติกกี้โน๊ต ส้มสีชมพู คือจุดร้อน; เราเคารพพื้นที่ที่เจอปัญหาบ่อยครั้ง
โดเมนง่าย:
- ลูกค้าสามารถ จองห้องพัก
- ทุกครั้งที่มีการจองห้องพัก, เราจะสั่งหมดปากชายสำหรับลูกค้า (เพราะเรารักลูกค้าของเรา)
- เรารู้ว่าบางครั้ง เบียร์จะหมด
- เราสร้าง รายงานการเงิน โดยอ้างอิงการจอง
การส่งคำสั่ง
ก่อนอื่น, เราต้องจำลองการกระทำของลูกค้า
โค้ดซอร์สแบบเต็ม: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf
// ...
bookRoomCmd := &BookRoom{
RoomId: fmt.Sprintf("%d", i),
GuestName: "John",
StartDate: startDate,
EndDate: endDate,
}
if err := commandBus.Send(context.Background(), bookRoomCmd); err != nil {
panic(err)
}
// ...
ตัวจัดการคำสั่ง
BookRoomHandler
จะเป็นผู้จัดการคำสั่งของเรา
โค้ดภายใน: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// BookRoomHandler เป็นตัวจัดการคำสั่งที่ประมวลผลคำสั่ง BookRoom และส่งออกเหตุการณ์ RoomBooked
//
// ใน CQRS คำสั่งจะต้องถูกประมวลผลโดยตัวจัดการ
// เมื่อเพิ่มตัวจัดการอีกตัวหนึ่งที่จะจัดการคำสั่งนี้ จะมีข้อผิดพลาดที่ถูกส่งกลับ
type BookRoomHandler struct {
eventBus *cqrs.EventBus
}
func (b BookRoomHandler) HandlerName() string {
return "BookRoomHandler"
}
// NewCommand จะคืนชนิดของคำสั่งที่ตัวจัดการควรประมวลผล มันต้องเป็นตัวแสดง
func (b BookRoomHandler) NewCommand() interface{} {
return &BookRoom{}
}
func (b BookRoomHandler) Handle(ctx context.Context, c interface{}) error {
// c เป็นชนิดที่ถูกคืนโดย `NewCommand` ดังนั้นการยืนยันชนิดจะคุ้มครองเสมอ
cmd := c.(*BookRoom)
// คำนวณราคาสุ่มซึ่งอาจจะถูกคำนวณในทางที่มีเหตุผลมากขึ้นในการผลิตจริง
price := (rand.Int63n(40) + 1) * 10
log.Printf(
"จอง %s, จาก %s ถึง %s",
cmd.RoomId,
cmd.GuestName,
time.Unix(cmd.StartDate.Seconds, int64(cmd.StartDate.Nanos)),
time.Unix(cmd.EndDate.Seconds, int64(cmd.EndDate.Nanos)),
)
// RoomBooked จะถูกจัดการโดยตัวจัดการเหตุการณ์ OrderBeerOnRoomBooked
// และในอนาคต RoomBooked สามารถถูกจัดการโดยตัวจัดการเหตุการณ์หลายตัว
if err := b.eventBus.Publish(ctx, &RoomBooked{
ReservationId: watermill.NewUUID(),
RoomId: cmd.RoomId,
GuestName: cmd.GuestName,
Price: price,
StartDate: cmd.StartDate,
EndDate: cmd.EndDate,
}); err != nil {
return err
}
return nil
}
// OrderBeerOnRoomBooked เป็นตัวจัดการเหตุการณ์ที่ประมวลผลเหตุการณ์ RoomBooked และส่งออกคำสั่ง OrderBeer
// ...
ตัวจัดการเหตุการณ์
ตามที่กล่าวไว้ก่อนหน้านี้ เราต้องการสั่งขวดเบียร์ทุกครั้งที่จองห้อง (ที่แสดงว่า "เมื่อห้องถูกจอง") เราทำได้โดยใช้คำสั่ง OrderBeer
โค้ดภายใน: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// OrderBeerOnRoomBooked เป็นตัวจัดการเหตุการณ์ที่ประมวลผลเหตุการณ์ RoomBooked และส่งออกคำสั่ง OrderBeer
type OrderBeerOnRoomBooked struct {
commandBus *cqrs.CommandBus
}
func (o OrderBeerOnRoomBooked) HandlerName() string {
// ชื่อนี้ถูกส่งไปยัง EventsSubscriberConstructor เพื่อสร้างชื่อคิว
return "OrderBeerOnRoomBooked"
}
func (OrderBeerOnRoomBooked) NewEvent() interface{} {
return &RoomBooked{}
}
func (o OrderBeerOnRoomBooked) Handle(ctx context.Context, e interface{}) error {
event := e.(*RoomBooked)
orderBeerCmd := &OrderBeer{
RoomId: event.RoomId,
Count: rand.Int63n(10) + 1,
}
return o.commandBus.Send(ctx, orderBeerCmd)
}
// OrderBeerHandler เป็นตัวจัดการคำสั่งที่ประมวลผลคำสั่ง OrderBeer และส่งออกเหตุการณ์ BeerOrdered
// ...
OrderBeerHandler
เหมือนกับ BookRoomHandler
มาก ความแตกต่างเดียวคือ มันอาจคืนข้อผิดพลาดเมื่อไม่มีเบียร์เพียงพอ ทำให้ต้องออกคำสั่งใหม่ คุณสามารถค้นพบการปฏิบัติที่สมบูรณ์ใน โค้ดตัวอย่าง
กลุ่มผู้จัดการเหตุการณ์ (Event Handler Groups)
ตามค่าเริ่มต้น มีตัวรับสมาชิกแยกต่างหากสำหรับแต่ละตัวจัดการเหตุการณ์ วิธีการนี้สามารถทำงานได้ดีถ้ามีเพียงเพียงประเภทเหตุการณ์เดียวที่ถูกส่งไปยังหัวข้อ (topic) นั่นเอง
ในกรณีที่มีหลายประเภทของเหตุการณ์บนหัวข้อ มีทางเลือกสองทางดังนี้:
- คุณสามารถตั้งค่า
EventConfig.AckOnUnknownEvent
เป็นtrue
- นี้จะอนุมัติเหตุการณ์ทั้งหมดที่ไม่ได้รับการจัดการโดยตัวจัดการ. - คุณสามารถใช้กลไกของกลุ่มผู้จัดการเหตุการณ์ (event handler group mechanism)
ในการใช้กลุ่มเหตุการณ์ คุณต้องตั้งค่า GenerateHandlerGroupSubscribeTopic
และ GroupSubscriberConstructor
ในตัวเลือกของ EventConfig
จากนั้นคุณสามารถใช้ AddHandlersGroup
บน EventProcessor
รหัสภายใน: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
err = eventProcessor.AddHandlersGroup(
"events",
OrderBeerOnRoomBooked{commandBus},
NewBookingsFinancialReport(),
cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
logger.Info("การสั่งเบียร์", watermill.LogFields{
"room_id": event.RoomId,
})
return nil
}),
)
if err != nil {
// ...
ทั้ง GenerateHandlerGroupSubscribeTopic
และ GroupSubscriberConstructor
จะรับข้อมูลเกี่ยวกับชื่อกลุ่มเป็นพารามิเตอร์ของฟังก์ชัน
ตัวจัดการทั่วไป (Generic Handlers)
ตั้งแต่ Watermill เวอร์ชัน 1.3, ตัวจัดการทั่วไปสามารถใช้ในการจัดการคำสั่งและเหตุการณ์ได้ มีประโยชน์มากเมื่อคุณมีจำนวนคำสั่ง/เหตุการณ์มากและไม่ต้องการสร้างตัวจัดการสำหรับแต่ละคำสั่ง/เหตุการณ์
รหัสภายใน: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
logger.Info("การสั่งเบียร์", watermill.LogFields{
"room_id": event.RoomId,
})
return nil
}),
// ...
เบื้องหลังแล้ว มันสร้างมาจากการสร้างตัวจัดการเหตุการณ์หรือตัวจัดการคำสั่ง มันเหมาะสำหรับทุกประเภทของตัวจัดการ
รหัสภายใน: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go
// ...
// NewCommandHandler สร้างการดำเนินการคำสั่งใหม่โดยใช้ฟังก์ชันที่ให้ไว้ และประเภทคำสั่งที่ได้รับมาจากพารามิเตอร์ของฟังก์ชัน
func NewCommandHandler[Command any](
// ...
รหัสภายใน: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go
// ...
// NewEventHandler สร้างการดำเนินการเหตุการณ์ใหม่โดยใช้ฟังก์ชันที่ให้ไว้ และประเภทเหตุการณ์ที่ได้รับมาจากพารามิเตอร์ของฟังก์ชัน
func NewEventHandler[T any](
// ...
รหัสภายใน: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go
// ...
// NewGroupEventHandler สร้างการดำเนินการกลุ่มเหตุการณ์ใหม่โดยใช้ฟังก์ชันที่ให้ไว้ และประเภทเหตุการณ์ที่ได้รับมาจากพารามิเตอร์ของฟังก์ชัน
func NewGroupEventHandler[T any](handleFunc func(ctx context.Context, event *T) error) GroupEventHandler {
// ...
สร้างโมเดลที่ใช้จากการใช้งานเหตุการณ์
โค้ดที่สมบูรณ์: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// BookingsFinancialReport เป็นโมเดลที่ใช้สำหรับคำนวณว่าเราสามารถรับรายได้เท่าไหร่จากการจองห้องพัก
// มันจะฟังเหตุการณ์ RoomBooked เมื่อเกิดเหตุการณ์
//
// การประมวลผลนี้เพียงแค่เขียนลงในหน่วยความจำ ในสภาพแวดล้อมของการผลิต คุณอาจใช้รูปแบบการเก็บรักษาที่มีความต่อเนื่อง
type BookingsFinancialReport struct {
handledBookings map[string]struct{}
totalCharge int64
lock sync.Mutex
}
func NewBookingsFinancialReport() *BookingsFinancialReport {
return &BookingsFinancialReport{handledBookings: map[string]struct{}{}}
}
func (b BookingsFinancialReport) HandlerName() string {
// ชื่อนี้ถูกส่งให้กับ EventsSubscriberConstructor และถูกใช้เพื่อสร้างชื่อคิว
return "BookingsFinancialReport"
}
func (BookingsFinancialReport) NewEvent() interface{} {
return &RoomBooked{}
}
func (b *BookingsFinancialReport) Handle(ctx context.Context, e interface{}) error {
// การจัดการอาจถูกเรียกพร้อมกัน ดังนั้นต้องมีความปลอดภัยในเรื่องของเธรด
b.lock.Lock()
defer b.lock.Unlock()
event := e.(*RoomBooked)
// เมื่อใช้ Pub/Sub ซึ่งไม่ให้การส่งสารที่มีเพียงครั้งเดียวอย่างแน่นอน จำเป็นต้องทำการถอดความซ้ำกันของข้อความ
// GoChannel Pub/Sub ให้การส่งสารที่มีเพียงครั้งเดียว,
// แต่เพื่อเตรียมตัวสำหรับตัวอย่างนี้เล้วิต อย่างอื่น ๆ จำเป็นต้องทำการคัดคำซ้ำด้วย
if _, ok := b.handledBookings[event.ReservationId]; ok {
return nil
}
b.handledBookings[event.ReservationId] = struct{}{}
b.totalCharge += event.Price
fmt.Printf(">>> จองห้องพักให้เป็น $%d\n", b.totalCharge)
return nil
}
var amqpAddress = "amqp://guest:guest@rabbitmq:5672/"
func main() {
// ...
เชื่อมต่อทุกอย่าง
เรามีสิ่งที่จำเป็นทั้งหมดในการสร้างแอปพลิเคชัน CQRS แล้ว
เราจะใช้ AMQP (RabbitMQ) เป็น message broker ของเรา: AMQP.
ในพื้นที่ภายใน CQRS ใช้ Watermill's message router ซึ่งถ้าคุณยังไม่คุ้นเคยและต้องการเข้าใจว่ามันทำงานอย่างไร คุณควรตรวจสอบไกด์เริ่มต้น มันยังจะแสดงให้คุณเห็นว่าจะใช้รูปแบบการส่งข้อความมาตรฐานบางประการ เช่น การเก็บวัดผล คิวข้อความพิษ การจำกัดอัตรา, ความสัมพันธ์ และเครื่องมืออื่นที่ใช้โดยแอปพลิเคชันแบบส่งข้อความทุกๆ ประการ ๆ นี้ถูกสร้างใน Watermill แล้ว
มากลับสู่ CQRS กันบ้าง เช่นที่คุณทราบอยู่แล้ว CQRS ประกอบด้วยส่วนประกอบหลาย ๆ อย่าง เช่น command หรือ event buses, processors, และอื่น ๆ
รหัสภายใน: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
func main() {
logger := watermill.NewStdLogger(false, false)
cqrsMarshaler := cqrs.ProtobufMarshaler{}
// You can use any Pub/Sub implementation from here: https://watermill.io/pubsubs/
// Detailed RabbitMQ implementation: https://watermill.io/pubsubs/amqp/
// Commands will be send to queue, because they need to be consumed once.
commandsAMQPConfig := amqp.NewDurableQueueConfig(amqpAddress)
commandsPublisher, err := amqp.NewPublisher(commandsAMQPConfig, logger)
if err != nil {
panic(err)
}
commandsSubscriber, err := amqp.NewSubscriber(commandsAMQPConfig, logger)
if err != nil {
panic(err)
}
// Events will be published to PubSub configured Rabbit, because they may be consumed by multiple consumers.
// (in that case BookingsFinancialReport and OrderBeerOnRoomBooked).
eventsPublisher, err := amqp.NewPublisher(amqp.NewDurablePubSubConfig(amqpAddress, nil), logger)
if err != nil {
panic(err)
}
// CQRS is built on messages router. Detailed documentation: https://watermill.io/docs/messages-router/
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// Simple middleware which will recover panics from event or command handlers.
// More about router middlewares you can find in the documentation:
// https://watermill.io/docs/messages-router/#middleware
//
// List of available middlewares you can find in message/router/middleware.
router.AddMiddleware(middleware.Recoverer)
commandBus, err := cqrs.NewCommandBusWithConfig(commandsPublisher, cqrs.CommandBusConfig{
GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) {
// we are using queue RabbitMQ config, so we need to have topic per command type
return params.CommandName, nil
},
OnSend: func(params cqrs.CommandBusOnSendParams) error {
logger.Info("Sending command", watermill.LogFields{
"command_name": params.CommandName,
})
params.Message.Metadata.Set("sent_at", time.Now().String())
return nil
},
Marshaler: cqrsMarshaler,
Logger: logger,
})
if err != nil {
panic(err)
}
commandProcessor, err := cqrs.NewCommandProcessorWithConfig(
router,
cqrs.CommandProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) {
// we are using queue RabbitMQ config, so we need to have topic per command type
return params.CommandName, nil
},
SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) {
// we can reuse subscriber, because all commands have separated topics
return commandsSubscriber, nil
},
OnHandle: func(params cqrs.CommandProcessorOnHandleParams) error {
start := time.Now()
err := params.Handler.Handle(params.Message.Context(), params.Command)
logger.Info("Command handled", watermill.LogFields{
"command_name": params.CommandName,
"duration": time.Since(start),
"err": err,
})
return err
},
Marshaler: cqrsMarshaler,
Logger: logger,
},
)
if err != nil {
panic(err)
}
eventBus, err := cqrs.NewEventBusWithConfig(eventsPublisher, cqrs.EventBusConfig{
GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
// because we are using PubSub RabbitMQ config, we can use one topic for all events
return "events", nil
// we can also use topic per event type
// return params.EventName, nil
},
OnPublish: func(params cqrs.OnEventSendParams) error {
logger.Info("Publishing event", watermill.LogFields{
"event_name": params.EventName,
})
params.Message.Metadata.Set("published_at", time.Now().String())
return nil
},
Marshaler: cqrsMarshaler,
Logger: logger,
})
if err != nil {
panic(err)
}
eventProcessor, err := cqrs.NewEventGroupProcessorWithConfig(
router,
cqrs.EventGroupProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) {
return "events", nil
},
SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) {
config := amqp.NewDurablePubSubConfig(
amqpAddress,
amqp.GenerateQueueNameTopicNameWithSuffix(params.EventGroupName),
)
return amqp.NewSubscriber(config, logger)
},
OnHandle: func(params cqrs.EventGroupProcessorOnHandleParams) error {
start := time.Now()
err := params.Handler.Handle(params.Message.Context(), params.Event)
logger.Info("Event handled", watermill.LogFields{
"event_name": params.EventName,
"duration": time.Since(start),
"err": err,
})
return err
},
Marshaler: cqrsMarshaler,
Logger: logger,
},
)
if err != nil {
panic(err)
}
err = commandProcessor.AddHandlers(
BookRoomHandler{eventBus},
OrderBeerHandler{eventBus},
)
if err != nil {
panic(err)
}
err = eventProcessor.AddHandlersGroup(
"events",
OrderBeerOnRoomBooked{commandBus},
NewBookingsFinancialReport(),
cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
logger.Info("Beer ordered", watermill.LogFields{
"room_id": event.RoomId,
})
return nil
}),
)
if err != nil {
panic(err)
}
// publish BookRoom commands every second to simulate incoming traffic
go publishCommands(commandBus)
// processors are based on router, so they will work when router will start
if err := router.Run(context.Background()); err != nil {
panic(err)
}
}
// ...
นั้นคือ พวกเรามีแอปพลิเคชัน CQRS ที่สามารถทำงานได้แล้วครับ