Mecanismo CQRS

CQRS significa "Segregación de Responsabilidad de Comando y Consulta". Separa la responsabilidad del comando (solicitudes de escritura) y la consulta (solicitudes de lectura). Las solicitudes de escritura y las de lectura son manejadas por objetos diferentes.

Esto es CQRS. Podemos separar aún más el almacenamiento de datos, teniendo un almacenamiento de lectura y escritura separado. Una vez hecho esto, puede haber muchos almacenamientos de lectura optimizados para manejar diferentes tipos de consultas o que abarcan muchos contextos delimitados. Aunque el almacenamiento de lectura/escritura separado a menudo es tema de discusión relacionado con CQRS, no es CQRS en sí mismo. CQRS es solo la primera separación de comando y consulta.

Diagrama de Arquitectura CQRS

El componente cqrs proporciona algunas abstracciones útiles, construidas sobre Pub/Sub y Router, para ayudar a implementar el patrón CQRS.

No es necesario implementar todo el CQRS. Normalmente, solo se usa la parte de eventos del componente para construir aplicaciones basadas en eventos.

Elementos Constructivos

Eventos

Los eventos representan algo que ya ha sucedido. Los eventos son inmutables.

Bus de Eventos

Código fuente completo: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go

// ...
// EventBus transporta eventos a controladores de eventos.
type EventBus struct {
// ...

Código fuente completo: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go

// ...
type EventBusConfig struct {
    // GeneratePublishTopic se utiliza para generar el nombre del tema para publicar eventos.
    GeneratePublishTopic GenerateEventPublishTopicFn

    // OnPublish se llama antes de enviar el evento. Puede modificar *message.Message.
    //
    // Esta opción no es obligatoria.
    OnPublish OnEventSendFn

    // Marshaler se utiliza para codificar y decodificar eventos.
    // Esto es obligatorio.
    Marshaler CommandEventMarshaler

    // Instancia de registro para el registro. Si no se proporciona, se utiliza watermill.NopLogger.
    Logger watermill.LoggerAdapter
}

func (c *EventBusConfig) setDefaults() {
    if c.Logger == nil {
        c.Logger = watermill.NopLogger{}
    }
}
// ...

Procesador de Eventos

Código completo: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go

// ...
// EventProcessor se utiliza para determinar qué EventHandler debe manejar los eventos recibidos del bus de eventos.
type EventProcessor struct {
// ...

Código completo: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go

// ...
type EventProcessorConfig struct {
	// GenerateSubscribeTopic se utiliza para generar el tema para suscribirse a eventos.
	// Si el procesador de eventos utiliza grupos de manejadores, se utiliza GenerateSubscribeTopic.
	GenerateSubscribeTopic EventProcessorGenerateSubscribeTopicFn

	// SubscriberConstructor se utiliza para crear un suscriptor para el EventHandler.
	//
	// Esta función se llama una vez para cada instancia de EventHandler.
	// Si desea reutilizar un suscriptor para varios manejadores, utilice GroupEventProcessor.
	SubscriberConstructor EventProcessorSubscriberConstructorFn

	// OnHandle se llama antes de manejar el evento.
	// OnHandle funciona de manera similar a un middleware: puede inyectar lógica adicional antes y después de manejar el evento.
	//
	// Por lo tanto, es necesario llamar explícitamente a params.Handler.Handle() para manejar el evento.
	//
	//  func(params EventProcessorOnHandleParams) (err error) {
	//      // Lógica antes del manejo
	//      //  (...)

	//      err := params.Handler.Handle(params.Message.Context(), params.Event)
	//
	//      // Lógica después del manejo
	//      //  (...)

	//      return err
	//  }
	//
	// Esta opción no es obligatoria.
	OnHandle EventProcessorOnHandleFn

	// AckOnUnknownEvent se utiliza para determinar si el mensaje debe ser reconocido cuando el evento no tiene un manejador definido.
	AckOnUnknownEvent bool

	// Marshaler se utiliza para serializar y deserializar eventos.
	// Requerido.
	Marshaler CommandEventMarshaler

	// Instancia de Logger para el registro.
	// Si no se proporciona, se utilizará watermill.NopLogger.
	Logger watermill.LoggerAdapter

	// disableRouterAutoAddHandlers es para mantener la compatibilidad con versiones anteriores.
	// Este valor se establecerá al crear EventProcessor utilizando NewEventProcessor.
	// Obsoleto: migrar a NewEventProcessorWithConfig.
	disableRouterAutoAddHandlers bool
}

func (c *EventProcessorConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

Procesador de Grupo de Eventos

Código fuente completo: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go

// ...
// EventGroupProcessor determina qué procesador de eventos debe manejar los eventos recibidos del bus de eventos.
// En comparación con EventProcessor, EventGroupProcessor permite que varios procesadores compartan la misma instancia de suscriptor.
type EventGroupProcessor struct {
// ...

Código fuente completo: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go

// ...
type EventGroupProcessorConfig struct {
	// GenerateSubscribeTopic se utiliza para generar el tema para suscribirse a los procesadores de eventos de grupo.
	// Esta opción es necesaria para EventProcessor cuando se utilizan grupos de procesadores.
	GenerateSubscribeTopic EventGroupProcessorGenerateSubscribeTopicFn

	// SubscriberConstructor se utiliza para crear un suscriptor para GroupEventHandler.
	// Esta función se llama una vez por grupo de eventos, lo que permite crear una suscripción para cada grupo.
	// Es muy útil cuando queremos manejar eventos de un flujo en orden.
	SubscriberConstructor EventGroupProcessorSubscriberConstructorFn

	// OnHandle se llama antes de manejar el evento.
	// OnHandle es similar a un middleware: se puede inyectar lógica adicional antes y después de manejar el evento.
	//
	// Por lo tanto, es necesario llamar explícitamente a params.Handler.Handle() para manejar el evento.
	//
	// func(params EventGroupProcessorOnHandleParams) (err error) {
	//     // Lógica antes del manejo
	//     //  (...)
	//
	//     err := params.Handler.Handle(params.Message.Context(), params.Event)
	//
	//     // Lógica después del manejo
	//     //  (...)
	//
	//     return err
	// }
	//
	// Esta opción no es necesaria.
	OnHandle EventGroupProcessorOnHandleFn

	// AckOnUnknownEvent se utiliza para determinar si se debe reconocer si el evento no tiene un manejador definido.
	AckOnUnknownEvent bool

	// Marshaler se utiliza para codificar y decodificar eventos.
	// Esto es necesario.
	Marshaler CommandEventMarshaler

	// Instancia de Logger utilizada para registros.
	// Si no se proporciona, se usará watermill.NopLogger.
	Logger watermill.LoggerAdapter
}

func (c *EventGroupProcessorConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

Conoce más sobre el Procesador de Grupo de Eventos.

Manejador de Eventos

Código fuente completo: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go

// ...
// EventHandler recibe eventos definidos por NewEvent y los maneja utilizando su método Handle.
// Si se utiliza DDD, el manejador de eventos puede modificar y persistir agregados.
// También puede invocar gestores de procesos, sagas o simplemente construir modelos de lectura.
//
// A diferencia de los manejadores de comandos, cada evento puede tener varios manejadores de eventos.
//
// Durante el manejo de mensajes, se utiliza una instancia de EventHandler.
// Al pasar varios eventos simultáneamente, el método Handle puede ejecutarse varias veces en paralelo.
// ¡Por lo tanto, el método Handle necesita ser seguro para subprocesos!
type EventHandler interface {
// ...

Comando

Un comando es una estructura de datos simple que representa una solicitud para realizar alguna operación.

Bus de Comandos

Código fuente completo: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go

// ...
// CommandBus es el componente que transporta comandos a manejadores de comandos.
type CommandBus struct {
// ...

Código fuente completo: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go

// ...
type CommandBusConfig struct {
	// GeneratePublishTopic se utiliza para generar el tema para publicar comandos.
	GeneratePublishTopic CommandBusGeneratePublishTopicFn

	// OnSend se llama antes de publicar un comando.
	// *message.Message se puede modificar.
	//
	// Esta opción no es obligatoria.
	OnSend CommandBusOnSendFn

	// Marshaler se utiliza para serializar y deserializar comandos.
	// Requerido.
	Marshaler CommandEventMarshaler

	// Instancia de Logger utilizada para el registro.
	// Si no se proporciona, se usará watermill.NopLogger.
	Logger watermill.LoggerAdapter
}

func (c *CommandBusConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

Procesador de Comandos

Código fuente completo: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go

// ...
// CommandProcessorSubscriberConstructorFn se utiliza para crear un suscriptor para CommandHandler.
// Esto le permite crear un suscriptor personalizado separado para cada manejador de comandos.
type CommandProcessorSubscriberConstructorFn func(CommandProcessorSubscriberConstructorParams) (message.Subscriber, error)
// ...

Código fuente completo: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go

// ...
type CommandProcessorConfig struct {
	// GenerateSubscribeTopic se utiliza para generar el tema para suscribirse a comandos.
	GenerateSubscribeTopic CommandProcessorGenerateSubscribeTopicFn

	// SubscriberConstructor se utiliza para crear un suscriptor para CommandHandler.
	SubscriberConstructor CommandProcessorSubscriberConstructorFn

	// OnHandle se llama antes de manejar el comando.
	// OnHandle funciona como un middleware: puede inyectar lógica adicional antes y después de manejar el comando.
	//
	// Debido a esto, debe llamar explícitamente a params.Handler.Handle() para manejar el comando.
	//  func(params CommandProcessorOnHandleParams) (err error) {
	//      // lógica antes de manejar
	//      // (...)
	//
	//      err := params.Handler.Handle(params.Message.Context(), params.Command)
	//
	//      // lógica después de manejar
	//      // (...)
	//
	//      return err
	//  }
	//
	// Esta opción no es obligatoria.
	OnHandle CommandProcessorOnHandleFn

	// Marshaler se utiliza para la serialización y deserialización de comandos.
	// Requerido.
	Marshaler CommandEventMarshaler

	// Instancia de Logger para el registro.
	// Si no se proporciona, se usará watermill.NopLogger.
	Logger watermill.LoggerAdapter

	// Si es verdadero, CommandProcessor confirmará los mensajes incluso si CommandHandler devuelve un error.
	// Si RequestReplyBackend no es nulo y el envío de la respuesta falla, el mensaje seguirá siendo rechazado.
	//
	// Advertencia: no se recomienda usar esta opción al usar el componente requestreply (requestreply.NewCommandHandler o requestreply.NewCommandHandlerWithResult),
	// ya que puede confirmar el comando cuando falla el envío de la respuesta.
	//
	// Al usar requestreply, debe usar requestreply.PubSubBackendConfig.AckCommandErrors.
	AckCommandHandlingErrors bool

	// disableRouterAutoAddHandlers se utiliza por razones de compatibilidad hacia atrás.
	// Se establece al crear un CommandProcessor con NewCommandProcessor.
	// Obsoleto: por favor migre a NewCommandProcessorWithConfig.
	disableRouterAutoAddHandlers bool
}

func (c *CommandProcessorConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

Procesador de Comandos

Código fuente completo: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go

// ...
// CommandHandler recibe el comando definido por NewCommand y lo maneja utilizando el método Handle.
// Si se utiliza DDD, CommandHandler puede modificar y persistir agregados.
//
// A diferencia de EventHandler, cada Command solo puede tener un CommandHandler.
//
// Durante el manejo del mensaje, se utiliza una única instancia de CommandHandler.
// Cuando se entregan múltiples comandos simultáneamente, el método Handle puede ejecutarse varias veces de forma concurrente.
// ¡Por lo tanto, el método Handle debe ser seguro para subprocesos!
type CommandHandler interface {
// ...

Marshalizador de Comando y Evento

Código fuente completo: github.com/ThreeDotsLabs/watermill/components/cqrs/marshaler.go

// ...
// CommandEventMarshaler convierte comandos y eventos en mensajes de Watermill, y viceversa.
// El payload del comando debe ser convertido en []bytes.
type CommandEventMarshaler interface {
	// Marshal convierte el comando o evento en un mensaje de Watermill.
	Marshal(v interface{}) (*message.Message, error)

	// Unmarshal decodifica el mensaje de Watermill en el comando o evento v.
	Unmarshal(msg *message.Message, v interface{}) (err error)

	// Name devuelve el nombre del comando o evento.
	// El nombre se puede utilizar para determinar si el comando o evento recibido es el que queremos procesar.
	Name(v interface{}) string

	// NameFromMessage devuelve el nombre del comando o evento del mensaje de Watermill (generado por Marshal).
	//
	// Cuando tenemos comandos o eventos convertidos en mensajes de Watermill, debemos usar NameFromMessage en lugar de Name para evitar decodificaciones innecesarias.
	NameFromMessage(msg *message.Message) string
}
// ...

Uso

Ejemplo de Dominio

Utilizando un dominio simple que es responsable de manejar reservas de habitaciones en un hotel.

Utilizaremos símbolos de Event Storming para mostrar el modelo de este dominio.

Leyenda de símbolos:

  • Las notas adhesivas azules son comandos
  • Las notas adhesivas naranjas son eventos
  • Las notas adhesivas verdes son modelos de lectura generados de forma asincrónica a partir de eventos
  • Las notas adhesivas moradas son políticas activadas por eventos y generando comandos
  • Las notas adhesivas rosadas son puntos críticos; marcamos áreas que a menudo encuentran problemas

CQRS Event Storming

El dominio es simple:

  • Los clientes pueden reservar habitaciones.
  • Siempre que se reserve una habitación, pedimos una botella de cerveza para el cliente (porque amamos a nuestros huéspedes).
    • Sabemos que a veces se acaba la cerveza.
  • Generamos un informe financiero basado en la reserva.

Envío de Comandos

En primer lugar, necesitamos simular acciones de los clientes.

Código fuente completo: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf

// ...
		bookRoomCmd := &BookRoom{
			RoomId:    fmt.Sprintf("%d", i),
			GuestName: "John",
			StartDate: startDate,
			EndDate:   endDate,
		}
		if err := commandBus.Send(context.Background(), bookRoomCmd); err != nil {
			panic(err)
		}
// ...

Manejador de Comandos

El BookRoomHandler manejará nuestros comandos.

Código fuente completo: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// BookRoomHandler es un manejador de comandos que procesa el comando BookRoom y emite el evento RoomBooked.
//
// En CQRS, un comando debe ser procesado por un manejador.
// Al agregar otro manejador para manejar este comando, se devolverá un error.
type BookRoomHandler struct {
	eventBus *cqrs.EventBus
}

func (b BookRoomHandler) HandlerName() string {
	return "BookRoomHandler"
}

// NewCommand devuelve el tipo de comando que este manejador debe procesar. Debe ser un puntero.
func (b BookRoomHandler) NewCommand() interface{} {
	return &BookRoom{}
}

func (b BookRoomHandler) Handle(ctx context.Context, c interface{}) error {
	// c siempre es el tipo devuelto por `NewCommand`, por lo que la aserción de tipo siempre es segura
	cmd := c.(*BookRoom)

	// Algunos precios aleatorios, que pueden calcularse de una manera más sensata en la producción real
	precio := (rand.Int63n(40) + 1) * 10

	log.Printf(
		"Reservado %s, desde %s hasta %s",
		cmd.RoomId,
		cmd.GuestName,
		time.Unix(cmd.StartDate.Seconds, int64(cmd.StartDate.Nanos)),
		time.Unix(cmd.EndDate.Seconds, int64(cmd.EndDate.Nanos)),
	)

	// RoomBooked será manejado por el manejador de eventos OrderBeerOnRoomBooked,
	// y en el futuro, RoomBooked puede ser manejado por múltiples manejadores de eventos
	if err := b.eventBus.Publish(ctx, &RoomBooked{
		ReservationId: watermill.NewUUID(),
		RoomId:        cmd.RoomId,
		GuestName:     cmd.GuestName,
		Precio:        precio,
		StartDate:     cmd.StartDate,
		EndDate:       cmd.EndDate,
	}); err != nil {
		return err
	}

	return nil
}

// OrderBeerOnRoomBooked es un manejador de eventos que procesa el evento RoomBooked y emite el comando OrderBeer.
// ...

Manejadores de Eventos

Como se mencionó anteriormente, queremos pedir una botella de cerveza cada vez que se reserve una habitación (etiquetada como "Cuando se reserva la habitación"). Esto lo logramos usando el comando OrderBeer.

Código fuente completo: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// OrderBeerOnRoomBooked es un manejador de eventos que procesa el evento RoomBooked y emite el comando OrderBeer.
type OrderBeerOnRoomBooked struct {
	commandBus *cqrs.CommandBus
}

func (o OrderBeerOnRoomBooked) HandlerName() string {
	// Este nombre se pasa a EventsSubscriberConstructor para generar el nombre de la cola
	return "OrderBeerOnRoomBooked"
}

func (OrderBeerOnRoomBooked) NewEvent() interface{} {
	return &RoomBooked{}
}

func (o OrderBeerOnRoomBooked) Handle(ctx context.Context, e interface{}) error {
	event := e.(*RoomBooked)

	orderBeerCmd := &OrderBeer{
		RoomId: event.RoomId,
		Count:  rand.Int63n(10) + 1,
	}

	return o.commandBus.Send(ctx, orderBeerCmd)
}

// OrderBeerHandler es un manejador de comandos que procesa el comando OrderBeer y emite el evento BeerOrdered.
// ...

OrderBeerHandler es muy similar a BookRoomHandler. La única diferencia es que a veces devuelve un error cuando no hay suficiente cerveza, lo que provoca que se vuelva a emitir el comando. Puede encontrar la implementación completa en el código fuente del ejemplo.

Grupos de controladores de eventos

De forma predeterminada, cada controlador de eventos tiene una instancia de suscriptor por separado. Este enfoque funciona bien si solo se envía un tipo de evento al tema.

En el caso de múltiples tipos de eventos en el tema, existen dos opciones:

  1. Puede configurar EventConfig.AckOnUnknownEvent a true - esto reconocerá todos los eventos no manejados por los controladores.
  2. Puede utilizar el mecanismo de grupos de controladores de eventos.

Para usar grupos de eventos, es necesario configurar las opciones GenerateHandlerGroupSubscribeTopic y GroupSubscriberConstructor en EventConfig.

Luego, puede utilizar AddHandlersGroup en el EventProcessor.

Código fuente completo: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
	err = eventProcessor.AddHandlersGroup(
		"eventos",
		OrderBeerOnRoomBooked{commandBus},

		NewBookingsFinancialReport(),

		cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
			logger.Info("Cerveza ordenada", watermill.LogFields{
				"room_id": event.RoomId,
			})
			return nil
		}),
	)
	if err != nil {
// ...

Tanto GenerateHandlerGroupSubscribeTopic como GroupSubscriberConstructor reciben información sobre el nombre del grupo como parámetros de función.

Controladores genéricos

A partir de Watermill v1.3, se pueden utilizar controladores genéricos para manejar comandos y eventos. Esto es muy útil cuando se tienen un gran número de comandos/eventos y no se desea crear un controlador para cada uno.

Código fuente completo: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
		cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
			logger.Info("Cerveza ordenada", watermill.LogFields{
				"room_id": event.RoomId,
			})
			return nil
		}),
// ...

En segundo plano, crea una implementación de EventHandler o CommandHandler. Es adecuado para todo tipo de controladores.

Código fuente completo: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go

// ...
// NewCommandHandler crea una nueva implementación de CommandHandler basada en la función proporcionada y el tipo de comando inferido a partir de los parámetros de la función.
func NewCommandHandler[Command any](
// ...

Código fuente completo: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go

// ...
// NewEventHandler crea una nueva implementación de EventHandler basada en la función proporcionada y el tipo de evento inferido a partir de los parámetros de la función.
func NewEventHandler[T any](
// ...

Código fuente completo: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go

// ...
// NewGroupEventHandler crea una nueva implementación de GroupEventHandler basada en la función proporcionada y el tipo de evento inferido a partir de los parámetros de la función.
func NewGroupEventHandler[T any](handleFunc func(ctx context.Context, event *T) error) GroupEventHandler {
// ...

Construcción de un Modelo de Lectura Utilizando Controladores de Eventos

Código fuente completo: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// BookingsFinancialReport es un modelo de lectura que calcula cuánto dinero podemos ganar con las reservas.
// Escucha eventos de RoomBooked cuando ocurren.
//
// Esta implementación simplemente escribe en memoria. En un entorno de producción, podrías usar algún tipo de almacenamiento persistente.
type BookingsFinancialReport struct {
	handledBookings map[string]struct{}
	totalCharge     int64
	lock            sync.Mutex
}

func NewBookingsFinancialReport() *BookingsFinancialReport {
	return &BookingsFinancialReport{handledBookings: map[string]struct{}{}}
}

func (b BookingsFinancialReport) HandlerName() string {
	// Este nombre se pasa a EventsSubscriberConstructor y se usa para generar el nombre de la cola
	return "BookingsFinancialReport"
}

func (BookingsFinancialReport) NewEvent() interface{} {
	return &RoomBooked{}
}

func (b *BookingsFinancialReport) Handle(ctx context.Context, e interface{}) error {
	// Handle puede ser llamado concurrentemente, por lo que se necesita seguridad de hilos.
	b.lock.Lock()
	defer b.lock.Unlock()

	event := e.(*RoomBooked)

	// Cuando se utiliza Pub/Sub que no proporciona semántica de entrega exactamente una vez, es necesario deduplicar mensajes.
	// Pub/Sub de GoChannel proporciona entrega exactamente una vez,
	// pero preparemos este ejemplo para otras implementaciones de Pub/Sub.
	if _, ok := b.handledBookings[event.ReservationId]; ok {
		return nil
	}
	b.handledBookings[event.ReservationId] = struct{}{}

	b.totalCharge += event.Price

	fmt.Printf(">>> Habitación reservada por $%d\n", b.totalCharge)
	return nil
}

var amqpAddress = "amqp://guest:guest@rabbitmq:5672/"

func main() {
// ...

Conectar todo

Ya tenemos todos los componentes necesarios para construir una aplicación CQRS.

Utilizaremos AMQP (RabbitMQ) como nuestro broker de mensajes.

En el fondo, CQRS utiliza el enrutador de mensajes de Watermill. Si no está familiarizado con esto y quiere entender cómo funciona, debería consultar la guía de inicio. También le mostrará cómo utilizar algunos patrones de mensajería estándar, como métricas, colas de mensajes envenenados, limitación de velocidad, correlación y otras herramientas utilizadas por cada aplicación impulsada por mensajes. Estas herramientas ya están integradas en Watermill.

Volviendo a CQRS. Como ya sabes, CQRS consta de múltiples componentes, como buses de comandos o eventos, procesadores, y demás.

Código fuente completo: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
func main() {
	logger := watermill.NewStdLogger(false, false)
	cqrsMarshaler := cqrs.ProtobufMarshaler{}

	// You can use any Pub/Sub implementation from here: https://watermill.io/pubsubs/
	// Detailed RabbitMQ implementation: https://watermill.io/pubsubs/amqp/
	// Commands will be send to queue, because they need to be consumed once.
	commandsAMQPConfig := amqp.NewDurableQueueConfig(amqpAddress)
	commandsPublisher, err := amqp.NewPublisher(commandsAMQPConfig, logger)
	if err != nil {
		panic(err)
	}
	commandsSubscriber, err := amqp.NewSubscriber(commandsAMQPConfig, logger)
	if err != nil {
		panic(err)
	}

	// Events will be published to PubSub configured Rabbit, because they may be consumed by multiple consumers.
	// (in that case BookingsFinancialReport and OrderBeerOnRoomBooked).
	eventsPublisher, err := amqp.NewPublisher(amqp.NewDurablePubSubConfig(amqpAddress, nil), logger)
	if err != nil {
		panic(err)
	}

	// CQRS is built on messages router. Detailed documentation: https://watermill.io/docs/messages-router/
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// Simple middleware which will recover panics from event or command handlers.
	// More about router middlewares you can find in the documentation:
	// https://watermill.io/docs/messages-router/#middleware
	//
	// List of available middlewares you can find in message/router/middleware.
	router.AddMiddleware(middleware.Recoverer)

	commandBus, err := cqrs.NewCommandBusWithConfig(commandsPublisher, cqrs.CommandBusConfig{
		GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) {
			// we are using queue RabbitMQ config, so we need to have topic per command type
			return params.CommandName, nil
		},
		OnSend: func(params cqrs.CommandBusOnSendParams) error {
			logger.Info("Sending command", watermill.LogFields{
				"command_name": params.CommandName,
			})

			params.Message.Metadata.Set("sent_at", time.Now().String())

			return nil
		},
		Marshaler: cqrsMarshaler,
		Logger:    logger,
	})
	if err != nil {
		panic(err)
	}

	commandProcessor, err := cqrs.NewCommandProcessorWithConfig(
		router,
		cqrs.CommandProcessorConfig{
			GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) {
				// we are using queue RabbitMQ config, so we need to have topic per command type
				return params.CommandName, nil
			},
			SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) {
				// we can reuse subscriber, because all commands have separated topics
				return commandsSubscriber, nil
			},
			OnHandle: func(params cqrs.CommandProcessorOnHandleParams) error {
				start := time.Now()

				err := params.Handler.Handle(params.Message.Context(), params.Command)

				logger.Info("Command handled", watermill.LogFields{
					"command_name": params.CommandName,
					"duration":     time.Since(start),
					"err":          err,
				})

				return err
			},
			Marshaler: cqrsMarshaler,
			Logger:    logger,
		},
	)
	if err != nil {
		panic(err)
	}

	eventBus, err := cqrs.NewEventBusWithConfig(eventsPublisher, cqrs.EventBusConfig{
		GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
			// because we are using PubSub RabbitMQ config, we can use one topic for all events
			return "events", nil

			// we can also use topic per event type
			// return params.EventName, nil
		},

		OnPublish: func(params cqrs.OnEventSendParams) error {
			logger.Info("Publishing event", watermill.LogFields{
				"event_name": params.EventName,
			})

			params.Message.Metadata.Set("published_at", time.Now().String())

			return nil
		},

		Marshaler: cqrsMarshaler,
		Logger:    logger,
	})
	if err != nil {
		panic(err)
	}

	eventProcessor, err := cqrs.NewEventGroupProcessorWithConfig(
		router,
		cqrs.EventGroupProcessorConfig{
			GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) {
				return "events", nil
			},
			SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) {
				config := amqp.NewDurablePubSubConfig(
					amqpAddress,
					amqp.GenerateQueueNameTopicNameWithSuffix(params.EventGroupName),
				)

				return amqp.NewSubscriber(config, logger)
			},

			OnHandle: func(params cqrs.EventGroupProcessorOnHandleParams) error {
				start := time.Now()

				err := params.Handler.Handle(params.Message.Context(), params.Event)

				logger.Info("Event handled", watermill.LogFields{
					"event_name": params.EventName,
					"duration":   time.Since(start),
					"err":        err,
				})

				return err
			},

			Marshaler: cqrsMarshaler,
			Logger:    logger,
		},
	)
	if err != nil {
		panic(err)
	}

	err = commandProcessor.AddHandlers(
		BookRoomHandler{eventBus},
		OrderBeerHandler{eventBus},
	)
	if err != nil {
		panic(err)
	}

	err = eventProcessor.AddHandlersGroup(
		"events",
		OrderBeerOnRoomBooked{commandBus},

		NewBookingsFinancialReport(),

		cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
			logger.Info("Beer ordered", watermill.LogFields{
				"room_id": event.RoomId,
			})
			return nil
		}),
	)
	if err != nil {
		panic(err)
	}

	// publish BookRoom commands every second to simulate incoming traffic
	go publishCommands(commandBus)

	// processors are based on router, so they will work when router will start
	if err := router.Run(context.Background()); err != nil {
		panic(err)
	}
}
// ...

¡Listo! Tenemos una aplicación CQRS funcional.