Cơ chế CQRS
CQRS viết tắt của "Command Query Responsibility Segregation" (Phân chia Trách nhiệm Lệnh và Truy vấn). Nó phân tách trách nhiệm của lệnh (yêu cầu ghi) và truy vấn (yêu cầu đọc). Yêu cầu ghi và yêu cầu đọc được xử lý bởi các đối tượng khác nhau.
Đây là CQRS. Chúng ta có thể tiến hành phân chia lưu trữ dữ liệu, có lưu trữ đọc và lưu trữ ghi riêng biệt. Khi điều này được thực hiện, có thể có nhiều lưu trữ đọc được tối ưu hóa để xử lý các loại truy vấn khác nhau hoặc bao gồm nhiều ngữ cảnh có giới hạn khác nhau. Mặc dù việc phân chia lưu trữ đọc/ghi riêng biệt thường là đề tài thảo luận liên quan đến CQRS, nhưng đó không phải là CQRS chính. CQRS chỉ là sự phân chia đầu tiên giữa lệnh và truy vấn.
Thành phần cqrs
cung cấp một số trừu tượng hữu ích, được xây dựng dựa trên Pub/Sub và Router, để giúp triển khai mô hình CQRS.
Bạn không cần triển khai toàn bộ CQRS. Thông thường, chỉ phần sự kiện của thành phần được sử dụng để xây dựng ứng dụng dựa trên sự kiện.
Các Khối Xây dựng
Sự kiện
Sự kiện đại diện cho điều đã xảy ra. Sự kiện không thể thay đổi.
Bus Sự kiện
Mã nguồn đầy đủ: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go
// ...
// EventBus vận chuyển các sự kiện đến trình xử lý sự kiện.
type EventBus struct {
// ...
Mã nguồn đầy đủ: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go
// ...
type EventBusConfig struct {
// GeneratePublishTopic được sử dụng để tạo tên chủ đề cho việc xuất bản sự kiện.
GeneratePublishTopic GenerateEventPublishTopicFn
// OnPublish được gọi trước khi gửi sự kiện. Nó có thể sửa đổi *message.Message.
//
// Tùy chọn này không bắt buộc.
OnPublish OnEventSendFn
// Marshaler được sử dụng để mã hóa và giải mã các sự kiện.
// Điều này là bắt buộc.
Marshaler CommandEventMarshaler
// Đối tượng Logger để ghi log. Nếu không được cung cấp, watermill.NopLogger sẽ được sử dụng.
Logger watermill.LoggerAdapter
}
func (c *EventBusConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
Bộ xử lý Sự kiện
Mã nguồn đầy đủ: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go
// ...
// EventProcessor được sử dụng để xác định EventHandler nào sẽ xử lý các sự kiện nhận được từ bus sự kiện.
type EventProcessor struct {
// ...
Mã nguồn đầy đủ: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go
// ...
type EventProcessorConfig struct {
// GenerateSubscribeTopic được sử dụng để tạo ra chủ đề để đăng ký nhận các sự kiện.
// Nếu bộ xử lý sự kiện sử dụng các nhóm xử lý, thì GenerateSubscribeTopic được sử dụng.
GenerateSubscribeTopic EventProcessorGenerateSubscribeTopicFn
// SubscriberConstructor được sử dụng để tạo ra một người đăng ký cho EventHandler.
//
// Hàm này được gọi một lần cho mỗi instance của EventHandler.
// Nếu bạn muốn tái sử dụng một người đăng ký cho nhiều xử lý, hãy sử dụng GroupEventProcessor.
SubscriberConstructor EventProcessorSubscriberConstructorFn
// OnHandle được gọi trước khi xử lý sự kiện.
// OnHandle hoạt động tương tự như middleware: bạn có thể chèn logic bổ sung trước và sau khi xử lý sự kiện.
//
// Do đó, bạn cần gọi một cách rõ ràng params.Handler.Handle() để xử lý sự kiện.
//
// func(params EventProcessorOnHandleParams) (err error) {
// // Logic trước khi xử lý
// // (...)
// err := params.Handler.Handle(params.Message.Context(), params.Event)
//
// // Logic sau khi xử lý
// // (...)
// return err
// }
//
// Tùy chọn này không bắt buộc.
OnHandle EventProcessorOnHandleFn
// AckOnUnknownEvent được sử dụng để xác định nếu tin nhắn nên được chấp nhận khi sự kiện không có xử lý đã được xác định.
AckOnUnknownEvent bool
// Marshaler được sử dụng để mã hóa và giải mã các sự kiện.
// Cần thiết.
Marshaler CommandEventMarshaler
// Logger instance dùng để ghi log.
// Nếu không được cung cấp, watermill.NopLogger sẽ được sử dụng.
Logger watermill.LoggerAdapter
// disableRouterAutoAddHandlers là để duy trì tính tương thích ngược.
// Giá trị này sẽ được thiết lập khi tạo EventProcessor bằng cách sử dụng NewEventProcessor.
// Đã lỗi thời: di chuyển sang NewEventProcessorWithConfig.
disableRouterAutoAddHandlers bool
}
func (c *EventProcessorConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
Bộ xử lý Nhóm Sự kiện
Toàn bộ Mã nguồn: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go
// ...
// EventGroupProcessor xác định bộ xử lý sự kiện nào sẽ xử lý các sự kiện được nhận từ bus sự kiện.
// So với EventProcessor, EventGroupProcessor cho phép nhiều bộ xử lý chia sẻ cùng một phiên đăng ký.
type EventGroupProcessor struct {
// ...
Toàn bộ Mã nguồn: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go
// ...
type EventGroupProcessorConfig struct {
// GenerateSubscribeTopic được sử dụng để tạo chủ đề để đăng ký bộ xử lý sự kiện nhóm.
// Tùy chọn này là bắt buộc đối với EventProcessor khi sử dụng các nhóm bộ xử lý.
GenerateSubscribeTopic EventGroupProcessorGenerateSubscribeTopicFn
// SubscriberConstructor được sử dụng để tạo một người đăng ký cho GroupEventHandler.
// Hàm này được gọi một lần cho mỗi nhóm sự kiện - cho phép một phiên đăng ký được tạo cho mỗi nhóm.
// Nó rất hữu ích khi chúng ta muốn xử lý các sự kiện từ một luồng theo thứ tự.
SubscriberConstructor EventGroupProcessorSubscriberConstructorFn
// OnHandle được gọi trước khi xử lý sự kiện.
// OnHandle tương tự như middleware: bạn có thể chèn logic bổ sung trước và sau khi xử lý sự kiện.
//
// Do đó, bạn cần gọi rõ ràng params.Handler.Handle() để xử lý sự kiện.
//
// func(params EventGroupProcessorOnHandleParams) (err error) {
// // Logic trước khi xử lý
// // (...)
//
// err := params.Handler.Handle(params.Message.Context(), params.Event)
//
// // Logic sau khi xử lý
// // (...)
//
// return err
// }
//
// Tùy chọn này không bắt buộc.
OnHandle EventGroupProcessorOnHandleFn
// AckOnUnknownEvent được sử dụng để xác định liệu có nên chấp nhận nếu sự kiện không có bộ xử lý được xác định.
AckOnUnknownEvent bool
// Marshaler được sử dụng để mã hóa và giải mã các sự kiện.
// Điều này là bắt buộc.
Marshaler CommandEventMarshaler
// Logger được sử dụng để ghi nhật ký.
// Nếu không được cung cấp, watermill.NopLogger sẽ được sử dụng.
Logger watermill.LoggerAdapter
}
func (c *EventGroupProcessorConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
Tìm hiểu thêm về Bộ xử lý Nhóm Sự kiện.
Bộ xử lý Sự kiện
Toàn bộ Mã nguồn: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go
// ...
// EventHandler nhận các sự kiện được xác định bởi NewEvent và xử lý chúng bằng phương thức Handle của mình.
// Nếu sử dụng DDD, bộ xử lý sự kiện có thể sửa đổi và lưu trữ các tổng hợp.
// Nó cũng có thể kích hoạt các quản lý quy trình, saga, hoặc chỉ xây dựng các mô hình đọc.
//
// Khác biệt với bộ xử lý lệnh, mỗi sự kiện có thể có nhiều bộ xử lý sự kiện.
//
// Trong quá trình xử lý tin nhắn, sử dụng một trường hợp EventHandler.
// Khi truyền nhiều sự kiện cùng một lúc, phương thức Handle có thể được thực thi nhiều lần song song.
// Do đó, phương thức Handle cần phải được an toàn đối với luồng!
type EventHandler interface {
// ...
Lệnh
Một lệnh là một cấu trúc dữ liệu đơn giản đại diện cho một yêu cầu thực hiện một số thao tác.
Bus lệnh
Complete source code: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go
// ...
// Bus lệnh là thành phần chuyển tải các lệnh đến trình xử lý lệnh.
type CommandBus struct {
// ...
Complete source code: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go
// ...
type CommandBusConfig struct {
// GeneratePublishTopic được sử dụng để tạo chủ đề để xuất bản các lệnh.
GeneratePublishTopic CommandBusGeneratePublishTopicFn
// OnSend được gọi trước khi xuất bản một lệnh.
// *message.Message có thể được sửa đổi.
//
// Tùy chọn này không bắt buộc.
OnSend CommandBusOnSendFn
// Marshaler được sử dụng để serialize và deserialize các lệnh.
// Cần thiết.
Marshaler CommandEventMarshaler
// Thể hiện của Logger được sử dụng để logging.
// Nếu không được cung cấp, watermill.NopLogger sẽ được sử dụng.
Logger watermill.LoggerAdapter
}
func (c *CommandBusConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
Xử lý lệnh
Complete source code: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go
// ...
// CommandProcessorSubscriberConstructorFn được sử dụng để tạo người đăng ký cho CommandHandler.
// Nó cho phép bạn tạo người đăng ký tùy chỉnh riêng cho mỗi trình xử lý lệnh.
type CommandProcessorSubscriberConstructorFn func(CommandProcessorSubscriberConstructorParams) (message.Subscriber, error)
// ...
Complete source code: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go
// ...
type CommandProcessorConfig struct {
// GenerateSubscribeTopic được sử dụng để tạo chủ đề để đăng ký nhận các lệnh.
GenerateSubscribeTopic CommandProcessorGenerateSubscribeTopicFn
// SubscriberConstructor được sử dụng để tạo người đăng ký cho CommandHandler.
SubscriberConstructor CommandProcessorSubscriberConstructorFn
// OnHandle được gọi trước khi xử lý lệnh.
// OnHandle hoạt động như một middleware: bạn có thể tiêm logic bổ sung trước và sau khi xử lý lệnh.
//
// Do đó, bạn cần gọi params.Handler.Handle() một cách rõ ràng để xử lý lệnh.
// func(params CommandProcessorOnHandleParams) (err error) {
// // logic trước khi xử lý
// // (...)
//
// err := params.Handler.Handle(params.Message.Context(), params.Command)
//
// // logic sau khi xử lý
// // (...)
//
// return err
// }
//
// Tùy chọn này không bắt buộc.
OnHandle CommandProcessorOnHandleFn
// Marshaler được sử dụng để serialize và deserialize các lệnh.
// Cần thiết.
Marshaler CommandEventMarshaler
// Thể hiện của Logger để logging.
// Nếu không được cung cấp, watermill.NopLogger sẽ được sử dụng.
Logger watermill.LoggerAdapter
// Nếu đúng, CommandProcessor sẽ ack các thông điệp ngay cả khi CommandHandler trả về lỗi.
// Nếu RequestReplyBackend không null và gửi reply thất bại, thông điệp vẫn sẽ bị nacked.
//
// Cảnh báo: Không khuyến khích sử dụng tùy chọn này khi sử dụng thành phần requestreply (requestreply.NewCommandHandler hoặc requestreply.NewCommandHandlerWithResult),
// vì nó có thể ack lệnh khi gửi reply thất bại.
//
// Khi sử dụng requestreply, bạn nên sử dụng requestreply.PubSubBackendConfig.AckCommandErrors.
AckCommandHandlingErrors bool
// disableRouterAutoAddHandlers được sử dụng cho tính tương thích ngược lại.
// Nó được thiết lập khi tạo một CommandProcessor với NewCommandProcessor.
// Đã lỗi thời: vui lòng di chuyển sang NewCommandProcessorWithConfig.
disableRouterAutoAddHandlers bool
}
func (c *CommandProcessorConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
Trình xử lý lệnh
Hoàn thành mã nguồn: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go
// ...
// CommandHandler nhận lệnh được xác định bởi NewCommand và xử lý nó bằng cách sử dụng phương thức Handle.
// Nếu sử dụng DDD, CommandHandler có thể sửa đổi và lưu trữ các tổng hợp.
//
// Khác với EventHandler, mỗi Command chỉ có thể có một CommandHandler.
//
// Trong quá trình xử lý tin nhắn, sử dụng một phiên bản của CommandHandler.
// Khi nhiều lệnh được giao cùng một lúc, phương thức Handle có thể được thực thi nhiều lần đồng thời.
// Do đó, phương thức Handle cần phải an toàn với luồng dữ liệu!
type CommandHandler interface {
// ...
Trình mã hóa lệnh và sự kiện
Hoàn thành mã nguồn: github.com/ThreeDotsLabs/watermill/components/cqrs/marshaler.go
// ...
// CommandEventMarshaler mã hóa lệnh và sự kiện thành các tin nhắn Watermill, và ngược lại.
// Dữ liệu của lệnh cần phải được mã hóa thành []bytes.
type CommandEventMarshaler interface {
// Marshal mã hóa lệnh hoặc sự kiện thành một tin nhắn Watermill.
Marshal(v interface{}) (*message.Message, error)
// Unmarshal giải mã tin nhắn Watermill thành lệnh hoặc sự kiện v.
Unmarshal(msg *message.Message, v interface{}) (err error)
// Name trả về tên của lệnh hoặc sự kiện.
// Tên có thể được sử dụng để xác định xem lệnh hoặc sự kiện nhận được có phải là cái chúng ta muốn xử lý hay không.
Name(v interface{}) string
// NameFromMessage trả về tên của lệnh hoặc sự kiện từ tin nhắn Watermill (được tạo ra bởi Marshal).
//
// Khi chúng ta có lệnh hoặc sự kiện được mã hóa thành các tin nhắn Watermill, chúng ta nên sử dụng NameFromMessage thay vì Name để tránh việc giải mã không cần thiết.
NameFromMessage(msg *message.Message) string
}
// ...
Sử dụng
Ví dụ về Lĩnh vực
Sử dụng một lĩnh vực đơn giản chịu trách nhiệm xử lý đặt phòng trong một khách sạn.
Chúng ta sẽ sử dụng biểu tượng Event Storming để trình bày mô hình của lĩnh vực này.
Ký hiệu huyền thoại:
- Giấy dán màu xanh là lệnh
- Giấy dán màu cam là sự kiện
- Giấy dán màu xanh lá là các mô hình đọc được tạo ra từ các sự kiện theo cách không đồng bộ
- Giấy dán màu tím là các chính sách được kích hoạt bởi sự kiện và tạo ra lệnh
- Giấy dán màu hồng là điểm nóng; chúng ta đánh dấu các vùng thường gặp vấn đề
Lĩnh vực đơn giản như sau:
- Khách hàng có thể đặt phòng.
-
Mỗi khi một phòng được đặt, chúng ta đặt một chai bia cho khách hàng (bởi vì chúng ta yêu thương khách của mình).
- Chúng ta biết rằng đôi khi bia cạn kho.
- Chúng ta tạo ra một báo cáo tài chính dựa trên việc đặt phòng.
Gửi Lệnh
Đầu tiên, chúng ta cần mô phỏng hành động của khách hàng.
Hoàn chỉnh mã nguồn: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf
// ...
bookRoomCmd := &BookRoom{
RoomId: fmt.Sprintf("%d", i),
GuestName: "John",
StartDate: startDate,
EndDate: endDate,
}
if err := commandBus.Send(context.Background(), bookRoomCmd); err != nil {
panic(err)
}
// ...
Trình xử lý lệnh
BookRoomHandler
sẽ xử lý các lệnh của chúng ta.
Toàn bộ mã nguồn: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// BookRoomHandler là trình xử lý lệnh xử lý lệnh BookRoom và phát ra sự kiện RoomBooked.
//
// Trong CQRS, một lệnh phải được xử lý bởi một trình xử lý.
// Khi thêm một trình xử lý khác để xử lý lệnh này, sẽ trả về một lỗi.
type BookRoomHandler struct {
eventBus *cqrs.EventBus
}
func (b BookRoomHandler) HandlerName() string {
return "BookRoomHandler"
}
// NewCommand trả về loại lệnh mà trình xử lý này nên xử lý. Nó phải là một con trỏ.
func (b BookRoomHandler) NewCommand() interface{} {
return &BookRoom{}
}
func (b BookRoomHandler) Handle(ctx context.Context, c interface{}) error {
// c luôn là loại được trả về bởi `NewCommand`, nên sự khẳng định kiểu luôn an toàn
cmd := c.(*BookRoom)
// Một số giá cả ngẫu nhiên, có thể được tính toán bằng cách hợp lý hơn trong môi trường sản xuất thực tế
price := (rand.Int63n(40) + 1) * 10
log.Printf(
"Đặt phòng %s, từ %s đến %s",
cmd.RoomId,
cmd.GuestName,
time.Unix(cmd.StartDate.Seconds, int64(cmd.StartDate.Nanos)),
time.Unix(cmd.EndDate.Seconds, int64(cmd.EndDate.Nanos)),
)
// RoomBooked sẽ được xử lý bởi trình xử lý sự kiện OrderBeerOnRoomBooked,
// và trong tương lai, RoomBooked có thể được xử lý bởi nhiều trình xử lý sự kiện
if err := b.eventBus.Publish(ctx, &RoomBooked{
ReservationId: watermill.NewUUID(),
RoomId: cmd.RoomId,
GuestName: cmd.GuestName,
Price: price,
StartDate: cmd.StartDate,
EndDate: cmd.EndDate,
}); err != nil {
return err
}
return nil
}
// OrderBeerOnRoomBooked là một trình xử lý sự kiện xử lý sự kiện RoomBooked và phát ra lệnh OrderBeer.
// ...
Trình xử lý sự kiện
Như đã đề cập trước đó, chúng tôi muốn đặt một chai bia mỗi khi đặt phòng (được gắn nhãn "Khi đặt phòng"). Chúng tôi đạt được điều này bằng cách sử dụng lệnh OrderBeer
.
Toàn bộ mã nguồn: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// OrderBeerOnRoomBooked là một trình xử lý sự kiện xử lý sự kiện RoomBooked và phát ra lệnh OrderBeer.
type OrderBeerOnRoomBooked struct {
commandBus *cqrs.CommandBus
}
func (o OrderBeerOnRoomBooked) HandlerName() string {
// Tên này được truyền vào EventsSubscriberConstructor để tạo ra tên hàng đợi
return "OrderBeerOnRoomBooked"
}
func (OrderBeerOnRoomBooked) NewEvent() interface{} {
return &RoomBooked{}
}
func (o OrderBeerOnRoomBooked) Handle(ctx context.Context, e interface{}) error {
event := e.(*RoomBooked)
orderBeerCmd := &OrderBeer{
RoomId: event.RoomId,
Count: rand.Int63n(10) + 1,
}
return o.commandBus.Send(ctx, orderBeerCmd)
}
// OrderBeerHandler là một trình xử lý lệnh xử lý lệnh OrderBeer và phát ra sự kiện BeerOrdered.
// ...
OrderBeerHandler
rất giống với BookRoomHandler
. Sự khác biệt duy nhất là đôi khi nó trả về một lỗi khi không đủ bia, dẫn đến việc lệnh được phát lại. Bạn có thể tìm thấy triển khai đầy đủ trong mã nguồn ví dụ.
Nhóm Xử lý Sự kiện
Mặc định, mỗi trình xử lý sự kiện có một phiên bản người đăng ký riêng biệt. Cách tiếp cận này hoạt động tốt nếu chỉ có một loại sự kiện được gửi đến chủ đề.
Trong trường hợp có nhiều loại sự kiện trên chủ đề, có hai lựa chọn:
- Bạn có thể thiết lập
EventConfig.AckOnUnknownEvent
thànhtrue
- điều này sẽ xác nhận tất cả các sự kiện không được xử lý bởi trình xử lý. - Bạn có thể sử dụng cơ chế nhóm trình xử lý sự kiện.
Để sử dụng nhóm sự kiện, bạn cần thiết lập các tùy chọn GenerateHandlerGroupSubscribeTopic
và GroupSubscriberConstructor
trong EventConfig
.
Sau đó, bạn có thể sử dụng AddHandlersGroup
trên EventProcessor
.
Mã nguồn đầy đủ: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
err = eventProcessor.AddHandlersGroup(
"events",
OrderBeerOnRoomBooked{commandBus},
NewBookingsFinancialReport(),
cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
logger.Info("Bia đã được đặt", watermill.LogFields{
"room_id": event.RoomId,
})
return nil
}),
)
if err != nil {
// ...
Cả GenerateHandlerGroupSubscribeTopic
và GroupSubscriberConstructor
đều nhận thông tin về tên nhóm làm tham số hàm.
Trình Xử Lý Tổng Quát
Bắt đầu từ Watermill phiên bản 1.3, trình xử lý tổng quát có thể được sử dụng để xử lý lệnh và sự kiện. Điều này rất hữu ích khi bạn có một số lượng lớn các lệnh/sự kiện và không muốn tạo một trình xử lý cho mỗi loại.
Mã nguồn đầy đủ: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
logger.Info("Bia đã được đặt", watermill.LogFields{
"room_id": event.RoomId,
})
return nil
}),
// ...
Ở phía sau cảnh, nó tạo ra một triển khai Trình xử lý Sự kiện hoặc Trình xử lý Lệnh. Nó phù hợp cho tất cả các loại trình xử lý.
Mã nguồn đầy đủ: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go
// ...
// NewCommandHandler tạo một triển khai Trình xử lý Lệnh mới dựa trên chức năng cung cấp và loại lệnh suy ra từ các tham số chức năng.
func NewCommandHandler[Lệnh bất kỳ](
// ...
Mã nguồn đầy đủ: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go
// ...
// NewEventHandler tạo một triển khai Trình xử lý Sự kiện mới dựa trên chức năng cung cấp và loại sự kiện suy ra từ các tham số chức năng.
func NewEventHandler[T bất kỳ](
// ...
Mã nguồn đầy đủ: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go
// ...
// NewGroupEventHandler tạo một triển khai Trình xử lý Nhóm mới dựa trên chức năng cung cấp và loại sự kiện suy ra từ các tham số chức năng.
func NewGroupEventHandler[T bất kỳ](handleFunc func(ctx context.Context, event *T) error) GroupEventHandler {
// ...
Xây dựng Mô hình Đọc Sử dụng Người xử lý Sự kiện
Toàn bộ mã nguồn: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// BookingsFinancialReport là một mô hình đọc tính toán được bao nhiêu tiền chúng ta có thể kiếm được từ các đặt phòng.
// Đối tượng này lắng nghe sự kiện RoomBooked khi chúng xuất hiện.
//
// Triển khai này đơn giản là ghi vào bộ nhớ. Trong môi trường sản xuất, bạn có thể sử dụng một hình thức lưu trữ cố định nào đó.
type BookingsFinancialReport struct {
handledBookings map[string]struct{}
totalCharge int64
lock sync.Mutex
}
func NewBookingsFinancialReport() *BookingsFinancialReport {
return &BookingsFinancialReport{handledBookings: map[string]struct{}{}}
}
func (b BookingsFinancialReport) HandlerName() string {
// Tên này được chuyển đến EventsSubscriberConstructor và được sử dụng để tạo tên hàng đợi
return "BookingsFinancialReport"
}
func (BookingsFinancialReport) NewEvent() interface{} {
return &RoomBooked{}
}
func (b *BookingsFinancialReport) Handle(ctx context.Context, e interface{}) error {
// Xử lý có thể được gọi đồng thời, vì vậy cần an toàn luồng.
b.lock.Lock()
defer b.lock.Unlock()
event := e.(*RoomBooked)
// Khi sử dụng Pub/Sub không cung cấp bản chuyển phát chính xác một lần, chúng ta cần loại bỏ các thông điệp trùng lặp.
// GoChannel Pub/Sub cung cấp bản chuyển phát chính xác một lần,
// nhưng hãy chuẩn bị ví dụ này cho các triển khai Pub/Sub khác.
if _, ok := b.handledBookings[event.ReservationId]; ok {
return nil
}
b.handledBookings[event.ReservationId] = struct{}{}
b.totalCharge += event.Price
fmt.Printf(">>> Phòng được đặt giá $%d\n", b.totalCharge)
return nil
}
var amqpAddress = "amqp://guest:guest@rabbitmq:5672/"
func main() {
// ...
Kết nối Mọi Thứ
Chúng ta đã có tất cả các thành phần cần thiết để xây dựng một ứng dụng CQRS.
Chúng ta sẽ sử dụng AMQP (RabbitMQ) như trình điều khiển tin nhắn của chúng ta: AMQP.
Bên dưới, CQRS sử dụng bộ định tuyến tin nhắn của Watermill. Nếu bạn không quen thuộc và muốn hiểu cách nó hoạt động, bạn nên xem hướng dẫn bắt đầu. Nó cũng sẽ hướng dẫn bạn cách sử dụng một số mẫu thông điệp tiêu chuẩn như thống kê, hàng đợi thông điệp độc hại, giới hạn tốc độ, tương quan và các công cụ khác được sử dụng bởi mọi ứng dụng dựa trên thông điệp. Những công cụ này đã được tích hợp sẵn vào Watermill.
Hãy quay trở lại với CQRS. Như bạn đã biết, CQRS bao gồm nhiều thành phần như bus lệnh hoặc sự kiện, bộ xử lý, và cũng như vậy.
Toàn bộ mã nguồn: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
func main() {
logger := watermill.NewStdLogger(false, false)
cqrsMarshaler := cqrs.ProtobufMarshaler{}
// You can use any Pub/Sub implementation from here: https://watermill.io/pubsubs/
// Detailed RabbitMQ implementation: https://watermill.io/pubsubs/amqp/
// Commands will be send to queue, because they need to be consumed once.
commandsAMQPConfig := amqp.NewDurableQueueConfig(amqpAddress)
commandsPublisher, err := amqp.NewPublisher(commandsAMQPConfig, logger)
if err != nil {
panic(err)
}
commandsSubscriber, err := amqp.NewSubscriber(commandsAMQPConfig, logger)
if err != nil {
panic(err)
}
// Events will be published to PubSub configured Rabbit, because they may be consumed by multiple consumers.
// (in that case BookingsFinancialReport and OrderBeerOnRoomBooked).
eventsPublisher, err := amqp.NewPublisher(amqp.NewDurablePubSubConfig(amqpAddress, nil), logger)
if err != nil {
panic(err)
}
// CQRS is built on messages router. Detailed documentation: https://watermill.io/docs/messages-router/
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// Simple middleware which will recover panics from event or command handlers.
// More about router middlewares you can find in the documentation:
// https://watermill.io/docs/messages-router/#middleware
//
// List of available middlewares you can find in message/router/middleware.
router.AddMiddleware(middleware.Recoverer)
commandBus, err := cqrs.NewCommandBusWithConfig(commandsPublisher, cqrs.CommandBusConfig{
GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) {
// we are using queue RabbitMQ config, so we need to have topic per command type
return params.CommandName, nil
},
OnSend: func(params cqrs.CommandBusOnSendParams) error {
logger.Info("Sending command", watermill.LogFields{
"command_name": params.CommandName,
})
params.Message.Metadata.Set("sent_at", time.Now().String())
return nil
},
Marshaler: cqrsMarshaler,
Logger: logger,
})
if err != nil {
panic(err)
}
commandProcessor, err := cqrs.NewCommandProcessorWithConfig(
router,
cqrs.CommandProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) {
// we are using queue RabbitMQ config, so we need to have topic per command type
return params.CommandName, nil
},
SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) {
// we can reuse subscriber, because all commands have separated topics
return commandsSubscriber, nil
},
OnHandle: func(params cqrs.CommandProcessorOnHandleParams) error {
start := time.Now()
err := params.Handler.Handle(params.Message.Context(), params.Command)
logger.Info("Command handled", watermill.LogFields{
"command_name": params.CommandName,
"duration": time.Since(start),
"err": err,
})
return err
},
Marshaler: cqrsMarshaler,
Logger: logger,
},
)
if err != nil {
panic(err)
}
eventBus, err := cqrs.NewEventBusWithConfig(eventsPublisher, cqrs.EventBusConfig{
GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
// because we are using PubSub RabbitMQ config, we can use one topic for all events
return "events", nil
// we can also use topic per event type
// return params.EventName, nil
},
OnPublish: func(params cqrs.OnEventSendParams) error {
logger.Info("Publishing event", watermill.LogFields{
"event_name": params.EventName,
})
params.Message.Metadata.Set("published_at", time.Now().String())
return nil
},
Marshaler: cqrsMarshaler,
Logger: logger,
})
if err != nil {
panic(err)
}
eventProcessor, err := cqrs.NewEventGroupProcessorWithConfig(
router,
cqrs.EventGroupProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) {
return "events", nil
},
SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) {
config := amqp.NewDurablePubSubConfig(
amqpAddress,
amqp.GenerateQueueNameTopicNameWithSuffix(params.EventGroupName),
)
return amqp.NewSubscriber(config, logger)
},
OnHandle: func(params cqrs.EventGroupProcessorOnHandleParams) error {
start := time.Now()
err := params.Handler.Handle(params.Message.Context(), params.Event)
logger.Info("Event handled", watermill.LogFields{
"event_name": params.EventName,
"duration": time.Since(start),
"err": err,
})
return err
},
Marshaler: cqrsMarshaler,
Logger: logger,
},
)
if err != nil {
panic(err)
}
err = commandProcessor.AddHandlers(
BookRoomHandler{eventBus},
OrderBeerHandler{eventBus},
)
if err != nil {
panic(err)
}
err = eventProcessor.AddHandlersGroup(
"events",
OrderBeerOnRoomBooked{commandBus},
NewBookingsFinancialReport(),
cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
logger.Info("Beer ordered", watermill.LogFields{
"room_id": event.RoomId,
})
return nil
}),
)
if err != nil {
panic(err)
}
// publish BookRoom commands every second to simulate incoming traffic
go publishCommands(commandBus)
// processors are based on router, so they will work when router will start
if err := router.Run(context.Background()); err != nil {
panic(err)
}
}
// ...
Đó là tất cả. Chúng ta có một ứng dụng CQRS có thể chạy được.