Механизм CQRS

CQRS означает "Segregation of Command Query Responsibility". Он разделяет ответственность за команду (запросы на запись) и запрос (запросы на чтение). Запросы на запись и запросы на чтение обрабатываются различными объектами.

Это CQRS. Мы также можем разделить хранилище данных на отдельные хранилища для чтения и записи. После этого может быть много хранилищ для чтения, оптимизированных для обработки различных типов запросов или используемых в различных ограниченных контекстах. Хотя отдельное хранилище для чтения/записи часто является темой обсуждения, связанной с CQRS, само CQRS не является им. CQRS представляет собой только первое разделение команды и запроса.

CQRS Architecture Diagram

Компонент cqrs предоставляет некоторые полезные абстракции, основанные на Pub/Sub и Router, чтобы помочь реализовать паттерн CQRS.

Вам не нужно реализовывать всю CQRS. Обычно используется только часть, отвечающая за события, для построения приложений, основанных на событиях.

Строительные блоки

События

События представляют собой что-то, что уже произошло. События неизменяемы.

Шина событий

Полный исходный код: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go

// ...
// Шина событий передает события обработчикам событий.
type EventBus struct {
// ...

Полный исходный код: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go

// ...
type EventBusConfig struct {
    // GeneratePublishTopic используется для генерации имени темы для публикации событий.
    GeneratePublishTopic GenerateEventPublishTopicFn

    // OnPublish вызывается перед отправкой события. Он может модифицировать *message.Message.
    //
    // Этот параметр не является обязательным.
    OnPublish OnEventSendFn

    // Marshaler используется для кодирования и декодирования событий.
    // Это обязательный параметр.
    Marshaler CommandEventMarshaler

    // Экземпляр Logger для ведения журнала. Если не предоставлен, используется watermill.NopLogger.
    Logger watermill.LoggerAdapter
}

func (c *EventBusConfig) setDefaults() {
    if c.Logger == nil {
        c.Logger = watermill.NopLogger{}
    }
}
// ...

Обработчик событий

Полный код: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go

// ...
// EventProcessor используется для определения EventHandler, который должен обрабатывать полученные события от шины событий.
type EventProcessor struct {
// ...

Полный код: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go

// ...
type EventProcessorConfig struct {
	// GenerateSubscribeTopic используется для генерации темы для подписки на события.
	// Если процессор событий использует группы обработчиков, то используется GenerateSubscribeTopic.
	GenerateSubscribeTopic EventProcessorGenerateSubscribeTopicFn

	// SubscriberConstructor используется для создания подписчика для EventHandler.
	//
	// Эта функция вызывается один раз для каждого экземпляра EventHandler.
	// Если вы хотите использовать подписчика повторно для нескольких обработчиков, используйте GroupEventProcessor.
	SubscriberConstructor EventProcessorSubscriberConstructorFn

	// OnHandle вызывается перед обработкой события.
	// OnHandle работает аналогично промежуточному программному обеспечению: вы можете внедрить дополнительную логику перед и после обработки события.
	//
	// Поэтому вам нужно явно вызвать params.Handler.Handle(), чтобы обработать событие.
	//
	//  func(params EventProcessorOnHandleParams) (err error) {
	//      // Логика перед обработкой
	//      //  (...)

	//      err := params.Handler.Handle(params.Message.Context(), params.Event)
	//
	//      // Логика после обработки
	//      //  (...)

	//      return err
	//  }
	//
	// Этот параметр не является обязательным.
	OnHandle EventProcessorOnHandleFn

	// AckOnUnknownEvent используется для определения, должно ли сообщение подтверждаться, когда у события нет определенного обработчика.
	AckOnUnknownEvent bool

	// Marshaler используется для marshal и unmarshal событий.
	// Обязательный.
	Marshaler CommandEventMarshaler

	// Экземпляр Logger для ведения журнала.
	// Если не предоставлен, будет использован watermill.NopLogger.
	Logger watermill.LoggerAdapter

	// disableRouterAutoAddHandlers используется для поддержания обратной совместимости.
	// Это значение будет установлено при создании EventProcessor с использованием NewEventProcessor.
	// Устарело: перейдите к NewEventProcessorWithConfig.
	disableRouterAutoAddHandlers bool
}

func (c *EventProcessorConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

Процессор группы событий

Полный исходный код: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go

// ...
// EventGroupProcessor определяет, какой процессор событий должен обрабатывать события, полученные от шины событий.
// В отличие от EventProcessor, EventGroupProcessor позволяет нескольким процессорам использовать один экземпляр подписчика.
type EventGroupProcessor struct {
// ...

Полный исходный код: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go

// ...
type EventGroupProcessorConfig struct {
    // GenerateSubscribeTopic используется для генерации темы подписки на групповые процессоры событий.
    // Этот параметр необходим для EventProcessor при использовании групп процессоров.
    GenerateSubscribeTopic EventGroupProcessorGenerateSubscribeTopicFn
  
    // SubscriberConstructor используется для создания подписчика для GroupEventHandler.
    // Эта функция вызывается один раз для каждой группы событий, что позволяет создать подписку для каждой группы.
    // Очень полезно, когда требуется обработка событий из потока в определенном порядке.
    SubscriberConstructor EventGroupProcessorSubscriberConstructorFn
  
    // OnHandle вызывается перед обработкой события.
    // OnHandle похож на промежуточное ПО: можно внедрить дополнительную логику до и после обработки события.
    //
    // Следовательно, необходимо явно вызвать params.Handler.Handle(), чтобы обработать событие.
    //
    // func(params EventGroupProcessorOnHandleParams) (err error) {
    //     // Логика перед обработкой
    //     //  (...)
    //
    //     err := params.Handler.Handle(params.Message.Context(), params.Event)
    //
    //     // Логика после обработки
    //     //  (...)
    //
    //     return err
    // }
    //
    // Данный параметр не обязателен.
    OnHandle EventGroupProcessorOnHandleFn
  
    // AckOnUnknownEvent используется для определения необходимости подтверждения в случае отсутствия определенного обработчика событий.
    AckOnUnknownEvent bool
  
    // Marshaler используется для кодирования и декодирования событий.
    // Это обязательный параметр.
    Marshaler CommandEventMarshaler
  
    // Экземпляр регистратора, используемый для ведения журнала.
    // Если не указан, будет использован watermill.NopLogger.
    Logger watermill.LoggerAdapter
}

func (c *EventGroupProcessorConfig) setDefaults() {
    if c.Logger == nil {
        c.Logger = watermill.NopLogger{}
    }
}
// ...

Узнайте больше о процессоре группы событий.

Обработчик событий

Полный исходный код: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go

// ...
// EventHandler получает события, определенные посредством NewEvent, и обрабатывает их с помощью своего метода Handle.
// Если используется DDD, обработчик событий может изменять и сохранять агрегаты.
// Он также может вызывать менеджеры процессов, саги или просто создавать модели чтения.
//
// В отличие от обработчиков команд, у каждого события может быть несколько обработчиков событий.
//
// Во время обработки сообщений используйте один экземпляр EventHandler.
// При передаче нескольких событий одновременно метод Handle может выполняться несколько раз параллельно.
// Следовательно, метод Handle должен быть потокобезопасным!
type EventHandler interface {
// ...

Команда

Команда - это простая структура данных, представляющая запрос на выполнение некоторой операции.

Шина команд

Полный исходный код: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go

// ...
// CommandBus - это компонент, который передает команды обработчикам команд.
type CommandBus struct {
// ...


Полный исходный код: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go

// ...
type CommandBusConfig struct {
	// GeneratePublishTopic используется для генерации темы для публикации команд.
	GeneratePublishTopic CommandBusGeneratePublishTopicFn

	// OnSend вызывается перед публикацией команды.
	// *message.Message можно изменить.
	//
	// Этот параметр не является обязательным.
	OnSend CommandBusOnSendFn

	// Marshaler используется для сериализации и десериализации команд.
	// Требуется.
	Marshaler CommandEventMarshaler

	// Используется логгер для ведения журнала.
	// Если не предоставлен, будет использован watermill.NopLogger.
	Logger watermill.LoggerAdapter
}

func (c *CommandBusConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

Обработчик команд

Полный исходный код: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go

// ...
// CommandProcessorSubscriberConstructorFn используется для создания подписчика для обработчика команд.
// Это позволяет создавать отдельных пользовательских подписчиков для каждого обработчика команд.
type CommandProcessorSubscriberConstructorFn func(CommandProcessorSubscriberConstructorParams) (message.Subscriber, error)
// ...

Полный исходный код: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go

// ...
type CommandProcessorConfig struct {
	// GenerateSubscribeTopic используется для генерации темы для подписки на команды.
	GenerateSubscribeTopic CommandProcessorGenerateSubscribeTopicFn

	// SubscriberConstructor используется для создания подписчика для обработчика команд.
	SubscriberConstructor CommandProcessorSubscriberConstructorFn

	// OnHandle вызывается перед обработкой команды.
	// OnHandle работает как промежуточное ПО: вы можете внедрить дополнительную логику перед и после обработки команды.
	//
	// Из-за этого вам нужно явно вызвать params.Handler.Handle(), чтобы обработать команду.
	//  func(params CommandProcessorOnHandleParams) (err error) {
	//      // логика перед обработкой
	//      // (...)
	//
	//      err := params.Handler.Handle(params.Message.Context(), params.Command)
	//
	//      // логика после обработки
	//      // (...)
	//
	//      return err
	//  }
	//
	// Этот параметр не обязателен.
	OnHandle CommandProcessorOnHandleFn

	// Marshaler используется для сериализации и десериализации команд.
	// Требуется.
	Marshaler CommandEventMarshaler

	// Инстанс логгера для ведения журнала.
	// Если не предоставлен, будет использован watermill.NopLogger.
	Logger watermill.LoggerAdapter

	// Если true, CommandProcessor будет подтверждать сообщения даже если CommandHandler возвращает ошибку.
	// Если RequestReplyBackend не равен null и отправка ответа завершается неудачей, сообщение все равно будет отменено.
	//
	// Предупреждение: не рекомендуется использовать этот параметр при использовании компонента requestreply (requestreply.NewCommandHandler или requestreply.NewCommandHandlerWithResult),
	// так как он может подтверждать команду, когда отправка ответа завершается неудачей.
	//
	// При использовании requestreply, вы должны использовать requestreply.PubSubBackendConfig.AckCommandErrors.
	AckCommandHandlingErrors bool

	// disableRouterAutoAddHandlers используется для обратной совместимости.
	// Он устанавливается при создании CommandProcessor с помощью NewCommandProcessor.
	// Устарело: пожалуйста, перейдите на использование NewCommandProcessorWithConfig.
	disableRouterAutoAddHandlers bool
}

func (c *CommandProcessorConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

Обработчик команд

Полный исходный код: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go

// ...
// CommandHandler принимает команду, определенную через NewCommand, и обрабатывает ее с помощью метода Handle.
// Если используется DDD, CommandHandler может изменять и сохранять агрегаты.
//
// В отличие от EventHandler, каждая команда может иметь только один CommandHandler.
//
// Во время обработки сообщений используйте один экземпляр CommandHandler.
// При доставке нескольких команд одновременно метод Handle может быть выполнен несколько раз параллельно.
// Поэтому метод Handle должен быть потокобезопасным!
type CommandHandler interface {
// ...

Маршалер команд и событий

Полный исходный код: github.com/ThreeDotsLabs/watermill/components/cqrs/marshaler.go

// ...
// CommandEventMarshaler маршализует команды и события в сообщения Watermill и наоборот.
// Полезная нагрузка команды должна быть преобразована в []байты.
type CommandEventMarshaler interface {
	// Marshal маршализует команду или событие в сообщение Watermill.
	Marshal(v interface{}) (*message.Message, error)

	// Unmarshal декодирует сообщение Watermill в команду или событие v.
	Unmarshal(msg *message.Message, v interface{}) (err error)

	// Name возвращает имя команды или события.
	// Имя может использоваться для определения, является ли полученная команда или событие тем, которое мы хотим обработать.
	Name(v interface{}) string

	// NameFromMessage возвращает имя команды или события из сообщения Watermill (сгенерированного Marshal).
	//
	// Когда у нас есть команды или события, маршализованные в сообщения Watermill, следует использовать NameFromMessage вместо Name, чтобы избежать излишней декодировки.
	NameFromMessage(msg *message.Message) string
}
// ...

Использование

Пример области применения

Используя простую область применения, ответственную за обработку бронирования комнат в отеле.

Мы будем использовать символы Event Storming, чтобы продемонстрировать модель этой области.

Легенда символов:

  • Синие клейкие заметки - команды
  • Оранжевые клейкие заметки - события
  • Зеленые клейкие заметки - асинхронно созданные чтения модели из событий
  • Фиолетовые клейкие заметки - политики, запущенные событиями и генерация команд
  • Розовые клейкие заметки - точки нагрева; мы отмечаем области, которые часто сталкиваются с проблемами

CQRS Event Storming

Область проста:

  • Клиенты могут забронировать комнаты.
  • Всякий раз, когда комната забронирована, мы заказываем бутылку пива для клиента (потому что мы любим наших гостей).
    • Мы знаем, что иногда пиво заканчивается.
  • Мы составляем финансовый отчет на основе бронирования.

Отправка команд

Сначала нам нужно имитировать действия клиентов.

Полный исходный код: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf

// ...
		bookRoomCmd := &BookRoom{
			RoomId:    fmt.Sprintf("%d", i),
			GuestName: "John",
			StartDate: startDate,
			EndDate:   endDate,
		}
		if err := commandBus.Send(context.Background(), bookRoomCmd); err != nil {
			panic(err)
		}
// ...

Обработчик команд

BookRoomHandler будет обрабатывать наши команды.

Полный исходный код: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// BookRoomHandler - обработчик команд, обрабатывающий команду BookRoom и генерирующий событие RoomBooked.
//
// В CQRS команду должен обработать обработчик.
// При добавлении другого обработчика для этой команды будет возвращена ошибка.
type BookRoomHandler struct {
	eventBus *cqrs.EventBus
}

func (b BookRoomHandler) HandlerName() string {
	return "BookRoomHandler"
}

// NewCommand возвращает тип команды, которую должен обработать этот обработчик. Она должна быть указателем.
func (b BookRoomHandler) NewCommand() interface{} {
	return &BookRoom{}
}

func (b BookRoomHandler) Handle(ctx context.Context, c interface{}) error {
	// c всегда является типом, возвращаемым `NewCommand`, поэтому утверждение типа всегда безопасно
	cmd := c.(*BookRoom)

	// Некоторая случайная цена, которая может быть рассчитана более разумным способом в реальном производстве
	price := (rand.Int63n(40) + 1) * 10

	log.Printf(
		"Забронировано %s, от %s до %s",
		cmd.RoomId,
		cmd.GuestName,
		time.Unix(cmd.StartDate.Seconds, int64(cmd.StartDate.Nanos)),
		time.Unix(cmd.EndDate.Seconds, int64(cmd.EndDate.Nanos)),
	)

	// RoomBooked будет обработан обработчиком событий OrderBeerOnRoomBooked,
	// и в будущем RoomBooked можно будет обрабатывать несколькими обработчиками событий
	if err := b.eventBus.Publish(ctx, &RoomBooked{
		ReservationId: watermill.NewUUID(),
		RoomId:        cmd.RoomId,
		GuestName:     cmd.GuestName,
		Price:         price,
		StartDate:     cmd.StartDate,
		EndDate:       cmd.EndDate,
	}); err != nil {
		return err
	}

	return nil
}

// OrderBeerOnRoomBooked - обработчик событий, обрабатывающий событие RoomBooked и генерирующий команду OrderBeer.
// ...

Обработчики событий

Как упоминалось ранее, мы хотим заказать бутылку пива каждый раз, когда номер забронирован (отмечено как "Когда номер забронирован"). Мы делаем это с помощью команды OrderBeer.

Полный исходный код: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// OrderBeerOnRoomBooked - обработчик событий, обрабатывающий событие RoomBooked и генерирующий команду OrderBeer.
type OrderBeerOnRoomBooked struct {
	commandBus *cqrs.CommandBus
}

func (o OrderBeerOnRoomBooked) HandlerName() string {
	// Это имя передается в EventsSubscriberConstructor для генерации имени очереди
	return "OrderBeerOnRoomBooked"
}

func (OrderBeerOnRoomBooked) NewEvent() interface{} {
	return &RoomBooked{}
}

func (o OrderBeerOnRoomBooked) Handle(ctx context.Context, e interface{}) error {
	event := e.(*RoomBooked)

	orderBeerCmd := &OrderBeer{
		RoomId: event.RoomId,
		Count:  rand.Int63n(10) + 1,
	}

	return o.commandBus.Send(ctx, orderBeerCmd)
}

// OrderBeerHandler - обработчик команд, обрабатывающий команду OrderBeer и генерирующий событие BeerOrdered.
// ...

OrderBeerHandler очень похож на BookRoomHandler. Единственное отличие заключается в том, что иногда он возвращает ошибку, когда пива недостаточно, что приводит к повторной выдаче команды. Полную реализацию можно найти в исходном коде примера.

Группы обработчиков событий

По умолчанию каждый обработчик событий имеет отдельный экземпляр подписчика. Этот подход работает хорошо, если только один тип события отправляется в тему.

В случае нескольких типов событий в теме существуют два варианта:

  1. Можно установить EventConfig.AckOnUnknownEvent в true - это подтвердит все события, не обработанные обработчиками.
  2. Можно использовать механизм группы обработчиков событий.

Для использования групп событий вам необходимо установить параметры GenerateHandlerGroupSubscribeTopic и GroupSubscriberConstructor в EventConfig.

Затем вы можете использовать AddHandlersGroup на EventProcessor.

Полный исходный код: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
	err = eventProcessor.AddHandlersGroup(
		"events",
		OrderBeerOnRoomBooked{commandBus},

		NewBookingsFinancialReport(),

		cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
			logger.Info("Пиво заказано", watermill.LogFields{
				"room_id": event.RoomId,
			})
			return nil
		}),
	)
	if err != nil {
// ...

Оба GenerateHandlerGroupSubscribeTopic и GroupSubscriberConstructor получают информацию о имени группы в качестве параметров функции.

Обобщенные обработчики

Начиная с Watermill v1.3, можно использовать обобщенные обработчики для обработки команд и событий. Это очень удобно, когда у вас большое количество команд/событий, и вы не хотите создавать обработчик для каждого из них.

Полный исходный код: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
		cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
			logger.Info("Пиво заказано", watermill.LogFields{
				"room_id": event.RoomId,
			})
			return nil
		}),
// ...

Внутри создается реализация обработчика событий или команд. Подходит для всех типов обработчиков.

Полный исходный код: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go

// ...
// NewCommandHandler создает новую реализацию обработчика команд на основе указанной функции и выведенного типа команды из параметров функции.
func NewCommandHandler[Command any](
// ...

Полный исходный код: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go

// ...
// NewEventHandler создает новую реализацию обработчика событий на основе указанной функции и выведенного типа события из параметров функции.
func NewEventHandler[T any](
// ...

Полный исходный код: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go

// ...
// NewGroupEventHandler создает новую реализацию группового обработчика событий на основе указанной функции и выведенного типа события из параметров функции.
func NewGroupEventHandler[T any](handleFunc func(ctx context.Context, event *T) error) GroupEventHandler {
// ...

Построение модели чтения с использованием обработчиков событий

Полный исходный код: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// BookingsFinancialReport - это модель чтения, которая вычисляет, сколько денег мы можем заработать на бронированиях.
// Он слушает события RoomBooked при их возникновении.
//
// Эта реализация просто пишет в память. В рабочей среде вы можете использовать какую-то форму постоянного хранения.
type BookingsFinancialReport struct {
	handledBookings map[string]struct{}
	totalCharge     int64
	lock            sync.Mutex
}

func NewBookingsFinancialReport() *BookingsFinancialReport {
	return &BookingsFinancialReport{handledBookings: map[string]struct{}{}}
}

func (b BookingsFinancialReport) HandlerName() string {
	// Это имя передается EventsSubscriberConstructor и используется для генерации имени очереди
	return "BookingsFinancialReport"
}

func (BookingsFinancialReport) NewEvent() interface{} {
	return &RoomBooked{}
}

func (b *BookingsFinancialReport) Handle(ctx context.Context, e interface{}) error {
	// Handle может быть вызван параллельно, поэтому требуется обеспечить потокобезопасность.
	b.lock.Lock()
	defer b.lock.Unlock()

	event := e.(*RoomBooked)

	// При использовании Pub/Sub, который не обеспечивает доставку семантики ровно один раз, нам нужно устранять дублирование сообщений.
	// Pub/Sub GoChannel обеспечивает доставку ровно один раз,
	// но давайте подготовим этот пример для других реализаций Pub/Sub.
	if _, ok := b.handledBookings[event.ReservationId]; ok {
		return nil
	}
	b.handledBookings[event.ReservationId] = struct{}{}

	b.totalCharge += event.Price

	fmt.Printf(">>> Комната забронирована за $%d\n", b.totalCharge)
	return nil
}

var amqpAddress = "amqp://guest:guest@rabbitmq:5672/"

func main() {
// ...

Подключите все

У нас уже есть все необходимые компоненты для создания приложения CQRS.

Мы будем использовать AMQP (RabbitMQ) в качестве нашего брокера сообщений: AMQP.

Под капотом CQRS использует маршрутизатор сообщений Watermill. Если вы не знакомы с этим и хотите понять, как это работает, вам следует ознакомиться с руководством по началу работы. В нем также будет показано, как использовать некоторые стандартные шаблоны сообщений, такие как метрики, очереди "отравленных" сообщений, ограничение скорости, корреляция и другие инструменты, используемые каждым приложением, работающим на сообщениях. Эти инструменты уже встроены в Watermill.

Вернемся к CQRS. Как вы уже знаете, CQRS состоит из нескольких компонентов, таких как шины команд или событий, процессоры и так далее.

Полный исходный код: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
func main() {
	logger := watermill.NewStdLogger(false, false)
	cqrsMarshaler := cqrs.ProtobufMarshaler{}

	// You can use any Pub/Sub implementation from here: https://watermill.io/pubsubs/
	// Detailed RabbitMQ implementation: https://watermill.io/pubsubs/amqp/
	// Commands will be send to queue, because they need to be consumed once.
	commandsAMQPConfig := amqp.NewDurableQueueConfig(amqpAddress)
	commandsPublisher, err := amqp.NewPublisher(commandsAMQPConfig, logger)
	if err != nil {
		panic(err)
	}
	commandsSubscriber, err := amqp.NewSubscriber(commandsAMQPConfig, logger)
	if err != nil {
		panic(err)
	}

	// Events will be published to PubSub configured Rabbit, because they may be consumed by multiple consumers.
	// (in that case BookingsFinancialReport and OrderBeerOnRoomBooked).
	eventsPublisher, err := amqp.NewPublisher(amqp.NewDurablePubSubConfig(amqpAddress, nil), logger)
	if err != nil {
		panic(err)
	}

	// CQRS is built on messages router. Detailed documentation: https://watermill.io/docs/messages-router/
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// Simple middleware which will recover panics from event or command handlers.
	// More about router middlewares you can find in the documentation:
	// https://watermill.io/docs/messages-router/#middleware
	//
	// List of available middlewares you can find in message/router/middleware.
	router.AddMiddleware(middleware.Recoverer)

	commandBus, err := cqrs.NewCommandBusWithConfig(commandsPublisher, cqrs.CommandBusConfig{
		GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) {
			// we are using queue RabbitMQ config, so we need to have topic per command type
			return params.CommandName, nil
		},
		OnSend: func(params cqrs.CommandBusOnSendParams) error {
			logger.Info("Sending command", watermill.LogFields{
				"command_name": params.CommandName,
			})

			params.Message.Metadata.Set("sent_at", time.Now().String())

			return nil
		},
		Marshaler: cqrsMarshaler,
		Logger:    logger,
	})
	if err != nil {
		panic(err)
	}

	commandProcessor, err := cqrs.NewCommandProcessorWithConfig(
		router,
		cqrs.CommandProcessorConfig{
			GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) {
				// we are using queue RabbitMQ config, so we need to have topic per command type
				return params.CommandName, nil
			},
			SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) {
				// we can reuse subscriber, because all commands have separated topics
				return commandsSubscriber, nil
			},
			OnHandle: func(params cqrs.CommandProcessorOnHandleParams) error {
				start := time.Now()

				err := params.Handler.Handle(params.Message.Context(), params.Command)

				logger.Info("Command handled", watermill.LogFields{
					"command_name": params.CommandName,
					"duration":     time.Since(start),
					"err":          err,
				})

				return err
			},
			Marshaler: cqrsMarshaler,
			Logger:    logger,
		},
	)
	if err != nil {
		panic(err)
	}

	eventBus, err := cqrs.NewEventBusWithConfig(eventsPublisher, cqrs.EventBusConfig{
		GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
			// because we are using PubSub RabbitMQ config, we can use one topic for all events
			return "events", nil

			// we can also use topic per event type
			// return params.EventName, nil
		},

		OnPublish: func(params cqrs.OnEventSendParams) error {
			logger.Info("Publishing event", watermill.LogFields{
				"event_name": params.EventName,
			})

			params.Message.Metadata.Set("published_at", time.Now().String())

			return nil
		},

		Marshaler: cqrsMarshaler,
		Logger:    logger,
	})
	if err != nil {
		panic(err)
	}

	eventProcessor, err := cqrs.NewEventGroupProcessorWithConfig(
		router,
		cqrs.EventGroupProcessorConfig{
			GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) {
				return "events", nil
			},
			SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) {
				config := amqp.NewDurablePubSubConfig(
					amqpAddress,
					amqp.GenerateQueueNameTopicNameWithSuffix(params.EventGroupName),
				)

				return amqp.NewSubscriber(config, logger)
			},

			OnHandle: func(params cqrs.EventGroupProcessorOnHandleParams) error {
				start := time.Now()

				err := params.Handler.Handle(params.Message.Context(), params.Event)

				logger.Info("Event handled", watermill.LogFields{
					"event_name": params.EventName,
					"duration":   time.Since(start),
					"err":        err,
				})

				return err
			},

			Marshaler: cqrsMarshaler,
			Logger:    logger,
		},
	)
	if err != nil {
		panic(err)
	}

	err = commandProcessor.AddHandlers(
		BookRoomHandler{eventBus},
		OrderBeerHandler{eventBus},
	)
	if err != nil {
		panic(err)
	}

	err = eventProcessor.AddHandlersGroup(
		"events",
		OrderBeerOnRoomBooked{commandBus},

		NewBookingsFinancialReport(),

		cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
			logger.Info("Beer ordered", watermill.LogFields{
				"room_id": event.RoomId,
			})
			return nil
		}),
	)
	if err != nil {
		panic(err)
	}

	// publish BookRoom commands every second to simulate incoming traffic
	go publishCommands(commandBus)

	// processors are based on router, so they will work when router will start
	if err := router.Run(context.Background()); err != nil {
		panic(err)
	}
}
// ...

Это все. У нас есть исполняемое приложение CQRS.