CQRS 메커니즘

CQRS는 "Command Query Responsibility Segregation"의 약자입니다. 이는 명령(command)과 조회(query)의 책임을 분리합니다. 쓰기 요청과 읽기 요청은 서로 다른 객체에서 처리됩니다.

이것이 CQRS입니다. 데이터 저장소를 더욱 분리하여 별도의 읽기 및 쓰기 저장소를 갖는 것도 가능합니다. 이렇게 하면 각기 다른 유형의 쿼리를 처리하거나 여러 경계된 컨텍스트를 포함하는 여러 읽기 저장소가 있을 수 있습니다. 별도의 읽기/쓰기 저장소는 종종 CQRS와 관련된 토론의 주제가 되지만, 이것이 CQRS 자체는 아닙니다. CQRS는 명령과 쿼리를 첫 번째로 분리하는 것뿐입니다.

CQRS 아키텍처 다이어그램

cqrs 컴포넌트는 CQRS 패턴을 구현하는 데 도움이 되는 Pub/Sub 및 Router 위에 구축된 일부 유용한 추상화를 제공합니다.

전체 CQRS를 구현할 필요는 없습니다. 일반적으로 컴포넌트의 이벤트 부분만 사용하여 이벤트 기반 애플리케이션을 빌드합니다.

구축 요소

이벤트

이벤트는 이미 발생한 일을 나타냅니다. 이벤트는 변경할 수 없습니다.

이벤트 버스

전체 소스 코드: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go

// ...
// EventBus는 이벤트를 이벤트 핸들러로 전송합니다.
type EventBus struct {
// ...

전체 소스 코드: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go

// ...
type EventBusConfig struct {
    // GeneratePublishTopic은 이벤트 발행을 위한 토픽 이름을 생성하는 데 사용됩니다.
    GeneratePublishTopic GenerateEventPublishTopicFn

    // OnPublish는 이벤트를 전송하기 전에 호출됩니다. *message.Message를 수정할 수 있습니다.
    //
    // 이 옵션은 필수가 아닙니다.
    OnPublish OnEventSendFn

    // Marshaler는 이벤트의 인코딩 및 디코딩에 사용됩니다.
    // 이는 필수 사항입니다.
    Marshaler CommandEventMarshaler

    // 로깅을 위한 Logger 인스턴스입니다. 제공되지 않을 경우 watermill.NopLogger가 사용됩니다.
    Logger watermill.LoggerAdapter
}

func (c *EventBusConfig) setDefaults() {
    if c.Logger == nil {
        c.Logger = watermill.NopLogger{}
    }
}
// ...

이벤트 프로세서

전체 코드: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go

// ...
// EventProcessor는 이벤트 버스로부터 수신한 이벤트를 처리할 EventHandler를 결정하는 데 사용됩니다.
type EventProcessor struct {
// ...

전체 코드: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go

// ...
type EventProcessorConfig struct {
	// GenerateSubscribeTopic은 구독할 이벤트의 토픽을 생성하는 데 사용됩니다.
	// 이벤트 프로세서가 핸들러 그룹을 사용하는 경우 GenerateSubscribeTopic이 사용됩니다.
	GenerateSubscribeTopic EventProcessorGenerateSubscribeTopicFn

	// SubscriberConstructor는 EventHandler를 위해 구독자를 생성하는 데 사용됩니다.
	//
	// 이 함수는 각 EventHandler 인스턴스마다 한 번 호출됩니다.
	// 여러 핸들러에 대해 구독자를 재사용하려면 GroupEventProcessor를 사용하십시오.
	SubscriberConstructor EventProcessorSubscriberConstructorFn

	// OnHandle은 이벤트를 처리하기 전에 호출됩니다.
	// OnHandle은 미들웨어와 유사하게 동작하여 이벤트를 처리하기 전후에 추가 로직을 주입할 수 있습니다.
	//
	// 따라서 params.Handler.Handle()를 명시적으로 호출하여 이벤트를 처리해야 합니다.
	//
	//  func(params EventProcessorOnHandleParams) (err error) {
	//      // 처리 전 로직
	//      //  (...)

	//      err := params.Handler.Handle(params.Message.Context(), params.Event)
	//
	//      // 처리 후 로직
	//      //  (...)

	//      return err
	//  }
	//
	// 이 옵션은 필수가 아닙니다.
	OnHandle EventProcessorOnHandleFn

	// 정의되지 않은 핸들러가 있는 경우 메시지를 인식해야 하는지 여부를 결정하는 데 사용됩니다.
	AckOnUnknownEvent bool

	// Marshaler는 이벤트를 marshal하고 unmarshal하는 데 사용됩니다.
	// 필수 항목입니다.
	Marshaler CommandEventMarshaler

	// 로깅을 위한 로거 인스턴스.
	// 제공되지 않는 경우 watermill.NopLogger가 사용됩니다.
	Logger watermill.LoggerAdapter

	// disableRouterAutoAddHandlers는 역호환성 유지를 위한 것입니다.
	// NewEventProcessor를 사용하여 EventProcessor를 작성할 때 이 값이 설정됩니다.
	// Deprecated: NewEventProcessorWithConfig로 마이그레이션하십시오.
	disableRouterAutoAddHandlers bool
}

func (c *EventProcessorConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

이벤트 그룹 프로세서

전체 소스 코드: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go

// ...
// EventGroupProcessor는 이벤트 버스로부터 수신된 이벤트를 처리할 이벤트 프로세서를 결정합니다.
// EventProcessor와 비교하여 EventGroupProcessor는 여러 프로세서가 동일한 구독자 인스턴스를 공유할 수 있습니다.
type EventGroupProcessor struct {
// ...

전체 소스 코드: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go

// ...
type EventGroupProcessorConfig struct {
	// GenerateSubscribeTopic은 그룹 이벤트 프로세서를 구독하기 위한 주제를 생성하는 데 사용됩니다.
	// 이 옵션은 프로세서 그룹을 사용할 때 EventProcessor에 필요합니다.
	GenerateSubscribeTopic EventGroupProcessorGenerateSubscribeTopicFn

	// SubscriberConstructor은 GroupEventHandler를 생성하는 데 사용됩니다.
	// 이 함수는 이벤트 그룹 당 한 번 호출됩니다. 따라서 각 그룹에 대해 구독을 만들 수 있습니다.
	// 순서대로 스트림에서 이벤트를 처리하려는 경우 매우 유용합니다.
	SubscriberConstructor EventGroupProcessorSubscriberConstructorFn

	// OnHandle은 이벤트를 처리하기 전에 호출됩니다.
	// OnHandle은 미들웨어와 유사합니다. 이벤트를 처리하기 전후에 추가적인 로직을 삽입할 수 있습니다.
	//
	// 따라서 params.Handler.Handle()를 명시적으로 호출하여 이벤트를 처리해야 합니다.
	//
	// func(params EventGroupProcessorOnHandleParams) (err error) {
	//     // 처리 전 로직
	//     //  (...)
	//
	//     err := params.Handler.Handle(params.Message.Context(), params.Event)
	//
	//     // 처리 후 로직
	//     //  (...)
	//
	//     return err
	// }
	//
	// 이 옵션은 필수가 아닙니다.
	OnHandle EventGroupProcessorOnHandleFn

	// 정의되지 않은 핸들러가 있는 경우 이벤트를 확인할지 여부를 결정하는 AckOnUnknownEvent입니다.
	AckOnUnknownEvent bool

	// Marshaler는 이벤트를 인코딩하고 디코딩하는 데 사용됩니다.
	// 필수입니다.
	Marshaler CommandEventMarshaler

	// 로깅에 사용되는 Logger 인스턴스입니다.
	// 제공되지 않으면 watermill.NopLogger가 사용됩니다.
	Logger watermill.LoggerAdapter
}

func (c *EventGroupProcessorConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

이벤트 그룹 프로세서에 대해 더 알아보기.

이벤트 핸들러

전체 소스 코드: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go

// ...
// EventHandler는 NewEvent에 정의된 이벤트를 수신하고 Handle 메서드를 사용하여 처리합니다.
// DDD를 사용하는 경우 이벤트 핸들러는 집계를 수정하고 저장할 수 있습니다.
// 프로세스 매니저, 사가를 호출하거나 단순히 읽기 모델을 빌드할 수도 있습니다.
//
// 명령 핸들러와 달리 각 이벤트에는 여러 이벤트 핸들러가 있을 수 있습니다.
//
// 메시지 처리 중에는 하나의 EventHandler 인스턴스를 사용합니다.
// 동시에 여러 이벤트를 전달할 때 Handle 메서드는 여러 번 동시에 실행될 수 있습니다.
// 따라서 Handle 메서드는 스레드 안전해야 합니다!
type EventHandler interface {
// ...

명령

명령은 어떤 작업을 수행하는 요청을 나타내는 간단한 데이터 구조입니다.

Command Bus

Complete source code: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go

// ...
// CommandBus는 명령을 명령 처리기에 전송하는 구성 요소입니다.
type CommandBus struct {
// ...
}

Complete source code: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go

// ...
type CommandBusConfig struct {
	// GeneratePublishTopic은 명령을 게시하기 위한 토픽을 생성하는 데 사용됩니다.
	GeneratePublishTopic CommandBusGeneratePublishTopicFn

	// OnSend는 명령을 게시하기 전에 호출됩니다.
	// *message.Message를 수정할 수 있습니다.
	//
	// 이 옵션은 필수가 아닙니다.
	OnSend CommandBusOnSendFn

	// Marshaler는 명령의 직렬화와 역직렬화에 사용됩니다.
	// 필수 항목입니다.
	Marshaler CommandEventMarshaler

	// 로깅에 사용되는 Logger 인스턴스입니다.
	// 제공되지 않으면 watermill.NopLogger가 사용됩니다.
	Logger watermill.LoggerAdapter
}

func (c *CommandBusConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

Command Processor

Complete source code: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go

// ...
// CommandProcessorSubscriberConstructorFn은 CommandHandler에 대한 구독자를 생성하는 데 사용됩니다.
// 각 명령 처리기에 별도의 사용자 정의 구독자를 생성할 수 있습니다.
type CommandProcessorSubscriberConstructorFn func(CommandProcessorSubscriberConstructorParams) (message.Subscriber, error)
// ...

Complete source code: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go

// ...
type CommandProcessorConfig struct {
	// GenerateSubscribeTopic은 명령을 구독하기 위한 토픽을 생성하는 데 사용됩니다.
	GenerateSubscribeTopic CommandProcessorGenerateSubscribeTopicFn

	// SubscriberConstructor은 CommandHandler에 대한 구독자를 생성하는 데 사용됩니다.
	SubscriberConstructor CommandProcessorSubscriberConstructorFn

	// OnHandle는 명령을 처리하기 전에 호출됩니다.
	// OnHandle는 미들웨어처럼 작동하여 명령을 처리하기 전후에 추가 로직을 삽입할 수 있습니다.
	//
	// 이러한 이유로 params.Handler.Handle()를 명령 처리에 명시적으로 호출해야 합니다.
	//  func(params CommandProcessorOnHandleParams) (err error) {
	//      // 처리 전 로직
	//      // (...)
	//
	//      err := params.Handler.Handle(params.Message.Context(), params.Command)
	//
	//      // 처리 후 로직
	//      // (...)
	//
	//      return err
	//  }
	//
	// 이 옵션은 필수가 아닙니다.
	OnHandle CommandProcessorOnHandleFn

	// Marshaler는 명령의 직렬화 및 역직렬화에 사용됩니다.
	// 필수 항목입니다.
	Marshaler CommandEventMarshaler

	// 로깅에 사용되는 Logger 인스턴스입니다.
	// 제공되지 않으면 watermill.NopLogger가 사용됩니다.
	Logger watermill.LoggerAdapter

	// true이면 CommandProcessor는 CommandHandler가 오류를 반환해도 메시지를 확인합니다.
	// RequestReplyBackend가 null이 아니고 응답을 보내는 데 실패한 경우 메시지가 여전히 nacked됩니다.
	//
	// 경고: requestreply 컴포넌트(requestreply.NewCommandHandler 또는 requestreply.NewCommandHandlerWithResult)를 사용할 때 이 옵션을 사용하는 것은 권장되지 않습니다.
	// 왜냐하면 응답을 보내는 동안 명령을 확인할 수 있기 때문입니다.
	//
	// requestreply를 사용할 때는 requestreply.PubSubBackendConfig.AckCommandErrors를 사용해야 합니다.
	AckCommandHandlingErrors bool

	// disableRouterAutoAddHandlers는 역 호환성을 위해 사용됩니다.
	// NewCommandProcessor로 CommandProcessor를 생성할 때 설정됩니다.
	// Deprecated: NewCommandProcessorWithConfig로 마이그레이션하세요.
	disableRouterAutoAddHandlers bool
}

func (c *CommandProcessorConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

명령 처리기

전체 소스 코드: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go

// ...
// CommandHandler는 NewCommand로 정의된 명령을 받아 Handle 메서드를 사용하여 처리합니다.
// DDD를 사용하는 경우 CommandHandler는 집계를 수정하고 유지할 수 있습니다.
//
// EventHandler와 달리 각 명령은 하나의 CommandHandler만 가질 수 있습니다.
//
// 메시지 처리 중에는 CommandHandler의 인스턴스를 하나 사용합니다.
// 여러 명령이 동시에 전달되는 경우 Handle 메서드는 여러 번 동시에 실행될 수 있습니다.
// 따라서 Handle 메서드는 스레드 안전해야 합니다!
type CommandHandler interface {
// ...

명령 및 이벤트 마샬러

전체 소스 코드: github.com/ThreeDotsLabs/watermill/components/cqrs/marshaler.go

// ...
// CommandEventMarshaler는 명령과 이벤트를 Watermill 메시지로 마샬링하고 그 반대로 진행합니다.
// 명령의 페이로드는 []바이트로 마샬링되어야 합니다.
type CommandEventMarshaler interface {
	// Marshal은 명령 또는 이벤트를 Watermill 메시지로 마샬링합니다.
	Marshal(v interface{}) (*message.Message, error)

	// Unmarshal은 Watermill 메시지를 v 명령이나 이벤트로 디코딩합니다.
	Unmarshal(msg *message.Message, v interface{}) (err error)

	// Name은 명령 또는 이벤트의 이름을 반환합니다.
	// 이름은 수신된 명령 또는 이벤트를 처리해야 하는지 여부를 판별하는 데 사용할 수 있습니다.
	Name(v interface{}) string

	// NameFromMessage는 Watermill 메시지에서 명령 또는 이벤트의 이름을 반환합니다 (Marshal에서 생성됨).
	//
	// 명령 또는 이벤트를 Watermill 메시지로 마샬링할 때 Name 대신 NameFromMessage를 사용하여 불필요한 디코딩을 피해야 합니다.
	NameFromMessage(msg *message.Message) string
}
// ...

사용법

예제 도메인

호텔에서 객실 예약을 처리하는 간단한 도메인을 사용합니다.

우리는 이벤트 스토밍 기호를 사용하여 이 도메인의 모델을 보여줍니다.

기호 설명:

  • 파란색 메모는 명령을 나타냅니다.
  • 주황색 메모는 이벤트를 나타냅니다.
  • 초록색 메모는 이벤트에서 비동기적으로 생성된 읽기 모델을 나타냅니다.
  • 보라색 메모는 이벤트에서 트리거되는 정책이자 명령을 생성합니다.
  • 분홍색 메모는 핫스팟입니다. 자주 문제가 발생하는 영역을 표시합니다.

CQRS 이벤트 스토밍

이 도메인은 다음과 같습니다:

  • 고객은 객실을 예약할 수 있습니다.
  • 객실이 예약될 때마다 고객을 위해 맥주를 주문합니다 (우리는 손님을 사랑하니까).
    • 가끔 맥주가 다 떨어지는 경우가 있습니다.
  • 예약을 기반으로 재무 보고서를 생성합니다.

명령 전송

먼저 고객 작업을 시뮬레이션해야 합니다.

전체 소스 코드: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf

// ...
		bookRoomCmd := &BookRoom{
			RoomId:    fmt.Sprintf("%d", i),
			GuestName: "John",
			StartDate: startDate,
			EndDate:   endDate,
		}
		if err := commandBus.Send(context.Background(), bookRoomCmd); err != nil {
			panic(err)
		}
// ...

명령 처리기

BookRoomHandler는 우리의 명령을 처리할 것입니다.

전체 소스 코드: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// BookRoomHandler는 BookRoom 명령을 처리하고 RoomBooked 이벤트를 발생시키는 명령 핸들러입니다.
//
// CQRS에서 명령은 핸들러에 의해 처리되어야 합니다.
// 이 명령을 처리하기 위해 다른 핸들러를 추가할 경우 오류가 반환됩니다.
type BookRoomHandler struct {
	eventBus *cqrs.EventBus
}

func (b BookRoomHandler) HandlerName() string {
	return "BookRoomHandler"
}

// NewCommand는 이 핸들러가 처리해야 하는 명령의 유형을 반환합니다. 반드시 포인터여야 합니다.
func (b BookRoomHandler) NewCommand() interface{} {
	return &BookRoom{}
}

func (b BookRoomHandler) Handle(ctx context.Context, c interface{}) error {
	// c는 항상 `NewCommand`에서 반환된 유형이므로 유형 단언은 항상 안전합니다
	cmd := c.(*BookRoom)

	// 실제 제작에서 더 현명한 방식으로 계산될 수 있는 일부 무작위 가격
	price := (rand.Int63n(40) + 1) * 10

	log.Printf(
		"%s에서 %s까지 예약됨",
		cmd.RoomId,
		cmd.GuestName,
		time.Unix(cmd.StartDate.Seconds, int64(cmd.StartDate.Nanos)),
		time.Unix(cmd.EndDate.Seconds, int64(cmd.EndDate.Nanos)),
	)

	// RoomBooked는 OrderBeerOnRoomBooked 이벤트 핸들러에 의해 처리되며,
	// 현재 RoomBooked는 미래에 여러 이벤트 핸들러에 의해 처리될 수 있습니다.
	if err := b.eventBus.Publish(ctx, &RoomBooked{
		ReservationId: watermill.NewUUID(),
		RoomId:        cmd.RoomId,
		GuestName:     cmd.GuestName,
		Price:         price,
		StartDate:     cmd.StartDate,
		EndDate:       cmd.EndDate,
	}); err != nil {
		return err
	}

	return nil
}

// RoomBooked에 대해 RoomBooked 이벤트 핸들러를 처리하고 OrderBeer 명령을 발생시키는 이벤트 핸들러입니다.
// ...

이벤트 핸들러

이전에 언급한 바와 같이 각 방이 예약될 때마다 맥주 한 병을 주문하려고 합니다("방이 예약될 때" 라벨이 붙어 있음). 이를 위해 OrderBeer 명령을 사용합니다.

전체 소스 코드: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// RoomBooked 이벤트를 처리하고 OrderBeer 명령을 발생시키는 이벤트 핸들러입니다.
type OrderBeerOnRoomBooked struct {
	commandBus *cqrs.CommandBus
}

func (o OrderBeerOnRoomBooked) HandlerName() string {
	// 이 이름은 EventsSubscriberConstructor에 전달되어 대기열 이름을 생성합니다
	return "OrderBeerOnRoomBooked"
}

func (OrderBeerOnRoomBooked) NewEvent() interface{} {
	return &RoomBooked{}
}

func (o OrderBeerOnRoomBooked) Handle(ctx context.Context, e interface{}) error {
	event := e.(*RoomBooked)

	orderBeerCmd := &OrderBeer{
		RoomId: event.RoomId,
		Count:  rand.Int63n(10) + 1,
	}

	return o.commandBus.Send(ctx, orderBeerCmd)
}

// OrderBeerHandler는 BookRoomHandler와 매우 유사합니다. 유일한 차이점은 충분한 맥주가 없을 때 때로는 오류를 반환하여 명령을 다시 발행하는 것입니다. 전체 구현은 [예제 소스 코드](https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/basic/5-cqrs-protobuf/?utm_source=cqrs_doc)에서 찾을 수 있습니다.

이벤트 핸들러 그룹

기본적으로 각 이벤트 핸들러는 별도의 구독자 인스턴스를 갖습니다. 이 방식은 주제로 하나의 이벤트 유형만 보내는 경우에는 잘 작동합니다.

주제에 여러 이벤트 유형이 있는 경우, 두 가지 옵션이 있습니다:

  1. EventConfig.AckOnUnknownEventtrue로 설정할 수 있습니다 - 이렇게 하면 핸들러에서 처리되지 않는 모든 이벤트를 확인합니다.
  2. 이벤트 핸들러 그룹 메커니즘을 사용할 수 있습니다.

이벤트 그룹을 사용하려면 EventConfig에서 GenerateHandlerGroupSubscribeTopicGroupSubscriberConstructor 옵션을 설정해야 합니다.

그런 다음 EventProcessor에서 AddHandlersGroup을 사용할 수 있습니다.

전체 소스 코드: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
	err = eventProcessor.AddHandlersGroup(
		"events",
		OrderBeerOnRoomBooked{commandBus},

		NewBookingsFinancialReport(),

		cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
			logger.Info("맥주 주문됨", watermill.LogFields{
				"room_id": event.RoomId,
			})
			return nil
		}),
	)
	if err != nil {
// ...

GenerateHandlerGroupSubscribeTopicGroupSubscriberConstructor은 함수 매개변수로 그룹 이름에 대한 정보를 받습니다.

제네릭 핸들러

Watermill v1.3부터 제네릭 핸들러를 사용하여 명령 및 이벤트를 처리할 수 있습니다. 이것은 많은 수의 명령/이벤트가 있는 경우에 매우 유용합니다.

전체 소스 코드: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
		cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
			logger.Info("맥주 주문됨", watermill.LogFields{
				"room_id": event.RoomId,
			})
			return nil
		}),
// ...

내부적으로 이는 EventHandler 또는 CommandHandler 구현을 생성합니다. 모든 유형의 핸들러에 적합합니다.

전체 소스 코드: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go

// ...
// NewCommandHandler는 제공된 함수와 함수 매개변수에서 유추된 명령 유형을 기반으로 새로운 CommandHandler 구현을 생성합니다.
func NewCommandHandler[Command any](
// ...

전체 소스 코드: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go

// ...
// NewEventHandler는 제공된 함수와 함수 매개변수에서 유추된 이벤트 유형을 기반으로 새로운 EventHandler 구현을 생성합니다.
func NewEventHandler[T any](
// ...

전체 소스 코드: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go

// ...
// NewGroupEventHandler는 제공된 함수와 함수 매개변수에서 유추된 이벤트 유형을 기반으로 새로운 GroupEventHandler 구현을 생성합니다.
func NewGroupEventHandler[T any](handleFunc func(ctx context.Context, event *T) error) GroupEventHandler {
// ...

이벤트 핸들러를 사용하여 읽기 모델 구축

전체 소스 코드: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// BookingsFinancialReport는 예약에서 얼마의 수익을 올릴 수 있는지 계산하는 읽기 모델이다.
// RoomBooked 이벤트가 발생할 때 대기합니다.
//
// 이 구현은 단순히 메모리에 작성하는 것입니다. 실제 환경에서는 어떤 형태의 영속적인 저장소를 사용할 수 있습니다.
type BookingsFinancialReport struct {
	handledBookings map[string]struct{}
	totalCharge     int64
	lock            sync.Mutex
}

func NewBookingsFinancialReport() *BookingsFinancialReport {
	return &BookingsFinancialReport{handledBookings: map[string]struct{}{}}
}

func (b BookingsFinancialReport) HandlerName() string {
	// 이 이름은 EventsSubscriberConstructor로 전달되어 큐 이름을 생성하는 데 사용됩니다.
	return "BookingsFinancialReport"
}

func (BookingsFinancialReport) NewEvent() interface{} {
	return &RoomBooked{}
}

func (b *BookingsFinancialReport) Handle(ctx context.Context, e interface{}) error {
	// 동시에 호출될 수 있으므로 스레드 안전이 필요합니다.
	b.lock.Lock()
	defer b.lock.Unlock()

	event := e.(*RoomBooked)

	// 정확히 한 번의 전달 의미론을 제공하지 않는 Pub/Sub를 사용할 때, 메시지를 중복 처리하지 않아야 합니다.
	// GoChannel Pub/Sub는 정확히 한 번의 전달을 제공하지만, 다른 Pub/Sub 구현을 위해 이 예제를 준비합니다.
	if _, ok := b.handledBookings[event.ReservationId]; ok {
		return nil
	}
	b.handledBookings[event.ReservationId] = struct{}{}

	b.totalCharge += event.Price

	fmt.Printf(">>> $%d에 방 예약됨\n", b.totalCharge)
	return nil
}

var amqpAddress = "amqp://guest:guest@rabbitmq:5672/"

func main() {
// ...

모든 것을 연결하기

우리는 이미 CQRS 애플리케이션을 구축하는 데 필요한 모든 구성 요소를 갖추고 있습니다.

우리는 메시지 브로커로 AMQP (RabbitMQ)를 사용할 것입니다.

CQRS는 Watermill의 메시지 라우터를 사용합니다. 이것에 익숙하지 않고 작동 방식을 이해하고 싶다면 시작 가이드를 확인해야 합니다. 시작 가이드는 또한 메트릭, 중독성 메시지 대기열, 속도 제한, 상관 관계 등과 같은 몇 가지 표준 메시징 패턴을 사용하는 방법을 보여줍니다. 이러한 도구들은 이미 Watermill에 내장되어 있습니다.

이제 CQRS로 돌아가봅시다. 이미 알고 계시다시피, CQRS는 명령이나 이벤트 버스, 프로세서 등과 같은 여러 구성 요소로 구성되어 있습니다.

전체 소스 코드: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
func main() {
	logger := watermill.NewStdLogger(false, false)
	cqrsMarshaler := cqrs.ProtobufMarshaler{}

	// You can use any Pub/Sub implementation from here: https://watermill.io/pubsubs/
	// Detailed RabbitMQ implementation: https://watermill.io/pubsubs/amqp/
	// Commands will be send to queue, because they need to be consumed once.
	commandsAMQPConfig := amqp.NewDurableQueueConfig(amqpAddress)
	commandsPublisher, err := amqp.NewPublisher(commandsAMQPConfig, logger)
	if err != nil {
		panic(err)
	}
	commandsSubscriber, err := amqp.NewSubscriber(commandsAMQPConfig, logger)
	if err != nil {
		panic(err)
	}

	// Events will be published to PubSub configured Rabbit, because they may be consumed by multiple consumers.
	// (in that case BookingsFinancialReport and OrderBeerOnRoomBooked).
	eventsPublisher, err := amqp.NewPublisher(amqp.NewDurablePubSubConfig(amqpAddress, nil), logger)
	if err != nil {
		panic(err)
	}

	// CQRS is built on messages router. Detailed documentation: https://watermill.io/docs/messages-router/
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// Simple middleware which will recover panics from event or command handlers.
	// More about router middlewares you can find in the documentation:
	// https://watermill.io/docs/messages-router/#middleware
	//
	// List of available middlewares you can find in message/router/middleware.
	router.AddMiddleware(middleware.Recoverer)

	commandBus, err := cqrs.NewCommandBusWithConfig(commandsPublisher, cqrs.CommandBusConfig{
		GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) {
			// we are using queue RabbitMQ config, so we need to have topic per command type
			return params.CommandName, nil
		},
		OnSend: func(params cqrs.CommandBusOnSendParams) error {
			logger.Info("Sending command", watermill.LogFields{
				"command_name": params.CommandName,
			})

			params.Message.Metadata.Set("sent_at", time.Now().String())

			return nil
		},
		Marshaler: cqrsMarshaler,
		Logger:    logger,
	})
	if err != nil {
		panic(err)
	}

	commandProcessor, err := cqrs.NewCommandProcessorWithConfig(
		router,
		cqrs.CommandProcessorConfig{
			GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) {
				// we are using queue RabbitMQ config, so we need to have topic per command type
				return params.CommandName, nil
			},
			SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) {
				// we can reuse subscriber, because all commands have separated topics
				return commandsSubscriber, nil
			},
			OnHandle: func(params cqrs.CommandProcessorOnHandleParams) error {
				start := time.Now()

				err := params.Handler.Handle(params.Message.Context(), params.Command)

				logger.Info("Command handled", watermill.LogFields{
					"command_name": params.CommandName,
					"duration":     time.Since(start),
					"err":          err,
				})

				return err
			},
			Marshaler: cqrsMarshaler,
			Logger:    logger,
		},
	)
	if err != nil {
		panic(err)
	}

	eventBus, err := cqrs.NewEventBusWithConfig(eventsPublisher, cqrs.EventBusConfig{
		GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
			// because we are using PubSub RabbitMQ config, we can use one topic for all events
			return "events", nil

			// we can also use topic per event type
			// return params.EventName, nil
		},

		OnPublish: func(params cqrs.OnEventSendParams) error {
			logger.Info("Publishing event", watermill.LogFields{
				"event_name": params.EventName,
			})

			params.Message.Metadata.Set("published_at", time.Now().String())

			return nil
		},

		Marshaler: cqrsMarshaler,
		Logger:    logger,
	})
	if err != nil {
		panic(err)
	}

	eventProcessor, err := cqrs.NewEventGroupProcessorWithConfig(
		router,
		cqrs.EventGroupProcessorConfig{
			GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) {
				return "events", nil
			},
			SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) {
				config := amqp.NewDurablePubSubConfig(
					amqpAddress,
					amqp.GenerateQueueNameTopicNameWithSuffix(params.EventGroupName),
				)

				return amqp.NewSubscriber(config, logger)
			},

			OnHandle: func(params cqrs.EventGroupProcessorOnHandleParams) error {
				start := time.Now()

				err := params.Handler.Handle(params.Message.Context(), params.Event)

				logger.Info("Event handled", watermill.LogFields{
					"event_name": params.EventName,
					"duration":   time.Since(start),
					"err":        err,
				})

				return err
			},

			Marshaler: cqrsMarshaler,
			Logger:    logger,
		},
	)
	if err != nil {
		panic(err)
	}

	err = commandProcessor.AddHandlers(
		BookRoomHandler{eventBus},
		OrderBeerHandler{eventBus},
	)
	if err != nil {
		panic(err)
	}

	err = eventProcessor.AddHandlersGroup(
		"events",
		OrderBeerOnRoomBooked{commandBus},

		NewBookingsFinancialReport(),

		cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
			logger.Info("Beer ordered", watermill.LogFields{
				"room_id": event.RoomId,
			})
			return nil
		}),
	)
	if err != nil {
		panic(err)
	}

	// publish BookRoom commands every second to simulate incoming traffic
	go publishCommands(commandBus)

	// processors are based on router, so they will work when router will start
	if err := router.Run(context.Background()); err != nil {
		panic(err)
	}
}
// ...

그게 다입니다. 실행 가능한 CQRS 애플리케이션이 준비되었습니다.