CQRS Mechanism
CQRS stands for "Command Query Responsibility Segregation". It separates the responsibility of command (write requests) and query (read requests). Write requests and read requests are handled by different objects.
This is CQRS. We can further separate the data storage, having separate read and write storage. Once this is done, there may be many read storages optimized for handling different types of queries or spanning many bounded contexts. Although separate read/write storage is often the topic of discussion related to CQRS, it is not CQRS itself. CQRS is only the first separation of command and query.
The cqrs component provides some useful abstractions, built on top of Pub/Sub and Router, to help implement the CQRS pattern.
You don't need to implement the entire CQRS. Typically, only the event part of the component is used to build event-driven applications.
Building Blocks
Events
Events represent something that has already happened. Events are immutable.
Event Bus
Full source code: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go
// ...
// EventBus transports events to event handlers.
type EventBus struct {
// ...
Full source code: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go
// ...
// EventBusConfig is the configuration passed to NewEventBusWithConfig when creating an EventBus.
type EventBusConfig struct {
// GeneratePublishTopic is used to generate the topic name for publishing events.
GeneratePublishTopic GenerateEventPublishTopicFn
// OnPublish is called before sending the event. It can modify *message.Message.
//
// This option is optional.
OnPublish OnEventSendFn
// Marshaler is used for encoding and decoding events.
// This option is required.
Marshaler CommandEventMarshaler
// Logger is the logger instance used for logging.
// If not provided, watermill.NopLogger is used.
Logger watermill.LoggerAdapter
}
// setDefaults fills in options that were left unset:
// a nil Logger is replaced with watermill.NopLogger.
func (c *EventBusConfig) setDefaults() {
	if c.Logger != nil {
		return
	}

	c.Logger = watermill.NopLogger{}
}
// ...
Event Processor
Full code: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go
// ...
// EventProcessor is used to determine the EventHandler that should handle the events received from the event bus.
type EventProcessor struct {
// ...
Full code: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go
// ...
// EventProcessorConfig holds the options used to configure an EventProcessor.
type EventProcessorConfig struct {
// GenerateSubscribeTopic is used to generate the topic for subscribing to events.
// If the event processor uses handler groups, then GenerateSubscribeTopic is used.
GenerateSubscribeTopic EventProcessorGenerateSubscribeTopicFn
// SubscriberConstructor is used to create a subscriber for the EventHandler.
//
// This function is called once for each EventHandler instance.
// If you want to reuse a subscriber for multiple handlers, use EventGroupProcessor.
SubscriberConstructor EventProcessorSubscriberConstructorFn
// OnHandle is called before handling the event.
// OnHandle works similarly to middleware: you can inject additional logic before and after handling the event.
//
// Therefore, you need to explicitly call params.Handler.Handle() to handle the event.
//
//	func(params EventProcessorOnHandleParams) (err error) {
//		// Logic before handling
//		// (...)
//		err = params.Handler.Handle(params.Message.Context(), params.Event)
//
//		// Logic after handling
//		// (...)
//		return err
//	}
//
// This option is optional.
OnHandle EventProcessorOnHandleFn
// AckOnUnknownEvent determines whether the message should be acknowledged when the event has no defined handler.
AckOnUnknownEvent bool
// Marshaler is used to marshal and unmarshal events.
// This option is required.
Marshaler CommandEventMarshaler
// Logger is the logger instance used for logging.
// If not provided, watermill.NopLogger will be used.
Logger watermill.LoggerAdapter
// disableRouterAutoAddHandlers is for maintaining backward compatibility.
// This value will be set when creating EventProcessor using NewEventProcessor.
//
// Deprecated: migrate to NewEventProcessorWithConfig.
disableRouterAutoAddHandlers bool
}
// setDefaults fills in options that were left unset:
// a nil Logger is replaced with watermill.NopLogger.
func (c *EventProcessorConfig) setDefaults() {
	if c.Logger != nil {
		return
	}

	c.Logger = watermill.NopLogger{}
}
// ...
Event Group Processor
Full Source Code: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go
// ...
// EventGroupProcessor determines which event processor should handle the events received from the event bus.
// Compared to EventProcessor, EventGroupProcessor allows multiple processors to share the same subscriber instance.
type EventGroupProcessor struct {
// ...
Full Source Code: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go
// ...
// EventGroupProcessorConfig is the configuration passed to NewEventGroupProcessorWithConfig
// when creating an EventGroupProcessor.
type EventGroupProcessorConfig struct {
// GenerateSubscribeTopic is used to generate the topic for subscribing to group event processors.
// This option is required for EventProcessor when using processor groups.
GenerateSubscribeTopic EventGroupProcessorGenerateSubscribeTopicFn
// SubscriberConstructor is used to create a subscriber for GroupEventHandler.
// This function is called once per event group, allowing a subscription to be created for each group.
// It is very useful when we want to handle events from a stream in order.
SubscriberConstructor EventGroupProcessorSubscriberConstructorFn
// OnHandle is called before handling the event.
// OnHandle is similar to middleware: you can inject additional logic before and after handling the event.
//
// Therefore, you need to explicitly call params.Handler.Handle() to handle the event.
//
//	func(params EventGroupProcessorOnHandleParams) (err error) {
//		// Logic before handling
//		// (...)
//
//		err = params.Handler.Handle(params.Message.Context(), params.Event)
//
//		// Logic after handling
//		// (...)
//
//		return err
//	}
//
// This option is optional.
OnHandle EventGroupProcessorOnHandleFn
// AckOnUnknownEvent determines whether the message should be acknowledged when the event has no defined handler.
AckOnUnknownEvent bool
// Marshaler is used for encoding and decoding events.
// This option is required.
Marshaler CommandEventMarshaler
// Logger is the logger instance used for logging.
// If not provided, watermill.NopLogger will be used.
Logger watermill.LoggerAdapter
}
// setDefaults fills in options that were left unset:
// a nil Logger is replaced with watermill.NopLogger.
func (c *EventGroupProcessorConfig) setDefaults() {
	if c.Logger != nil {
		return
	}

	c.Logger = watermill.NopLogger{}
}
// ...
Learn more about Event Group Processor.
Event Handler
Full Source Code: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go
// ...
// EventHandler receives events defined by NewEvent and handles them using its Handle method.
// If using DDD, the event handler can modify and persist aggregates.
// It can also invoke process managers, sagas, or just build read models.
//
// Unlike command handlers, each event can have multiple event handlers.
//
// A single EventHandler instance is used while handling messages.
// When multiple events are delivered at the same time, the Handle method can be executed multiple times concurrently.
// Therefore, the Handle method needs to be thread-safe!
type EventHandler interface {
// ...
Command
A command is a simple data structure representing a request to perform some operation.
Command Bus
Complete source code: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go
// ...
// CommandBus is the component that transports commands to command handlers.
type CommandBus struct {
// ...
Complete source code: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go
// ...
// CommandBusConfig is the configuration passed to NewCommandBusWithConfig when creating a CommandBus.
type CommandBusConfig struct {
// GeneratePublishTopic is used to generate the topic for publishing commands.
GeneratePublishTopic CommandBusGeneratePublishTopicFn
// OnSend is called before publishing a command.
// *message.Message can be modified.
//
// This option is optional.
OnSend CommandBusOnSendFn
// Marshaler is used for serializing and deserializing commands.
// This option is required.
Marshaler CommandEventMarshaler
// Logger is the logger instance used for logging.
// If not provided, watermill.NopLogger will be used.
Logger watermill.LoggerAdapter
}
// setDefaults fills in options that were left unset:
// a nil Logger is replaced with watermill.NopLogger.
func (c *CommandBusConfig) setDefaults() {
	if c.Logger != nil {
		return
	}

	c.Logger = watermill.NopLogger{}
}
// ...
Command Processor
Complete source code: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go
// ...
// CommandProcessorSubscriberConstructorFn creates the subscriber for a CommandHandler.
// It receives CommandProcessorSubscriberConstructorParams and returns a message.Subscriber or an error,
// which allows a separate, custom subscriber to be created for each command handler.
type CommandProcessorSubscriberConstructorFn func(CommandProcessorSubscriberConstructorParams) (message.Subscriber, error)
// ...
Complete source code: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go
// ...
// CommandProcessorConfig is the configuration passed to NewCommandProcessorWithConfig
// when creating a CommandProcessor.
type CommandProcessorConfig struct {
// GenerateSubscribeTopic is used to generate the topic for subscribing to commands.
GenerateSubscribeTopic CommandProcessorGenerateSubscribeTopicFn
// SubscriberConstructor is used to create the subscriber for a CommandHandler.
SubscriberConstructor CommandProcessorSubscriberConstructorFn
// OnHandle is called before handling the command.
// OnHandle works like a middleware: you can inject additional logic before and after handling the command.
//
// Because of this, you need to call params.Handler.Handle() explicitly to handle the command.
//
//	func(params CommandProcessorOnHandleParams) (err error) {
//		// logic before handling
//		// (...)
//
//		err = params.Handler.Handle(params.Message.Context(), params.Command)
//
//		// logic after handling
//		// (...)
//
//		return err
//	}
//
// This option is optional.
OnHandle CommandProcessorOnHandleFn
// Marshaler is used for serialization and deserialization of commands.
// This option is required.
Marshaler CommandEventMarshaler
// Logger is the logger instance used for logging.
// If not provided, watermill.NopLogger will be used.
Logger watermill.LoggerAdapter
// AckCommandHandlingErrors, if true, makes CommandProcessor ack messages even if CommandHandler returns an error.
// If RequestReplyBackend is not nil and sending the reply fails, the message will still be nacked.
//
// Warning: It's not recommended to use this option when using the requestreply component (requestreply.NewCommandHandler or requestreply.NewCommandHandlerWithResult),
// as it may ack the command when sending the reply fails.
//
// When using requestreply, you should use requestreply.PubSubBackendConfig.AckCommandErrors.
AckCommandHandlingErrors bool
// disableRouterAutoAddHandlers is used for backward compatibility.
// It is set when creating a CommandProcessor with NewCommandProcessor.
//
// Deprecated: please migrate to NewCommandProcessorWithConfig.
disableRouterAutoAddHandlers bool
}
// setDefaults fills in options that were left unset:
// a nil Logger is replaced with watermill.NopLogger.
func (c *CommandProcessorConfig) setDefaults() {
	if c.Logger != nil {
		return
	}

	c.Logger = watermill.NopLogger{}
}
// ...
Command Handler
Complete source code: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go
// ...
// CommandHandler receives the command defined by NewCommand and handles it using the Handle method.
// If using DDD, CommandHandler may modify and persist aggregates.
//
// Unlike EventHandler, each Command can only have one CommandHandler.
//
// A single CommandHandler instance is used while handling messages.
// When multiple commands are delivered simultaneously, the Handle method may be executed multiple times concurrently.
// Therefore, the Handle method needs to be thread-safe!
type CommandHandler interface {
// ...
Command and Event Marshaler
Complete source code: github.com/ThreeDotsLabs/watermill/components/cqrs/marshaler.go
// ...
// CommandEventMarshaler marshals commands and events into Watermill messages, and vice versa.
// The payload of the command needs to be marshaled into []byte.
type CommandEventMarshaler interface {
// Marshal marshals the command or event v into a Watermill message.
Marshal(v interface{}) (*message.Message, error)
// Unmarshal decodes the Watermill message msg into the command or event v.
Unmarshal(msg *message.Message, v interface{}) (err error)
// Name returns the name of the command or event.
// The name can be used to determine if the received command or event is the one we want to process.
Name(v interface{}) string
// NameFromMessage returns the name of the command or event from the Watermill message (generated by Marshal).
//
// When we already have a command or event marshaled into a Watermill message, we should use NameFromMessage
// instead of Name to avoid unnecessary decoding.
NameFromMessage(msg *message.Message) string
}
// ...
Usage
Example Domain
We will use a simple example domain that is responsible for handling room reservations in a hotel.
We will use Event Storming symbols to showcase the model of this domain.
Symbol legend:
- Blue sticky notes are commands
- Orange sticky notes are events
- Green sticky notes are read models asynchronously generated from events
- Purple sticky notes are policies triggered by events and generating commands
- Pink sticky notes are hotspots; we mark areas that often encounter problems
The domain is simple:
- Customers can book rooms.
- Whenever a room is booked, we order a bottle of beer for the customer (because we love our guests).
- We know that sometimes the beer runs out.
- We generate a financial report based on the booking.
Sending Commands
Firstly, we need to simulate customer actions.
Complete source code: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf
// ...
bookRoomCmd := &BookRoom{
RoomId: fmt.Sprintf("%d", i),
GuestName: "John",
StartDate: startDate,
EndDate: endDate,
}
if err := commandBus.Send(context.Background(), bookRoomCmd); err != nil {
panic(err)
}
// ...
Command Handler
The BookRoomHandler will handle our commands.
Complete source code: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// BookRoomHandler is a command handler that processes the BookRoom command and emits the RoomBooked event.
//
// In CQRS, a command must be processed by exactly one handler.
// Adding another handler for the same command results in an error.
type BookRoomHandler struct {
// eventBus is used to publish the RoomBooked event once a BookRoom command has been handled.
eventBus *cqrs.EventBus
}
// HandlerName returns the fixed name identifying this command handler.
func (BookRoomHandler) HandlerName() string {
	const name = "BookRoomHandler"
	return name
}
// NewCommand returns an empty instance of the command type handled by this handler.
// The returned value must be a pointer.
func (BookRoomHandler) NewCommand() interface{} {
	return new(BookRoom)
}
// Handle processes a BookRoom command: it picks a price, logs the booking and
// publishes a RoomBooked event on the event bus.
//
// It returns the error reported by the event bus, or nil when the event was published.
func (b BookRoomHandler) Handle(ctx context.Context, c interface{}) error {
	// c is always the type returned by `NewCommand`, so type assertion is always safe
	cmd := c.(*BookRoom)

	// Some random pricing, which may be calculated in a more sensible way in actual production
	price := (rand.Int63n(40) + 1) * 10

	// One verb per argument: room, guest, start date and end date.
	log.Printf(
		"Booked %s for %s from %s to %s",
		cmd.RoomId,
		cmd.GuestName,
		time.Unix(cmd.StartDate.Seconds, int64(cmd.StartDate.Nanos)),
		time.Unix(cmd.EndDate.Seconds, int64(cmd.EndDate.Nanos)),
	)

	// RoomBooked will be handled by the OrderBeerOnRoomBooked event handler,
	// and in the future, RoomBooked can be handled by multiple event handlers
	return b.eventBus.Publish(ctx, &RoomBooked{
		ReservationId: watermill.NewUUID(),
		RoomId:        cmd.RoomId,
		GuestName:     cmd.GuestName,
		Price:         price,
		StartDate:     cmd.StartDate,
		EndDate:       cmd.EndDate,
	})
}
// OrderBeerOnRoomBooked is an event handler that processes the RoomBooked event and emits the OrderBeer command.
// ...
Event Handlers
As mentioned earlier, we want to order a bottle of beer every time a room is booked (labelled with "When the room is booked"). We achieve this by using the OrderBeer command.
Complete source code: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// OrderBeerOnRoomBooked is an event handler that processes the RoomBooked event and emits the OrderBeer command.
type OrderBeerOnRoomBooked struct {
// commandBus is used to send the OrderBeer command.
commandBus *cqrs.CommandBus
}
// HandlerName returns the fixed name identifying this event handler.
func (OrderBeerOnRoomBooked) HandlerName() string {
	// This name is passed to EventsSubscriberConstructor for generating the queue name
	const name = "OrderBeerOnRoomBooked"
	return name
}
// NewEvent returns an empty instance of the event type handled by this handler.
func (OrderBeerOnRoomBooked) NewEvent() interface{} {
	return new(RoomBooked)
}
// Handle reacts to a RoomBooked event by sending an OrderBeer command for the
// booked room with a random count between 1 and 10.
// It returns the error reported by the command bus, if any.
func (o OrderBeerOnRoomBooked) Handle(ctx context.Context, e interface{}) error {
	booked := e.(*RoomBooked)

	return o.commandBus.Send(ctx, &OrderBeer{
		RoomId: booked.RoomId,
		Count:  rand.Int63n(10) + 1,
	})
}
// OrderBeerHandler is a command handler that processes the OrderBeer command and emits the BeerOrdered event.
// ...
OrderBeerHandler is very similar to BookRoomHandler. The only difference is that it sometimes returns an error when there is not enough beer, which causes the command to be redelivered. You can find the complete implementation in the example source code.
Event Handler Groups
By default, each event handler has a separate subscriber instance. This approach works fine if only one event type is sent to the topic.
In the case of multiple event types on the topic, there are two options:
- You can set EventProcessorConfig.AckOnUnknownEvent to true - this will acknowledge all events not handled by handlers.
- You can use the event handler group mechanism.
To use event groups, you need to set the GenerateHandlerGroupSubscribeTopic and GroupSubscriberConstructor options in EventConfig.
Then, you can use AddHandlersGroup on the EventProcessor.
Complete source code: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
err = eventProcessor.AddHandlersGroup(
"events",
OrderBeerOnRoomBooked{commandBus},
NewBookingsFinancialReport(),
cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
logger.Info("Beer ordered", watermill.LogFields{
"room_id": event.RoomId,
})
return nil
}),
)
if err != nil {
// ...
Both GenerateHandlerGroupSubscribeTopic and GroupSubscriberConstructor receive information about the group name as function parameters.
Generic Handlers
Starting from Watermill v1.3, generic handlers can be used to handle commands and events. This is very useful when you have a large number of commands/events and do not want to create a handler for each one.
Full source code: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
logger.Info("Beer ordered", watermill.LogFields{
"room_id": event.RoomId,
})
return nil
}),
// ...
Behind the scenes, it creates an EventHandler or CommandHandler implementation. It is suitable for all types of handlers.
Full source code: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go
// ...
// NewCommandHandler creates a new CommandHandler implementation based on the provided function and the inferred command type from the function parameters.
func NewCommandHandler[Command any](
// ...
Full source code: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go
// ...
// NewEventHandler creates a new EventHandler implementation based on the provided function and the inferred event type from the function parameters.
func NewEventHandler[T any](
// ...
Full source code: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go
// ...
// NewGroupEventHandler creates a new GroupEventHandler implementation based on the provided function and the inferred event type from the function parameters.
func NewGroupEventHandler[T any](handleFunc func(ctx context.Context, event *T) error) GroupEventHandler {
// ...
Building a Read Model Using Event Handlers
Complete source code: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// BookingsFinancialReport is a read model that calculates how much money we can earn from bookings.
// It listens for RoomBooked events when they occur.
//
// This implementation simply writes to memory. In a production environment, you might use some form of persistent storage.
type BookingsFinancialReport struct {
// handledBookings is the set of reservation IDs that were already counted; it is used to skip duplicate events.
handledBookings map[string]struct{}
// totalCharge is the running sum of the prices of all counted bookings.
totalCharge int64
// lock guards handledBookings and totalCharge, because Handle may be called concurrently.
lock sync.Mutex
}
// NewBookingsFinancialReport returns an empty report whose set of handled
// bookings is initialized and ready to be written to.
func NewBookingsFinancialReport() *BookingsFinancialReport {
	report := new(BookingsFinancialReport)
	report.handledBookings = make(map[string]struct{})
	return report
}
// HandlerName returns the fixed name identifying this event handler.
//
// A pointer receiver is used because BookingsFinancialReport contains a
// sync.Mutex, which must not be copied; a value receiver would copy it on every call.
func (b *BookingsFinancialReport) HandlerName() string {
	// This name is passed to EventsSubscriberConstructor and used to generate queue name
	return "BookingsFinancialReport"
}
// NewEvent returns an empty instance of the event type handled by this handler.
//
// A pointer receiver is used because BookingsFinancialReport contains a
// sync.Mutex, which must not be copied; a value receiver would copy it on every call.
func (*BookingsFinancialReport) NewEvent() interface{} {
	return &RoomBooked{}
}
// Handle adds the price of a RoomBooked event to the running total and prints it.
// Events whose reservation ID was already seen are ignored. It always returns nil.
func (b *BookingsFinancialReport) Handle(ctx context.Context, e interface{}) error {
	// Handle may be called concurrently, so the whole update is done under the lock.
	b.lock.Lock()
	defer b.lock.Unlock()

	booked := e.(*RoomBooked)
	reservationID := booked.ReservationId

	// When using Pub/Sub that does not provide exactly-once delivery semantics, we need to deduplicate messages.
	// GoChannel Pub/Sub provides exactly-once delivery,
	// but let's prepare this example for other Pub/Sub implementations.
	if _, alreadyHandled := b.handledBookings[reservationID]; !alreadyHandled {
		b.handledBookings[reservationID] = struct{}{}
		b.totalCharge += booked.Price
		fmt.Printf(">>> Room booked for $%d\n", b.totalCharge)
	}

	return nil
}
// amqpAddress is the RabbitMQ connection URI used to build the AMQP publisher and subscriber configs in main.
var amqpAddress = "amqp://guest:guest@rabbitmq:5672/"
func main() {
// ...
Connect Everything
We already have all the components needed to build a CQRS application.
We will use AMQP (RabbitMQ) as our message broker.
Under the hood, CQRS uses Watermill's message router. If you are not familiar with this and want to understand how it works, you should check out the getting started guide. It will also show you how to use some standard messaging patterns such as metrics, poison message queues, rate limiting, correlation, and other tools used by every message-driven application. These tools are already built into Watermill.
Let's get back to CQRS. As you already know, CQRS consists of multiple components such as command or event buses, processors, and so on.
Complete source code: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// main wires the CQRS components together on top of a Watermill message router backed by RabbitMQ:
// it creates the command bus and command processor, the event bus and event group processor,
// registers the command and event handlers, starts publishing commands and runs the router.
// Any setup error causes a panic.
func main() {
logger := watermill.NewStdLogger(false, false)
cqrsMarshaler := cqrs.ProtobufMarshaler{}
// You can use any Pub/Sub implementation from here: https://watermill.io/pubsubs/
// Detailed RabbitMQ implementation: https://watermill.io/pubsubs/amqp/
// Commands will be sent to a queue, because they need to be consumed once.
commandsAMQPConfig := amqp.NewDurableQueueConfig(amqpAddress)
commandsPublisher, err := amqp.NewPublisher(commandsAMQPConfig, logger)
if err != nil {
panic(err)
}
commandsSubscriber, err := amqp.NewSubscriber(commandsAMQPConfig, logger)
if err != nil {
panic(err)
}
// Events will be published to RabbitMQ configured as Pub/Sub, because they may be consumed by multiple consumers
// (in this case BookingsFinancialReport and OrderBeerOnRoomBooked).
eventsPublisher, err := amqp.NewPublisher(amqp.NewDurablePubSubConfig(amqpAddress, nil), logger)
if err != nil {
panic(err)
}
// CQRS is built on the message router. Detailed documentation: https://watermill.io/docs/messages-router/
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// Simple middleware which recovers panics from event or command handlers.
// More about router middlewares can be found in the documentation:
// https://watermill.io/docs/messages-router/#middleware
//
// The list of available middlewares can be found in message/router/middleware.
router.AddMiddleware(middleware.Recoverer)
commandBus, err := cqrs.NewCommandBusWithConfig(commandsPublisher, cqrs.CommandBusConfig{
GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) {
// We are using the queue RabbitMQ config, so we need a topic per command type.
return params.CommandName, nil
},
OnSend: func(params cqrs.CommandBusOnSendParams) error {
logger.Info("Sending command", watermill.LogFields{
"command_name": params.CommandName,
})
params.Message.Metadata.Set("sent_at", time.Now().String())
return nil
},
Marshaler: cqrsMarshaler,
Logger: logger,
})
if err != nil {
panic(err)
}
commandProcessor, err := cqrs.NewCommandProcessorWithConfig(
router,
cqrs.CommandProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) {
// We are using the queue RabbitMQ config, so we need a topic per command type.
return params.CommandName, nil
},
SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) {
// We can reuse the subscriber, because all commands have separate topics.
return commandsSubscriber, nil
},
OnHandle: func(params cqrs.CommandProcessorOnHandleParams) error {
start := time.Now()
err := params.Handler.Handle(params.Message.Context(), params.Command)
logger.Info("Command handled", watermill.LogFields{
"command_name": params.CommandName,
"duration": time.Since(start),
"err": err,
})
return err
},
Marshaler: cqrsMarshaler,
Logger: logger,
},
)
if err != nil {
panic(err)
}
eventBus, err := cqrs.NewEventBusWithConfig(eventsPublisher, cqrs.EventBusConfig{
GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
// Because we are using the Pub/Sub RabbitMQ config, we can use one topic for all events.
return "events", nil
// We could also use a topic per event type:
// return params.EventName, nil
},
OnPublish: func(params cqrs.OnEventSendParams) error {
logger.Info("Publishing event", watermill.LogFields{
"event_name": params.EventName,
})
params.Message.Metadata.Set("published_at", time.Now().String())
return nil
},
Marshaler: cqrsMarshaler,
Logger: logger,
})
if err != nil {
panic(err)
}
eventProcessor, err := cqrs.NewEventGroupProcessorWithConfig(
router,
cqrs.EventGroupProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) {
return "events", nil
},
SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) {
config := amqp.NewDurablePubSubConfig(
amqpAddress,
amqp.GenerateQueueNameTopicNameWithSuffix(params.EventGroupName),
)
return amqp.NewSubscriber(config, logger)
},
OnHandle: func(params cqrs.EventGroupProcessorOnHandleParams) error {
start := time.Now()
err := params.Handler.Handle(params.Message.Context(), params.Event)
logger.Info("Event handled", watermill.LogFields{
"event_name": params.EventName,
"duration": time.Since(start),
"err": err,
})
return err
},
Marshaler: cqrsMarshaler,
Logger: logger,
},
)
if err != nil {
panic(err)
}
err = commandProcessor.AddHandlers(
BookRoomHandler{eventBus},
OrderBeerHandler{eventBus},
)
if err != nil {
panic(err)
}
err = eventProcessor.AddHandlersGroup(
"events",
OrderBeerOnRoomBooked{commandBus},
NewBookingsFinancialReport(),
cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
logger.Info("Beer ordered", watermill.LogFields{
"room_id": event.RoomId,
})
return nil
}),
)
if err != nil {
panic(err)
}
// Publish BookRoom commands every second to simulate incoming traffic.
go publishCommands(commandBus)
// Processors are based on the router, so they start working once the router is running.
if err := router.Run(context.Background()); err != nil {
panic(err)
}
}
// ...
That's it. We have a runnable CQRS application.