Mecanismo CQRS
CQRS significa "Command Query Responsibility Segregation" (Separação de Responsabilidades de Comando e Consulta). Ele separa a responsabilidade de comandos (solicitações de escrita) e consultas (solicitações de leitura). As solicitações de escrita e leitura são tratadas por objetos diferentes.
Isso é CQRS. Podemos separar ainda mais o armazenamento de dados, tendo armazenamento de leitura e escrita separados. Depois que isso é feito, pode haver muitos armazenamentos de leitura otimizados para lidar com diferentes tipos de consultas ou abranger muitos contextos limitados. Embora o armazenamento de leitura/escrita separado seja frequentemente o tópico de discussão relacionado ao CQRS, ele não é o próprio CQRS. CQRS é apenas a primeira separação de comando e consulta.
O componente cqrs
fornece algumas abstrações úteis, construídas em cima de Pub/Sub e Router, para ajudar na implementação do padrão CQRS.
Você não precisa implementar todo o CQRS. Normalmente, apenas a parte de eventos do componente é usada para construir aplicativos orientados a eventos.
Blocos de Construção
Eventos
Eventos representam algo que já aconteceu. Os eventos são imutáveis.
Barramento de Eventos
Código fonte completo: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go
// ...
// EventBus transporta eventos para manipuladores de eventos.
type EventBus struct {
// ...
Código fonte completo: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go
// ...
type EventBusConfig struct {
// GeneratePublishTopic é usado para gerar o nome do tópico para publicação de eventos.
GeneratePublishTopic GenerateEventPublishTopicFn
// OnPublish é chamado antes de enviar o evento. Pode modificar *message.Message.
// Essa opção não é obrigatória.
OnPublish OnEventSendFn
// Marshaler é usado para codificar e decodificar eventos.
// Isso é obrigatório.
Marshaler CommandEventMarshaler
// Instância Logger para logging. Se não fornecido, watermill.NopLogger é usado.
Logger watermill.LoggerAdapter
}
func (c *EventBusConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
Processador de Eventos
Código completo: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go
// ...
// EventProcessor é usado para determinar qual EventHandler deve lidar com os eventos recebidos do barramento de eventos.
type EventProcessor struct {
// ...
Código completo: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go
// ...
type EventProcessorConfig struct {
// GenerateSubscribeTopic é usado para gerar o tópico para se inscrever nos eventos.
// Se o processador de eventos usar grupos de manipuladores, então GenerateSubscribeTopic será usado.
GenerateSubscribeTopic EventProcessorGenerateSubscribeTopicFn
// SubscriberConstructor é usado para criar um assinante para o EventHandler.
//
// Esta função é chamada uma vez para cada instância de EventHandler.
// Se você deseja reutilizar um assinante para vários manipuladores, use GroupEventProcessor.
SubscriberConstructor EventProcessorSubscriberConstructorFn
// OnHandle é chamado antes de lidar com o evento.
// OnHandle funciona de forma semelhante a um middleware: você pode injetar lógica adicional antes e depois de lidar com o evento.
//
// Portanto, é necessário chamar explicitamente params.Handler.Handle() para lidar com o evento.
//
// func(params EventProcessorOnHandleParams) (err error) {
// // Lógica antes de lidar
// // (...)
// err := params.Handler.Handle(params.Message.Context(), params.Event)
//
// // Lógica depois de lidar
// // (...)
// return err
// }
//
// Esta opção não é obrigatória.
OnHandle EventProcessorOnHandleFn
// AckOnUnknownEvent é usado para determinar se a mensagem deve ser reconhecida quando o evento não tem um manipulador definido.
AckOnUnknownEvent bool
// Marshaler é usado para serializar e desserializar eventos.
// Obrigatório.
Marshaler CommandEventMarshaler
// Instância de logger para registro.
// Se não fornecido, watermill.NopLogger será usado.
Logger watermill.LoggerAdapter
// disableRouterAutoAddHandlers é para manter a compatibilidade com versões anteriores.
// Este valor será definido ao criar um EventProcessor usando NewEventProcessor.
// Depreciado: migrar para NewEventProcessorWithConfig.
disableRouterAutoAddHandlers bool
}
func (c *EventProcessorConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
Processador de Grupo de Eventos
Código-fonte completo: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go
// ...
// EventGroupProcessor determina qual processador de eventos deve lidar com os eventos recebidos do barramento de eventos.
// Comparado com EventProcessor, o EventGroupProcessor permite que vários processadores compartilhem a mesma instância de assinante.
type EventGroupProcessor struct {
// ...
Código-fonte completo: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go
// ...
type EventGroupProcessorConfig struct {
// GenerateSubscribeTopic é usada para gerar o tópico para se inscrever nos processadores de grupo de eventos.
// Esta opção é necessária para EventProcessor ao usar grupos de processadores.
GenerateSubscribeTopic EventGroupProcessorGenerateSubscribeTopicFn
// SubscriberConstructor é usada para criar um assinante para GroupEventHandler.
// Esta função é chamada uma vez por grupo de eventos - permitindo a criação de uma assinatura para cada grupo.
// É muito útil quando queremos lidar com eventos de um fluxo em ordem.
SubscriberConstructor EventGroupProcessorSubscriberConstructorFn
// OnHandle é chamada antes de lidar com o evento.
// OnHandle é semelhante a um middleware: você pode injetar lógica adicional antes e depois de lidar com o evento.
//
// Portanto, você precisa chamar explicitamente params.Handler.Handle() para lidar com o evento.
//
// func(params EventGroupProcessorOnHandleParams) (err error) {
// // Lógica antes de lidar com
// // (...)
//
// err := params.Handler.Handle(params.Message.Context(), params.Event)
//
// // Lógica depois de lidar com
// // (...)
//
// return err
// }
//
// Esta opção não é obrigatória.
OnHandle EventGroupProcessorOnHandleFn
// AckOnUnknownEvent é usada para determinar se deve reconhecer se o evento não tem manipulador definido.
AckOnUnknownEvent bool
// Marshaler é usada para codificar e decodificar eventos.
// Isto é necessário.
Marshaler CommandEventMarshaler
// Instância de Logger usada para fazer log.
// Se não fornecido, watermill.NopLogger será usado.
Logger watermill.LoggerAdapter
}
func (c *EventGroupProcessorConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
Saiba mais sobre o Processador de Grupo de Eventos.
Manipulador de Eventos
Código-fonte completo: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go
// ...
// EventHandler recebe eventos definidos por NewEvent e os manipula usando seu método Handle.
// Se estiver usando DDD, o manipulador de eventos pode modificar e persistir agregações.
// Também pode invocar gerenciadores de processos, sagas ou apenas construir modelos de leitura.
//
// Ao contrário dos manipuladores de comandos, cada evento pode ter vários manipuladores de eventos.
//
// Durante o tratamento de mensagens, use uma instância de EventHandler.
// Ao passar vários eventos simultaneamente, o método Handle pode ser executado várias vezes de forma concorrente.
// Portanto, o método Handle precisa ser seguro para threads!
type EventHandler interface {
// ...
Comando
Um comando é uma estrutura de dados simples que representa uma solicitação para realizar alguma operação.
Barramento de Comando
Código-fonte completo: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go
// ...
// O CommandBus é o componente que transporta comandos para manipuladores de comandos.
type CommandBus struct {
// ...
Código-fonte completo: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go
// ...
type CommandBusConfig struct {
// GeneratePublishTopic é usado para gerar o tópico para publicação de comandos.
GeneratePublishTopic CommandBusGeneratePublishTopicFn
// OnSend é chamado antes de publicar um comando.
// *message.Message pode ser modificado.
//
// Esta opção não é obrigatória.
OnSend CommandBusOnSendFn
// O Marshaler é usado para serializar e desserializar comandos.
// Necessário.
Marshaler CommandEventMarshaler
// Instância de Logger usada para registro.
// Se não fornecida, watermill.NopLogger será usada.
Logger watermill.LoggerAdapter
}
func (c *CommandBusConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}
// ...
Processador de Comando
Código-fonte completo: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go
// ...
// CommandProcessorSubscriberConstructorFn é usada para criar um assinante para o manipulador de comandos.
// Isso permite criar um assinante personalizado separado para cada manipulador de comandos.
type CommandProcessorSubscriberConstructorFn func(CommandProcessorSubscriberConstructorParams) (message.Subscriber, error)
// ...
Código-fonte completo: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go
// ...
type CommandProcessorConfig struct {
// GenerateSubscribeTopic é usado para gerar o tópico para assinar comandos.
GenerateSubscribeTopic CommandProcessorGenerateSubscribeTopicFn
// SubscriberConstructor é usado para criar um assinante para o manipulador de comandos.
SubscriberConstructor CommandProcessorSubscriberConstructorFn
// OnHandle é chamado antes de manipular o comando.
// OnHandle funciona como um middleware: você pode injetar lógica adicional antes e depois de manipular o comando.
//
// Por causa disso, é necessário chamar params.Handler.Handle() explicitamente para manipular o comando.
// func(params CommandProcessorOnHandleParams) (err error) {
// // lógica antes da manipulação
// // (...)
//
// err := params.Handler.Handle(params.Message.Context(), params.Command)
//
// // lógica depois da manipulação
// // (...)
//
// return err
// }
//
// Esta opção não é obrigatória.
OnHandle CommandProcessorOnHandleFn
// O Marshaler é usado para serialização e desserialização de comandos.
// Necessário.
Marshaler CommandEventMarshaler
// Instância de Logger para registro.
// Se não fornecida, watermill.NopLogger será usada.
Logger watermill.LoggerAdapter
// Se verdadeiro, o CommandProcessor irá ack mensagens mesmo se o CommandHandler retornar um erro.
// Se o RequestReplyBackend não for nulo e o envio da resposta falhar, a mensagem ainda será nacked.
//
// Aviso: Não é recomendado usar esta opção ao usar o componente requestreply (requestreply.NewCommandHandler ou requestreply.NewCommandHandlerWithResult),
// pois pode ack o comando quando o envio da resposta falhar.
//
// Ao usar requestreply, você deve usar requestreply.PubSubBackendConfig.AckCommandErrors.
AckCommandHandlingErrors bool
// disableRouterAutoAddHandlers é usado para compatibilidade com versões anteriores.
// É definido ao criar um CommandProcessor com NewCommandProcessor.
// Depreciado: migre para NewCommandProcessorWithConfig, por favor.
disableRouterAutoAddHandlers bool
}
func (c *CommandProcessorConfig) setDefaults() {
if c.Logger == nulo {
c.Logger = watermill.NopLogger{}
}
}
// ...
Processador de Comando
Código-fonte completo: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go
// ...
// CommandHandler recebe o comando definido por NewCommand e o manipula usando o método Handle.
// Se estiver usando DDD, CommandHandler pode modificar e persistir agregados.
//
// Diferentemente de EventHandler, cada comando pode ter apenas um CommandHandler.
//
// Durante o tratamento de mensagens, use uma instância de CommandHandler.
// Quando vários comandos são entregues simultaneamente, o método Handle pode ser executado várias vezes simultaneamente.
// Portanto, o método Handle precisa ser thread-safe!
type CommandHandler interface {
// ...
Codificador e Decodificador de Comando e Evento
Código-fonte completo: github.com/ThreeDotsLabs/watermill/components/cqrs/marshaler.go
// ...
// CommandEventMarshaler codifica comandos e eventos em mensagens Watermill e vice-versa.
// A carga útil do comando precisa ser codificada em []bytes.
type CommandEventMarshaler interface {
// Marshal codifica o comando ou evento em uma mensagem Watermill.
Marshal(v interface{}) (*message.Message, error)
// Unmarshal decodifica a mensagem Watermill no comando ou evento v.
Unmarshal(msg *message.Message, v interface{}) (err error)
// Name retorna o nome do comando ou evento.
// O nome pode ser usado para determinar se o comando ou evento recebido é o que queremos processar.
Name(v interface{}) string
// NameFromMessage retorna o nome do comando ou evento da mensagem Watermill (gerado por Marshal).
//
// Quando temos comandos ou eventos codificados em mensagens Watermill, devemos usar NameFromMessage em vez de Name para evitar decodificações desnecessárias.
NameFromMessage(msg *message.Message) string
}
// ...
Uso
Exemplo de Domínio
Usando um domínio simples que é responsável por lidar com reservas de quartos em um hotel.
Vamos usar os símbolos de Event Storming para mostrar o modelo deste domínio.
Legenda dos símbolos:
- As notas adesivas azuis são comandos
- As notas adesivas laranjas são eventos
- As notas adesivas verdes são modelos de leitura gerados de forma assíncrona a partir de eventos
- As notas adesivas roxas são políticas acionadas por eventos e que geram comandos
- As notas adesivas rosas são pontos de interesse; marcamos áreas que frequentemente encontram problemas
O domínio é simples:
- Os clientes podem reservar quartos.
-
Sempre que um quarto é reservado, nós pedimos uma garrafa de cerveja para o cliente (porque amamos nossos hóspedes).
- Sabemos que às vezes a cerveja acaba.
- Geramos um relatório financeiro com base na reserva.
Envio de Comandos
Primeiramente, precisamos simular ações do cliente.
Código-fonte completo: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf
// ...
bookRoomCmd := &BookRoom{
RoomId: fmt.Sprintf("%d", i),
GuestName: "John",
StartDate: startDate,
EndDate: endDate,
}
if err := commandBus.Send(context.Background(), bookRoomCmd); err != nil {
panic(err)
}
// ...
Manipulador de Comando
O BookRoomHandler
irá lidar com os nossos comandos.
Código-fonte completo: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// BookRoomHandler é um manipulador de comandos que processa o comando BookRoom e emite o evento RoomBooked.
//
// Em CQRS, um comando deve ser processado por um manipulador.
// Ao adicionar outro manipulador para lidar com este comando, será retornado um erro.
type BookRoomHandler struct {
eventBus *cqrs.EventBus
}
func (b BookRoomHandler) HandlerName() string {
return "BookRoomHandler"
}
// NewCommand retorna o tipo de comando que este manipulador deve processar. Deve ser um ponteiro.
func (b BookRoomHandler) NewCommand() interface{} {
return &BookRoom{}
}
func (b BookRoomHandler) Handle(ctx context.Context, c interface{}) error {
// c é sempre o tipo retornado por `NewCommand`, então a assertiva de tipo é sempre segura
cmd := c.(*BookRoom)
// Algum preço aleatório, que pode ser calculado de forma mais sensata na produção real
price := (rand.Int63n(40) + 1) * 10
log.Printf(
"Reservado %s, de %s para %s",
cmd.RoomId,
cmd.GuestName,
time.Unix(cmd.StartDate.Seconds, int64(cmd.StartDate.Nanos)),
time.Unix(cmd.EndDate.Seconds, int64(cmd.EndDate.Nanos)),
)
// RoomBooked será manipulado pelo manipulador de evento OrderBeerOnRoomBooked,
// e no futuro, RoomBooked pode ser manipulado por vários manipuladores de evento
if err := b.eventBus.Publish(ctx, &RoomBooked{
ReservationId: watermill.NewUUID(),
RoomId: cmd.RoomId,
GuestName: cmd.GuestName,
Price: price,
StartDate: cmd.StartDate,
EndDate: cmd.EndDate,
}); err != nil {
return err
}
return nil
}
// OrderBeerOnRoomBooked é um manipulador de evento que processa o evento RoomBooked e emite o comando OrderBeer.
// ...
Manipuladores de Evento
Como mencionado anteriormente, queremos encomendar uma garrafa de cerveja sempre que um quarto for reservado (rotulado com "Quando o quarto for reservado"). Conseguimos isso usando o comando OrderBeer
.
Código-fonte completo: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// OrderBeerOnRoomBooked é um manipulador de evento que processa o evento RoomBooked e emite o comando OrderBeer.
type OrderBeerOnRoomBooked struct {
commandBus *cqrs.CommandBus
}
func (o OrderBeerOnRoomBooked) HandlerName() string {
// Este nome é passado para EventsSubscriberConstructor para a geração do nome da fila
return "OrderBeerOnRoomBooked"
}
func (OrderBeerOnRoomBooked) NewEvent() interface{} {
return &RoomBooked{}
}
func (o OrderBeerOnRoomBooked) Handle(ctx context.Context, e interface{}) error {
event := e.(*RoomBooked)
orderBeerCmd := &OrderBeer{
RoomId: event.RoomId,
Count: rand.Int63n(10) + 1,
}
return o.commandBus.Send(ctx, orderBeerCmd)
}
// OrderBeerHandler é um manipulador de comando que processa o comando OrderBeer e emite o evento BeerOrdered.
// ...
OrderBeerHandler
é muito semelhante ao BookRoomHandler
. A única diferença é que às vezes retorna um erro quando não há cerveja suficiente, fazendo com que o comando seja reemitido. Você pode encontrar a implementação completa no código-fonte de exemplo.
Grupos de Manipuladores de Eventos
Por padrão, cada manipulador de eventos tem uma instância de assinante separada. Esta abordagem funciona bem se apenas um tipo de evento é enviado para o tópico.
No caso de vários tipos de eventos no tópico, existem duas opções:
- Você pode definir
EventConfig.AckOnUnknownEvent
comotrue
- isso reconhecerá todos os eventos não tratados pelos manipuladores. - Você pode usar o mecanismo de grupo de manipuladores de eventos.
Para usar grupos de eventos, é necessário definir as opções GenerateHandlerGroupSubscribeTopic
e GroupSubscriberConstructor
em EventConfig
.
Em seguida, é possível usar AddHandlersGroup
no EventProcessor
.
Código-fonte completo: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
err = eventProcessor.AddHandlersGroup(
"events",
OrderBeerOnRoomBooked{commandBus},
NewBookingsFinancialReport(),
cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
logger.Info("Cerveja pedida", watermill.LogFields{
"room_id": event.RoomId,
})
return nil
}),
)
if err != nil {
// ...
Tanto GenerateHandlerGroupSubscribeTopic
quanto GroupSubscriberConstructor
recebem informações sobre o nome do grupo como parâmetros da função.
Manipuladores Genéricos
A partir do Watermill v1.3, manipuladores genéricos podem ser usados para lidar com comandos e eventos. Isso é muito útil quando se tem um grande número de comandos/eventos e não se deseja criar um manipulador para cada um.
Código-fonte completo: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
logger.Info("Cerveja pedida", watermill.LogFields{
"room_id": event.RoomId,
})
return nil
}),
// ...
Nos bastidores, isso cria uma implementação de EventHandler ou CommandHandler. É adequado para todos os tipos de manipuladores.
Código-fonte completo: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go
// ...
// NewCommandHandler cria uma nova implementação de CommandHandler com base na função fornecida e no tipo de comando inferido a partir dos parâmetros da função.
func NewCommandHandler[Command any](
// ...
Código-fonte completo: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go
// ...
// NewEventHandler cria uma nova implementação de EventHandler com base na função fornecida e no tipo de evento inferido a partir dos parâmetros da função.
func NewEventHandler[T any](
// ...
Código-fonte completo: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go
// ...
// NewGroupEventHandler cria uma nova implementação de GroupEventHandler com base na função fornecida e no tipo de evento inferido a partir dos parâmetros da função.
func NewGroupEventHandler[T any](handleFunc func(ctx context.Context, event *T) error) GroupEventHandler {
// ...
Construindo um Modelo de Leitura Usando Manipuladores de Eventos
Código-fonte completo: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// BookingsFinancialReport é um modelo de leitura que calcula o quanto podemos ganhar com reservas.
// Ele ouve os eventos de quarto reservado quando eles ocorrem.
//
// Esta implementação escreve simplesmente na memória. Em um ambiente de produção, você pode usar algum tipo de armazenamento persistente.
type BookingsFinancialReport struct {
handledBookings map[string]struct{}
totalCharge int64
lock sync.Mutex
}
func NewBookingsFinancialReport() *BookingsFinancialReport {
return &BookingsFinancialReport{handledBookings: map[string]struct{}{}}
}
func (b BookingsFinancialReport) HandlerName() string {
// Este nome é passado para o EventsSubscriberConstructor e usado para gerar o nome da fila
return "BookingsFinancialReport"
}
func (BookingsFinancialReport) NewEvent() interface{} {
return &RoomBooked{}
}
func (b *BookingsFinancialReport) Handle(ctx context.Context, e interface{}) error {
// O Handle pode ser chamado concorrentemente, então a segurança de thread é necessária.
b.lock.Lock()
defer b.lock.Unlock()
event := e.(*RoomBooked)
// Ao usar Pub/Sub que não fornece semântica de entrega exatamente uma vez, precisamos deduplicar mensagens.
// O Pub/Sub do GoChannel fornece entrega exatamente uma vez,
// mas vamos preparar este exemplo para outras implementações de Pub/Sub.
if _, ok := b.handledBookings[event.ReservationId]; ok {
return nil
}
b.handledBookings[event.ReservationId] = struct{}{}
b.totalCharge += event.Price
fmt.Printf(">>> Quarto reservado por $%d\n", b.totalCharge)
return nil
}
var amqpAddress = "amqp://guest:guest@rabbitmq:5672/"
func main() {
// ...
Conectar Tudo
Já temos todos os componentes necessários para construir uma aplicação CQRS.
Vamos usar o AMQP (RabbitMQ) como nosso correio de mensagens: AMQP.
Por baixo dos panos, o CQRS utiliza o roteador de mensagens da Watermill. Se você não está familiarizado com isso e quer entender como funciona, deveria conferir o guia de início. Ele também irá lhe mostrar como usar alguns padrões de mensagens padrão como métricas, filas de mensagens inválidas, limitação de taxa, correlação, e outras ferramentas usadas por toda aplicação orientada a mensagens. Estas ferramentas já estão incorporadas na Watermill.
Vamos voltar ao CQRS. Como você já sabe, o CQRS consiste de vários componentes tais como barramentos de comando ou evento, processadores, e assim por diante.
Código fonte completo: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
func main() {
logger := watermill.NewStdLogger(false, false)
cqrsMarshaler := cqrs.ProtobufMarshaler{}
// You can use any Pub/Sub implementation from here: https://watermill.io/pubsubs/
// Detailed RabbitMQ implementation: https://watermill.io/pubsubs/amqp/
// Commands will be send to queue, because they need to be consumed once.
commandsAMQPConfig := amqp.NewDurableQueueConfig(amqpAddress)
commandsPublisher, err := amqp.NewPublisher(commandsAMQPConfig, logger)
if err != nil {
panic(err)
}
commandsSubscriber, err := amqp.NewSubscriber(commandsAMQPConfig, logger)
if err != nil {
panic(err)
}
// Events will be published to PubSub configured Rabbit, because they may be consumed by multiple consumers.
// (in that case BookingsFinancialReport and OrderBeerOnRoomBooked).
eventsPublisher, err := amqp.NewPublisher(amqp.NewDurablePubSubConfig(amqpAddress, nil), logger)
if err != nil {
panic(err)
}
// CQRS is built on messages router. Detailed documentation: https://watermill.io/docs/messages-router/
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// Simple middleware which will recover panics from event or command handlers.
// More about router middlewares you can find in the documentation:
// https://watermill.io/docs/messages-router/#middleware
//
// List of available middlewares you can find in message/router/middleware.
router.AddMiddleware(middleware.Recoverer)
commandBus, err := cqrs.NewCommandBusWithConfig(commandsPublisher, cqrs.CommandBusConfig{
GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) {
// we are using queue RabbitMQ config, so we need to have topic per command type
return params.CommandName, nil
},
OnSend: func(params cqrs.CommandBusOnSendParams) error {
logger.Info("Sending command", watermill.LogFields{
"command_name": params.CommandName,
})
params.Message.Metadata.Set("sent_at", time.Now().String())
return nil
},
Marshaler: cqrsMarshaler,
Logger: logger,
})
if err != nil {
panic(err)
}
commandProcessor, err := cqrs.NewCommandProcessorWithConfig(
router,
cqrs.CommandProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) {
// we are using queue RabbitMQ config, so we need to have topic per command type
return params.CommandName, nil
},
SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) {
// we can reuse subscriber, because all commands have separated topics
return commandsSubscriber, nil
},
OnHandle: func(params cqrs.CommandProcessorOnHandleParams) error {
start := time.Now()
err := params.Handler.Handle(params.Message.Context(), params.Command)
logger.Info("Command handled", watermill.LogFields{
"command_name": params.CommandName,
"duration": time.Since(start),
"err": err,
})
return err
},
Marshaler: cqrsMarshaler,
Logger: logger,
},
)
if err != nil {
panic(err)
}
eventBus, err := cqrs.NewEventBusWithConfig(eventsPublisher, cqrs.EventBusConfig{
GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
// because we are using PubSub RabbitMQ config, we can use one topic for all events
return "events", nil
// we can also use topic per event type
// return params.EventName, nil
},
OnPublish: func(params cqrs.OnEventSendParams) error {
logger.Info("Publishing event", watermill.LogFields{
"event_name": params.EventName,
})
params.Message.Metadata.Set("published_at", time.Now().String())
return nil
},
Marshaler: cqrsMarshaler,
Logger: logger,
})
if err != nil {
panic(err)
}
eventProcessor, err := cqrs.NewEventGroupProcessorWithConfig(
router,
cqrs.EventGroupProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) {
return "events", nil
},
SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) {
config := amqp.NewDurablePubSubConfig(
amqpAddress,
amqp.GenerateQueueNameTopicNameWithSuffix(params.EventGroupName),
)
return amqp.NewSubscriber(config, logger)
},
OnHandle: func(params cqrs.EventGroupProcessorOnHandleParams) error {
start := time.Now()
err := params.Handler.Handle(params.Message.Context(), params.Event)
logger.Info("Event handled", watermill.LogFields{
"event_name": params.EventName,
"duration": time.Since(start),
"err": err,
})
return err
},
Marshaler: cqrsMarshaler,
Logger: logger,
},
)
if err != nil {
panic(err)
}
err = commandProcessor.AddHandlers(
BookRoomHandler{eventBus},
OrderBeerHandler{eventBus},
)
if err != nil {
panic(err)
}
err = eventProcessor.AddHandlersGroup(
"events",
OrderBeerOnRoomBooked{commandBus},
NewBookingsFinancialReport(),
cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
logger.Info("Beer ordered", watermill.LogFields{
"room_id": event.RoomId,
})
return nil
}),
)
if err != nil {
panic(err)
}
// publish BookRoom commands every second to simulate incoming traffic
go publishCommands(commandBus)
// processors are based on router, so they will work when router will start
if err := router.Run(context.Background()); err != nil {
panic(err)
}
}
// ...
Isso é tudo. Temos uma aplicação CQRS executável.