Mecanismo CQRS

CQRS significa "Command Query Responsibility Segregation" (Separação de Responsabilidades de Comando e Consulta). Ele separa a responsabilidade de comandos (solicitações de escrita) e consultas (solicitações de leitura). As solicitações de escrita e leitura são tratadas por objetos diferentes.

Isso é CQRS. Podemos separar ainda mais o armazenamento de dados, tendo armazenamento de leitura e escrita separados. Depois que isso é feito, pode haver muitos armazenamentos de leitura otimizados para lidar com diferentes tipos de consultas ou abranger muitos contextos limitados. Embora o armazenamento de leitura/escrita separado seja frequentemente o tópico de discussão relacionado ao CQRS, ele não é o próprio CQRS. CQRS é apenas a primeira separação de comando e consulta.

Diagrama de Arquitetura CQRS

O componente cqrs fornece algumas abstrações úteis, construídas em cima de Pub/Sub e Router, para ajudar na implementação do padrão CQRS.

Você não precisa implementar todo o CQRS. Normalmente, apenas a parte de eventos do componente é usada para construir aplicativos orientados a eventos.

Blocos de Construção

Eventos

Eventos representam algo que já aconteceu. Os eventos são imutáveis.

Barramento de Eventos

Código fonte completo: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go

// ...
// EventBus transporta eventos para manipuladores de eventos.
type EventBus struct {
// ...

Código fonte completo: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go

// ...
type EventBusConfig struct {
    // GeneratePublishTopic é usado para gerar o nome do tópico para publicação de eventos.

    GeneratePublishTopic GenerateEventPublishTopicFn

    // OnPublish é chamado antes de enviar o evento. Pode modificar *message.Message.

    // Essa opção não é obrigatória.
    OnPublish OnEventSendFn

    // Marshaler é usado para codificar e decodificar eventos.
    // Isso é obrigatório.
    Marshaler CommandEventMarshaler

    // Instância Logger para logging. Se não fornecido, watermill.NopLogger é usado.
    Logger watermill.LoggerAdapter
}

func (c *EventBusConfig) setDefaults() {
    if c.Logger == nil {
        c.Logger = watermill.NopLogger{}
    }
}
// ...

Processador de Eventos

Código completo: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go

// ...
// EventProcessor é usado para determinar qual EventHandler deve lidar com os eventos recebidos do barramento de eventos.
type EventProcessor struct {
// ...

Código completo: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go

// ...
type EventProcessorConfig struct {
	// GenerateSubscribeTopic é usado para gerar o tópico para se inscrever nos eventos.
	// Se o processador de eventos usar grupos de manipuladores, então GenerateSubscribeTopic será usado.
	GenerateSubscribeTopic EventProcessorGenerateSubscribeTopicFn

	// SubscriberConstructor é usado para criar um assinante para o EventHandler.
	//
	// Esta função é chamada uma vez para cada instância de EventHandler.
	// Se você deseja reutilizar um assinante para vários manipuladores, use GroupEventProcessor.
	SubscriberConstructor EventProcessorSubscriberConstructorFn

	// OnHandle é chamado antes de lidar com o evento.
	// OnHandle funciona de forma semelhante a um middleware: você pode injetar lógica adicional antes e depois de lidar com o evento.
	//
	// Portanto, é necessário chamar explicitamente params.Handler.Handle() para lidar com o evento.
	//
	//  func(params EventProcessorOnHandleParams) (err error) {
	//      // Lógica antes de lidar
	//      //  (...)

	//      err := params.Handler.Handle(params.Message.Context(), params.Event)
	//
	//      // Lógica depois de lidar
	//      //  (...)

	//      return err
	//  }
	//
	// Esta opção não é obrigatória.
	OnHandle EventProcessorOnHandleFn

	// AckOnUnknownEvent é usado para determinar se a mensagem deve ser reconhecida quando o evento não tem um manipulador definido.
	AckOnUnknownEvent bool

	// Marshaler é usado para serializar e desserializar eventos.
	// Obrigatório.
	Marshaler CommandEventMarshaler

	// Instância de logger para registro.
	// Se não fornecido, watermill.NopLogger será usado.
	Logger watermill.LoggerAdapter

	// disableRouterAutoAddHandlers é para manter a compatibilidade com versões anteriores.
	// Este valor será definido ao criar um EventProcessor usando NewEventProcessor.
	// Depreciado: migrar para NewEventProcessorWithConfig.
	disableRouterAutoAddHandlers bool
}

func (c *EventProcessorConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

Processador de Grupo de Eventos

Código-fonte completo: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go

// ...
// EventGroupProcessor determina qual processador de eventos deve lidar com os eventos recebidos do barramento de eventos.
// Comparado com EventProcessor, o EventGroupProcessor permite que vários processadores compartilhem a mesma instância de assinante.
type EventGroupProcessor struct {
// ...

Código-fonte completo: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go

// ...
type EventGroupProcessorConfig struct {
	// GenerateSubscribeTopic é usada para gerar o tópico para se inscrever nos processadores de grupo de eventos.
	// Esta opção é necessária para EventProcessor ao usar grupos de processadores.
	GenerateSubscribeTopic EventGroupProcessorGenerateSubscribeTopicFn

	// SubscriberConstructor é usada para criar um assinante para GroupEventHandler.
	// Esta função é chamada uma vez por grupo de eventos - permitindo a criação de uma assinatura para cada grupo.
	// É muito útil quando queremos lidar com eventos de um fluxo em ordem.
	SubscriberConstructor EventGroupProcessorSubscriberConstructorFn

	// OnHandle é chamada antes de lidar com o evento.
	// OnHandle é semelhante a um middleware: você pode injetar lógica adicional antes e depois de lidar com o evento.
	//
	// Portanto, você precisa chamar explicitamente params.Handler.Handle() para lidar com o evento.
	//
	// func(params EventGroupProcessorOnHandleParams) (err error) {
	//     // Lógica antes de lidar com
	//     //  (...)
	//
	//     err := params.Handler.Handle(params.Message.Context(), params.Event)
	//
	//     // Lógica depois de lidar com
	//     //  (...)
	//
	//     return err
	// }
	//
	// Esta opção não é obrigatória.
	OnHandle EventGroupProcessorOnHandleFn

	// AckOnUnknownEvent é usada para determinar se deve reconhecer se o evento não tem manipulador definido.
	AckOnUnknownEvent bool

	// Marshaler é usada para codificar e decodificar eventos.
	// Isto é necessário.
	Marshaler CommandEventMarshaler

	// Instância de Logger usada para fazer log.
	// Se não fornecido, watermill.NopLogger será usado.
	Logger watermill.LoggerAdapter
}

func (c *EventGroupProcessorConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

Saiba mais sobre o Processador de Grupo de Eventos.

Manipulador de Eventos

Código-fonte completo: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go

// ...
// EventHandler recebe eventos definidos por NewEvent e os manipula usando seu método Handle.
// Se estiver usando DDD, o manipulador de eventos pode modificar e persistir agregações.
// Também pode invocar gerenciadores de processos, sagas ou apenas construir modelos de leitura.
//
// Ao contrário dos manipuladores de comandos, cada evento pode ter vários manipuladores de eventos.
//
// Durante o tratamento de mensagens, use uma instância de EventHandler.
// Ao passar vários eventos simultaneamente, o método Handle pode ser executado várias vezes de forma concorrente.
// Portanto, o método Handle precisa ser seguro para threads!
type EventHandler interface {
// ...

Comando

Um comando é uma estrutura de dados simples que representa uma solicitação para realizar alguma operação.

Barramento de Comando

Código-fonte completo: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go

// ...
// O CommandBus é o componente que transporta comandos para manipuladores de comandos.
type CommandBus struct {
// ...

Código-fonte completo: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go

// ...
type CommandBusConfig struct {
	// GeneratePublishTopic é usado para gerar o tópico para publicação de comandos.
	GeneratePublishTopic CommandBusGeneratePublishTopicFn

	// OnSend é chamado antes de publicar um comando.
	// *message.Message pode ser modificado.
	//
	// Esta opção não é obrigatória.
	OnSend CommandBusOnSendFn

	// O Marshaler é usado para serializar e desserializar comandos.
	// Necessário.
	Marshaler CommandEventMarshaler

	// Instância de Logger usada para registro.
	// Se não fornecida, watermill.NopLogger será usada.
	Logger watermill.LoggerAdapter
}

func (c *CommandBusConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

Processador de Comando

Código-fonte completo: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go

// ...
// CommandProcessorSubscriberConstructorFn é usada para criar um assinante para o manipulador de comandos.
// Isso permite criar um assinante personalizado separado para cada manipulador de comandos.
type CommandProcessorSubscriberConstructorFn func(CommandProcessorSubscriberConstructorParams) (message.Subscriber, error)
// ...

Código-fonte completo: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go

// ...
type CommandProcessorConfig struct {
	// GenerateSubscribeTopic é usado para gerar o tópico para assinar comandos.
	GenerateSubscribeTopic CommandProcessorGenerateSubscribeTopicFn

	// SubscriberConstructor é usado para criar um assinante para o manipulador de comandos.
	SubscriberConstructor CommandProcessorSubscriberConstructorFn

	// OnHandle é chamado antes de manipular o comando.
	// OnHandle funciona como um middleware: você pode injetar lógica adicional antes e depois de manipular o comando.
	//
	// Por causa disso, é necessário chamar params.Handler.Handle() explicitamente para manipular o comando.
	//  func(params CommandProcessorOnHandleParams) (err error) {
	//      // lógica antes da manipulação
	//      // (...)
	//
	//      err := params.Handler.Handle(params.Message.Context(), params.Command)
	//
	//      // lógica depois da manipulação
	//      // (...)
	//
	//      return err
	//  }
	//
	// Esta opção não é obrigatória.
	OnHandle CommandProcessorOnHandleFn

	// O Marshaler é usado para serialização e desserialização de comandos.
	// Necessário.
	Marshaler CommandEventMarshaler

	// Instância de Logger para registro.
	// Se não fornecida, watermill.NopLogger será usada.
	Logger watermill.LoggerAdapter

	// Se verdadeiro, o CommandProcessor irá ack mensagens mesmo se o CommandHandler retornar um erro.
	// Se o RequestReplyBackend não for nulo e o envio da resposta falhar, a mensagem ainda será nacked.
	//
	// Aviso: Não é recomendado usar esta opção ao usar o componente requestreply (requestreply.NewCommandHandler ou requestreply.NewCommandHandlerWithResult),
	// pois pode ack o comando quando o envio da resposta falhar.
	//
	// Ao usar requestreply, você deve usar requestreply.PubSubBackendConfig.AckCommandErrors.
	AckCommandHandlingErrors bool

	// disableRouterAutoAddHandlers é usado para compatibilidade com versões anteriores.
	// É definido ao criar um CommandProcessor com NewCommandProcessor.
	// Depreciado: migre para NewCommandProcessorWithConfig, por favor.
	disableRouterAutoAddHandlers bool
}

func (c *CommandProcessorConfig) setDefaults() {
	if c.Logger == nulo {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

Processador de Comando

Código-fonte completo: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go

// ...
// CommandHandler recebe o comando definido por NewCommand e o manipula usando o método Handle.
// Se estiver usando DDD, CommandHandler pode modificar e persistir agregados.
//
// Diferentemente de EventHandler, cada comando pode ter apenas um CommandHandler.
//
// Durante o tratamento de mensagens, use uma instância de CommandHandler.
// Quando vários comandos são entregues simultaneamente, o método Handle pode ser executado várias vezes simultaneamente.
// Portanto, o método Handle precisa ser thread-safe!
type CommandHandler interface {
// ...

Codificador e Decodificador de Comando e Evento

Código-fonte completo: github.com/ThreeDotsLabs/watermill/components/cqrs/marshaler.go

// ...
// CommandEventMarshaler codifica comandos e eventos em mensagens Watermill e vice-versa.
// A carga útil do comando precisa ser codificada em []bytes.
type CommandEventMarshaler interface {
	// Marshal codifica o comando ou evento em uma mensagem Watermill.
	Marshal(v interface{}) (*message.Message, error)

	// Unmarshal decodifica a mensagem Watermill no comando ou evento v.
	Unmarshal(msg *message.Message, v interface{}) (err error)

	// Name retorna o nome do comando ou evento.
	// O nome pode ser usado para determinar se o comando ou evento recebido é o que queremos processar.
	Name(v interface{}) string

	// NameFromMessage retorna o nome do comando ou evento da mensagem Watermill (gerado por Marshal).
	//
	// Quando temos comandos ou eventos codificados em mensagens Watermill, devemos usar NameFromMessage em vez de Name para evitar decodificações desnecessárias.
	NameFromMessage(msg *message.Message) string
}
// ...

Uso

Exemplo de Domínio

Usando um domínio simples que é responsável por lidar com reservas de quartos em um hotel.

Vamos usar os símbolos de Event Storming para mostrar o modelo deste domínio.

Legenda dos símbolos:

  • As notas adesivas azuis são comandos
  • As notas adesivas laranjas são eventos
  • As notas adesivas verdes são modelos de leitura gerados de forma assíncrona a partir de eventos
  • As notas adesivas roxas são políticas acionadas por eventos e que geram comandos
  • As notas adesivas rosas são pontos de interesse; marcamos áreas que frequentemente encontram problemas

CQRS Event Storming

O domínio é simples:

  • Os clientes podem reservar quartos.
  • Sempre que um quarto é reservado, nós pedimos uma garrafa de cerveja para o cliente (porque amamos nossos hóspedes).
    • Sabemos que às vezes a cerveja acaba.
  • Geramos um relatório financeiro com base na reserva.

Envio de Comandos

Primeiramente, precisamos simular ações do cliente.

Código-fonte completo: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf

// ...
		bookRoomCmd := &BookRoom{
			RoomId:    fmt.Sprintf("%d", i),
			GuestName: "John",
			StartDate: startDate,
			EndDate:   endDate,
		}
		if err := commandBus.Send(context.Background(), bookRoomCmd); err != nil {
			panic(err)
		}
// ...

Manipulador de Comando

O BookRoomHandler irá lidar com os nossos comandos.

Código-fonte completo: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// BookRoomHandler é um manipulador de comandos que processa o comando BookRoom e emite o evento RoomBooked.
//
// Em CQRS, um comando deve ser processado por um manipulador.
// Ao adicionar outro manipulador para lidar com este comando, será retornado um erro.
type BookRoomHandler struct {
	eventBus *cqrs.EventBus
}

func (b BookRoomHandler) HandlerName() string {
	return "BookRoomHandler"
}

// NewCommand retorna o tipo de comando que este manipulador deve processar. Deve ser um ponteiro.
func (b BookRoomHandler) NewCommand() interface{} {
	return &BookRoom{}
}

func (b BookRoomHandler) Handle(ctx context.Context, c interface{}) error {
	// c é sempre o tipo retornado por `NewCommand`, então a assertiva de tipo é sempre segura
	cmd := c.(*BookRoom)

// Algum preço aleatório, que pode ser calculado de forma mais sensata na produção real
price := (rand.Int63n(40) + 1) * 10

	log.Printf(
		"Reservado %s, de %s para %s",
		cmd.RoomId,
		cmd.GuestName,
		time.Unix(cmd.StartDate.Seconds, int64(cmd.StartDate.Nanos)),
		time.Unix(cmd.EndDate.Seconds, int64(cmd.EndDate.Nanos)),
	)

	// RoomBooked será manipulado pelo manipulador de evento OrderBeerOnRoomBooked,
	// e no futuro, RoomBooked pode ser manipulado por vários manipuladores de evento
	if err := b.eventBus.Publish(ctx, &RoomBooked{
		ReservationId: watermill.NewUUID(),
		RoomId:        cmd.RoomId,
		GuestName:     cmd.GuestName,
		Price:         price,
		StartDate:     cmd.StartDate,
		EndDate:       cmd.EndDate,
	}); err != nil {
		return err
	}

	return nil
}

// OrderBeerOnRoomBooked é um manipulador de evento que processa o evento RoomBooked e emite o comando OrderBeer.
// ...

Manipuladores de Evento

Como mencionado anteriormente, queremos encomendar uma garrafa de cerveja sempre que um quarto for reservado (rotulado com "Quando o quarto for reservado"). Conseguimos isso usando o comando OrderBeer.

Código-fonte completo: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// OrderBeerOnRoomBooked é um manipulador de evento que processa o evento RoomBooked e emite o comando OrderBeer.
type OrderBeerOnRoomBooked struct {
	commandBus *cqrs.CommandBus
}

func (o OrderBeerOnRoomBooked) HandlerName() string {
// Este nome é passado para EventsSubscriberConstructor para a geração do nome da fila
	return "OrderBeerOnRoomBooked"
}

func (OrderBeerOnRoomBooked) NewEvent() interface{} {
	return &RoomBooked{}
}

func (o OrderBeerOnRoomBooked) Handle(ctx context.Context, e interface{}) error {
	event := e.(*RoomBooked)

	orderBeerCmd := &OrderBeer{
		RoomId: event.RoomId,
		Count:  rand.Int63n(10) + 1,
	}

	return o.commandBus.Send(ctx, orderBeerCmd)
}

// OrderBeerHandler é um manipulador de comando que processa o comando OrderBeer e emite o evento BeerOrdered.
// ...

OrderBeerHandler é muito semelhante ao BookRoomHandler. A única diferença é que às vezes retorna um erro quando não há cerveja suficiente, fazendo com que o comando seja reemitido. Você pode encontrar a implementação completa no código-fonte de exemplo.

Grupos de Manipuladores de Eventos

Por padrão, cada manipulador de eventos tem uma instância de assinante separada. Esta abordagem funciona bem se apenas um tipo de evento é enviado para o tópico.

No caso de vários tipos de eventos no tópico, existem duas opções:

  1. Você pode definir EventConfig.AckOnUnknownEvent como true - isso reconhecerá todos os eventos não tratados pelos manipuladores.
  2. Você pode usar o mecanismo de grupo de manipuladores de eventos.

Para usar grupos de eventos, é necessário definir as opções GenerateHandlerGroupSubscribeTopic e GroupSubscriberConstructor em EventConfig.

Em seguida, é possível usar AddHandlersGroup no EventProcessor.

Código-fonte completo: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
	err = eventProcessor.AddHandlersGroup(
		"events",
		OrderBeerOnRoomBooked{commandBus},

		NewBookingsFinancialReport(),

		cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
			logger.Info("Cerveja pedida", watermill.LogFields{
				"room_id": event.RoomId,
			})
			return nil
		}),
	)
	if err != nil {
// ...

Tanto GenerateHandlerGroupSubscribeTopic quanto GroupSubscriberConstructor recebem informações sobre o nome do grupo como parâmetros da função.

Manipuladores Genéricos

A partir do Watermill v1.3, manipuladores genéricos podem ser usados para lidar com comandos e eventos. Isso é muito útil quando se tem um grande número de comandos/eventos e não se deseja criar um manipulador para cada um.

Código-fonte completo: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
		cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
			logger.Info("Cerveja pedida", watermill.LogFields{
				"room_id": event.RoomId,
			})
			return nil
		}),
// ...

Nos bastidores, isso cria uma implementação de EventHandler ou CommandHandler. É adequado para todos os tipos de manipuladores.

Código-fonte completo: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go

// ...
// NewCommandHandler cria uma nova implementação de CommandHandler com base na função fornecida e no tipo de comando inferido a partir dos parâmetros da função.
func NewCommandHandler[Command any](
// ...

Código-fonte completo: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go

// ...
// NewEventHandler cria uma nova implementação de EventHandler com base na função fornecida e no tipo de evento inferido a partir dos parâmetros da função.
func NewEventHandler[T any](
// ...

Código-fonte completo: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go

// ...
// NewGroupEventHandler cria uma nova implementação de GroupEventHandler com base na função fornecida e no tipo de evento inferido a partir dos parâmetros da função.
func NewGroupEventHandler[T any](handleFunc func(ctx context.Context, event *T) error) GroupEventHandler {
// ...

Construindo um Modelo de Leitura Usando Manipuladores de Eventos

Código-fonte completo: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// BookingsFinancialReport é um modelo de leitura que calcula o quanto podemos ganhar com reservas.
// Ele ouve os eventos de quarto reservado quando eles ocorrem.
//
// Esta implementação escreve simplesmente na memória. Em um ambiente de produção, você pode usar algum tipo de armazenamento persistente.
type BookingsFinancialReport struct {
	handledBookings map[string]struct{}
	totalCharge     int64
	lock            sync.Mutex
}

func NewBookingsFinancialReport() *BookingsFinancialReport {
	return &BookingsFinancialReport{handledBookings: map[string]struct{}{}}
}

func (b BookingsFinancialReport) HandlerName() string {
	// Este nome é passado para o EventsSubscriberConstructor e usado para gerar o nome da fila
	return "BookingsFinancialReport"
}

func (BookingsFinancialReport) NewEvent() interface{} {
	return &RoomBooked{}
}

func (b *BookingsFinancialReport) Handle(ctx context.Context, e interface{}) error {
	// O Handle pode ser chamado concorrentemente, então a segurança de thread é necessária.
	b.lock.Lock()
	defer b.lock.Unlock()

	event := e.(*RoomBooked)

	// Ao usar Pub/Sub que não fornece semântica de entrega exatamente uma vez, precisamos deduplicar mensagens.
	// O Pub/Sub do GoChannel fornece entrega exatamente uma vez,
	// mas vamos preparar este exemplo para outras implementações de Pub/Sub.
	if _, ok := b.handledBookings[event.ReservationId]; ok {
		return nil
	}
	b.handledBookings[event.ReservationId] = struct{}{}

	b.totalCharge += event.Price

	fmt.Printf(">>> Quarto reservado por $%d\n", b.totalCharge)
	return nil
}

var amqpAddress = "amqp://guest:guest@rabbitmq:5672/"

func main() {
// ...

Conectar Tudo

Já temos todos os componentes necessários para construir uma aplicação CQRS.

Vamos usar o AMQP (RabbitMQ) como nosso correio de mensagens: AMQP.

Por baixo dos panos, o CQRS utiliza o roteador de mensagens da Watermill. Se você não está familiarizado com isso e quer entender como funciona, deveria conferir o guia de início. Ele também irá lhe mostrar como usar alguns padrões de mensagens padrão como métricas, filas de mensagens inválidas, limitação de taxa, correlação, e outras ferramentas usadas por toda aplicação orientada a mensagens. Estas ferramentas já estão incorporadas na Watermill.

Vamos voltar ao CQRS. Como você já sabe, o CQRS consiste de vários componentes tais como barramentos de comando ou evento, processadores, e assim por diante.

Código fonte completo: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
func main() {
	logger := watermill.NewStdLogger(false, false)
	cqrsMarshaler := cqrs.ProtobufMarshaler{}

	// You can use any Pub/Sub implementation from here: https://watermill.io/pubsubs/
	// Detailed RabbitMQ implementation: https://watermill.io/pubsubs/amqp/
	// Commands will be send to queue, because they need to be consumed once.
	commandsAMQPConfig := amqp.NewDurableQueueConfig(amqpAddress)
	commandsPublisher, err := amqp.NewPublisher(commandsAMQPConfig, logger)
	if err != nil {
		panic(err)
	}
	commandsSubscriber, err := amqp.NewSubscriber(commandsAMQPConfig, logger)
	if err != nil {
		panic(err)
	}

	// Events will be published to PubSub configured Rabbit, because they may be consumed by multiple consumers.
	// (in that case BookingsFinancialReport and OrderBeerOnRoomBooked).
	eventsPublisher, err := amqp.NewPublisher(amqp.NewDurablePubSubConfig(amqpAddress, nil), logger)
	if err != nil {
		panic(err)
	}

	// CQRS is built on messages router. Detailed documentation: https://watermill.io/docs/messages-router/
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// Simple middleware which will recover panics from event or command handlers.
	// More about router middlewares you can find in the documentation:
	// https://watermill.io/docs/messages-router/#middleware
	//
	// List of available middlewares you can find in message/router/middleware.
	router.AddMiddleware(middleware.Recoverer)

	commandBus, err := cqrs.NewCommandBusWithConfig(commandsPublisher, cqrs.CommandBusConfig{
		GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) {
			// we are using queue RabbitMQ config, so we need to have topic per command type
			return params.CommandName, nil
		},
		OnSend: func(params cqrs.CommandBusOnSendParams) error {
			logger.Info("Sending command", watermill.LogFields{
				"command_name": params.CommandName,
			})

			params.Message.Metadata.Set("sent_at", time.Now().String())

			return nil
		},
		Marshaler: cqrsMarshaler,
		Logger:    logger,
	})
	if err != nil {
		panic(err)
	}

	commandProcessor, err := cqrs.NewCommandProcessorWithConfig(
		router,
		cqrs.CommandProcessorConfig{
			GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) {
				// we are using queue RabbitMQ config, so we need to have topic per command type
				return params.CommandName, nil
			},
			SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) {
				// we can reuse subscriber, because all commands have separated topics
				return commandsSubscriber, nil
			},
			OnHandle: func(params cqrs.CommandProcessorOnHandleParams) error {
				start := time.Now()

				err := params.Handler.Handle(params.Message.Context(), params.Command)

				logger.Info("Command handled", watermill.LogFields{
					"command_name": params.CommandName,
					"duration":     time.Since(start),
					"err":          err,
				})

				return err
			},
			Marshaler: cqrsMarshaler,
			Logger:    logger,
		},
	)
	if err != nil {
		panic(err)
	}

	eventBus, err := cqrs.NewEventBusWithConfig(eventsPublisher, cqrs.EventBusConfig{
		GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
			// because we are using PubSub RabbitMQ config, we can use one topic for all events
			return "events", nil

			// we can also use topic per event type
			// return params.EventName, nil
		},

		OnPublish: func(params cqrs.OnEventSendParams) error {
			logger.Info("Publishing event", watermill.LogFields{
				"event_name": params.EventName,
			})

			params.Message.Metadata.Set("published_at", time.Now().String())

			return nil
		},

		Marshaler: cqrsMarshaler,
		Logger:    logger,
	})
	if err != nil {
		panic(err)
	}

	eventProcessor, err := cqrs.NewEventGroupProcessorWithConfig(
		router,
		cqrs.EventGroupProcessorConfig{
			GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) {
				return "events", nil
			},
			SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) {
				config := amqp.NewDurablePubSubConfig(
					amqpAddress,
					amqp.GenerateQueueNameTopicNameWithSuffix(params.EventGroupName),
				)

				return amqp.NewSubscriber(config, logger)
			},

			OnHandle: func(params cqrs.EventGroupProcessorOnHandleParams) error {
				start := time.Now()

				err := params.Handler.Handle(params.Message.Context(), params.Event)

				logger.Info("Event handled", watermill.LogFields{
					"event_name": params.EventName,
					"duration":   time.Since(start),
					"err":        err,
				})

				return err
			},

			Marshaler: cqrsMarshaler,
			Logger:    logger,
		},
	)
	if err != nil {
		panic(err)
	}

	err = commandProcessor.AddHandlers(
		BookRoomHandler{eventBus},
		OrderBeerHandler{eventBus},
	)
	if err != nil {
		panic(err)
	}

	err = eventProcessor.AddHandlersGroup(
		"events",
		OrderBeerOnRoomBooked{commandBus},

		NewBookingsFinancialReport(),

		cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
			logger.Info("Beer ordered", watermill.LogFields{
				"room_id": event.RoomId,
			})
			return nil
		}),
	)
	if err != nil {
		panic(err)
	}

	// publish BookRoom commands every second to simulate incoming traffic
	go publishCommands(commandBus)

	// processors are based on router, so they will work when router will start
	if err := router.Run(context.Background()); err != nil {
		panic(err)
	}
}
// ...

Isso é tudo. Temos uma aplicação CQRS executável.