CQRSメカニズム

CQRSは"Command Query Responsibility Segregation"の略で、コマンド(書き込みリクエスト)とクエリ(読み取りリクエスト)の責任を分離します。書き込みリクエストと読み取りリクエストは異なるオブジェクトによって処理されます。

これがCQRSです。さらに、データストレージを分離して、読み取り専用と書き込み専用のストレージにすることができます。これにより、さまざまな種類のクエリを処理するための最適化された多くの読み取り専用ストレージや、多くの境界コンテキストをまたぐクエリを扱うためのストレージができます。読み取り/書き込み用の分離ストレージは、CQRSに関連する議論の対象になることが多いですが、それ自体がCQRSではありません。CQRSは、コマンドとクエリの最初の分離だけです。

CQRSアーキテクチャダイアグラム

cqrsコンポーネントは、Pub/SubとRouterの上に構築されたいくつかの便利な抽象化を提供し、CQRSパターンを実装するのに役立ちます。

CQRSをすべて実装する必要はありません。通常、コンポーネントのイベント部分のみを使用して、イベント駆動アプリケーションを構築します。

構築ブロック

イベント

イベントはすでに起こったことを表します。イベントは不変です。

イベントバス

完全なソースコード: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go

// ...
// EventBusはイベントをイベントハンドラに転送します。
type EventBus struct {
// ...

完全なソースコード: github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go

// ...
type EventBusConfig struct {
    // GeneratePublishTopicは、イベントの発行トピック名を生成するために使用されます。
    GeneratePublishTopic GenerateEventPublishTopicFn

    // 送信前に呼び出されるOnPublish。*message.Messageを変更できます。
    //
    // このオプションは必須ではありません。
    OnPublish OnEventSendFn

    // Marshalerはイベントのエンコードとデコードに使用されます。
    // これは必須です。
    Marshaler CommandEventMarshaler

    // ロガーはロギングに使用されるインスタンスです。指定しない場合、watermill.NopLoggerが使用されます。
    Logger watermill.LoggerAdapter
}

func (c *EventBusConfig) setDefaults() {
    if c.Logger == nil {
        c.Logger = watermill.NopLogger{}
    }
}
// ...

イベントプロセッサ

完全なコード:github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go

// ...
// EventProcessorは、イベントバスから受信したイベントを処理するのに使用される、イベントハンドラを決定するためのものです。
type EventProcessor struct {
// ...

完全なコード:github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go

// ...
type EventProcessorConfig struct {
	// GenerateSubscribeTopicは、イベントの購読トピックを生成するために使用されます。
	// イベントプロセッサがハンドラグループを使用する場合、GenerateSubscribeTopicが使用されます。
	GenerateSubscribeTopic EventProcessorGenerateSubscribeTopicFn

	// SubscriberConstructorは、イベントハンドラのためのサブスクライバを作成するために使用されます。
	//
	// この関数は、各イベントハンドラインスタンスごとに一度呼び出されます。
	// 複数のハンドラでサブスクライバを再利用したい場合は、GroupEventProcessorを使用してください。
	SubscriberConstructor EventProcessorSubscriberConstructorFn

	// OnHandleは、イベントの処理の前に呼び出されます。
	// OnHandleはミドルウェアと同様に機能し、イベントの処理前後に追加のロジックを注入することができます。
	//
	// したがって、params.Handler.Handle()を明示的に呼び出す必要があります。
	//
	//  func(params EventProcessorOnHandleParams) (err error) {
	//      // 処理前のロジック
	//      //  (...)

	//      err := params.Handler.Handle(params.Message.Context(), params.Event)
	//
	//      // 処理後のロジック
	//      //  (...)

	//      return err
	//  }
	//
	// このオプションは必須ではありません。
	OnHandle EventProcessorOnHandleFn

	// AckOnUnknownEventは、イベントに定義されたハンドラがない場合にメッセージをアクノリッジするかどうかを決定するために使用されます。
	AckOnUnknownEvent bool

	// Marshalerは、イベントのマーシャリングとアンマーシャリングに使用されます。
	// 必須です。
	Marshaler CommandEventMarshaler

	// ロガーのインスタンスを提供します。
	// 提供されていない場合、watermill.NopLoggerが使用されます。
	Logger watermill.LoggerAdapter

	// disableRouterAutoAddHandlersは、後方互換性を維持するためのものです。
	// この値は、NewEventProcessorを使用してEventProcessorを作成する際に設定されます。
	// 廃止予定: NewEventProcessorWithConfigに移行してください。
	disableRouterAutoAddHandlers bool
}

func (c *EventProcessorConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

イベントグループプロセッサ

完全なソースコード: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go

// ...
// EventGroupProcessorは、イベントバスから受信したイベントを処理するためのイベントプロセッサを決定します。
// EventProcessorと比較して、EventGroupProcessorは複数のプロセッサが同じサブスクライバーインスタンスを共有できるようにします。
type EventGroupProcessor struct {
// ...

完全なソースコード: github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor_group.go

// ...
type EventGroupProcessorConfig struct {
	// GenerateSubscribeTopicは、グループイベントプロセッサに購読するためのトピックを生成するために使用されます。
	// このオプションは、プロセッサグループを使用する場合にEventProcessorで必要です。
	GenerateSubscribeTopic EventGroupProcessorGenerateSubscribeTopicFn

	// SubscriberConstructorはGroupEventHandlerのサブスクライバーを作成するために使用されます。
	// この関数は、一つのイベントグループごとに一度呼び出され、各グループに対してサブスクリプションを作成することができます。
	// イベントを順番に処理したい場合に非常に便利です。
	SubscriberConstructor EventGroupProcessorSubscriberConstructorFn

	// OnHandleはイベントを処理する前に呼び出されます。
	// OnHandleはミドルウェアに似ており、イベントを処理する前と後に追加のロジックを注入できます。
	//
	// したがって、params.Handler.Handle()を明示的に呼び出す必要があります。
	//
	// func(params EventGroupProcessorOnHandleParams) (err error) {
	//     // 処理前のロジック
	//     //  (...)
	//
	//     err := params.Handler.Handle(params.Message.Context(), params.Event)
	//
	//     // 処理後のロジック
	//     //  (...)
	//
	//     return err
	// }
	//
	// このオプションは必須ではありません。
	OnHandle EventGroupProcessorOnHandleFn

	// AckOnUnknownEventは、定義されたハンドラを持たないイベントに対してアクノリッジメントするかどうかを決定するために使用されます。
	AckOnUnknownEvent bool

	// Marshalerはイベントのエンコーディングおよびデコーディングに使用されます。
	// これは必須です。
	Marshaler CommandEventMarshaler

	// ロガーインスタンスはログ出力に使用されます。
	// 提供されていない場合、watermill.NopLoggerが使用されます。
	Logger watermill.LoggerAdapter
}

func (c *EventGroupProcessorConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

イベントグループプロセッサについてもっと詳しく学ぶ。

イベントハンドラ

完全なソースコード: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go

// ...
// EventHandlerはNewEventで定義されたイベントを受け取り、そのHandleメソッドを使用して処理します。
// DDDを使用している場合、イベントハンドラは集約を変更および永続化することができます。
// プロセスマネージャやサーガを呼び出すこともでき、単にリードモデルを構築することもできます。
//
// コマンドハンドラとは異なり、各イベントに複数のイベントハンドラを持つことができます。
//
// メッセージの処理中に、1つのEventHandlerインスタンスを使用します。
// 複数のイベントを同時に渡すと、Handleメソッドが複数回同時に実行される可能性があります。
// したがって、Handleメソッドはスレッドセーフである必要があります!
type EventHandler interface {
// ...

コマンド

コマンドは、ある操作を実行するためのリクエストを表すシンプルなデータ構造です。

コマンドバス

完全なソースコード: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go

// ...
// CommandBusはコマンドをコマンドハンドラーに転送するコンポーネントです。
type CommandBus struct {
// ...

完全なソースコード: github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go

// ...
type CommandBusConfig struct {
	// GeneratePublishTopicはコマンドを公開するためのトピックを生成するために使用されます。
	GeneratePublishTopic CommandBusGeneratePublishTopicFn

	// OnSendはコマンドを公開する前に呼び出されます。
	// *message.Messageを修正することができます。
	//
	// このオプションは必須ではありません。
	OnSend CommandBusOnSendFn

	// Marshalerはコマンドの直列化および逆直列化に使用されます。
	// 必須です。
	Marshaler CommandEventMarshaler

	// ロガーインスタンスはログ出力に使用されます。
	// 提供されない場合、watermill.NopLoggerが使用されます。
	Logger watermill.LoggerAdapter
}

func (c *CommandBusConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

コマンドプロセッサ

完全なソースコード: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go

// ...
// CommandProcessorSubscriberConstructorFnはCommandHandlerのためのサブスクライバーを作成するために使用されます。
// 各コマンドハンドラーに個別のカスタムサブスクライバーを作成することができます。
type CommandProcessorSubscriberConstructorFn func(CommandProcessorSubscriberConstructorParams) (message.Subscriber, error)
// ...

完全なソースコード: github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go

// ...
type CommandProcessorConfig struct {
	// GenerateSubscribeTopicはコマンドに購読するためのトピックを生成するために使用されます。
	GenerateSubscribeTopic CommandProcessorGenerateSubscribeTopicFn

	// SubscriberConstructorはCommandHandlerのためにサブスクライバーを作成するために使用されます。
	SubscriberConstructor CommandProcessorSubscriberConstructorFn

	// OnHandleはコマンドの処理の前に呼び出されます。
	// OnHandleはミドルウェアのように動作します: コマンドの処理の前と後に追加のロジックを注入できます。
	//
	// そのため、params.Handler.Handle()を明示的に呼び出してコマンドを処理する必要があります。
	//  func(params CommandProcessorOnHandleParams) (err error) {
	//      // 処理前のロジック
	//      // (...)
	//
	//      err := params.Handler.Handle(params.Message.Context(), params.Command)
	//
	//      // 処理後のロジック
	//      // (...)
	//
	//      return err
	//  }
	//
	// このオプションは必須ではありません。
	OnHandle CommandProcessorOnHandleFn

	// Marshalerはコマンドの直列化および逆直列化に使用されます。
	// 必須です。
	Marshaler CommandEventMarshaler

	// ロガーインスタンスはログ出力に使用されます。
	// 提供されない場合、watermill.NopLoggerが使用されます。
	Logger watermill.LoggerAdapter

	// AckCommandHandlingErrorsがtrueの場合、CommandHandlerがエラーを返してもメッセージを認識します。
	// RequestReplyBackendがnullでなく応答の送信が失敗した場合、メッセージはまたしてもnackedされます。
	//
	// 警告: requestreplyコンポーネント(requestreply.NewCommandHandlerまたはrequestreply.NewCommandHandlerWithResultを使用している場合)、
	// 応答の送信が失敗した場合にコマンドを認識する可能性があるため、このオプションの使用は推奨されません。
	//
	// requestreplyを使用する場合は、requestreply.PubSubBackendConfig.AckCommandErrorsを使用する必要があります。
	AckCommandHandlingErrors bool

	// disableRouterAutoAddHandlersは後方互換性のために使用されます。
	// NewCommandProcessorでCommandProcessorを作成するときに設定されます。
	// Deprecated: NewCommandProcessorWithConfigに移行してください。
	disableRouterAutoAddHandlers bool
}

func (c *CommandProcessorConfig) setDefaults() {
	if c.Logger == nil {
		c.Logger = watermill.NopLogger{}
	}
}
// ...

コマンドプロセッサ

完全なソースコード: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go

// ...
// CommandHandlerはNewCommandで定義されたコマンドを受け取り、Handleメソッドを使用して処理します。
// DDDを使用する場合、CommandHandlerは集約を変更し、永続化する場合があります。
//
// EventHandlerとは異なり、各コマンドには1つのCommandHandlerのみが存在します。
//
// メッセージの処理中にCommandHandlerの1つのインスタンスを使用します。
// 複数のコマンドが同時に配信される場合、Handleメソッドは複数回同時に実行される可能性があります。
// したがって、Handleメソッドはスレッドセーフである必要があります!
type CommandHandler interface {
// ...

コマンドおよびイベントマーシャラー

完全なソースコード: github.com/ThreeDotsLabs/watermill/components/cqrs/marshaler.go

// ...
// CommandEventMarshalerはコマンドおよびイベントをWatermillメッセージにマーシャリングおよび逆マーシャリングし、その逆も行います。
// コマンドのペイロードは[]bytesにマーシャリングする必要があります。
type CommandEventMarshaler interface {
	// MarshalはコマンドまたはイベントをWatermillメッセージにマーシャリングします。
	Marshal(v interface{}) (*message.Message, error)

	// UnmarshalはWatermillメッセージをvコマンドまたはイベントにデコードします。
	Unmarshal(msg *message.Message, v interface{}) (err error)

	// Nameはコマンドまたはイベントの名前を返します。
	// この名前は、受信したコマンドまたはイベントが処理したいものであるかどうかを判断するために使用できます。
	Name(v interface{}) string

	// NameFromMessageはWatermillメッセージからコマンドまたはイベントの名前を返します(Marshalによって生成されたもの)。
	//
	// Watermillメッセージにマーシャリングされたコマンドまたはイベントがある場合、不必要なデコードを避けるために、Nameの代わりにNameFromMessageを使用する必要があります。
	NameFromMessage(msg *message.Message) string
}
// ...

使用法

例題ドメイン

ホテルでの部屋予約を処理する責務を持つ単純なドメインを使用します。

このドメインのモデルを示すためにイベントストーミングの記号を使用します。

記号の凡例:

  • 青色の付箋はコマンドです
  • オレンジ色の付箋はイベントです
  • 緑色の付箋はイベントから非同期に生成される読み取りモデルです
  • 紫色の付箋はイベントによってトリガーされ、コマンドを生成するポリシーです
  • ピンク色の付箋はホットスポットです。頻繁に問題が発生するエリアをマークします

CQRSイベントストーミング

このドメインは次のようになります:

  • 顧客は部屋を予約できます。
  • 部屋が予約されるたびに、顧客にビールを注文します(なぜなら、私たちはゲストを大切にするからです)。
    • 時々ビールがなくなることがあります。
  • 予約に基づいて財務レポートを生成します。

コマンドの送信

まず、顧客のアクションをシミュレートする必要があります。

完全なソースコード:github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf

// ...
		bookRoomCmd := &BookRoom{
			RoomId:    fmt.Sprintf("%d", i),
			GuestName: "John",
			StartDate: startDate,
			EndDate:   endDate,
		}
		if err := commandBus.Send(context.Background(), bookRoomCmd); err != nil {
			panic(err)
		}
// ...

コマンドハンドラ

BookRoomHandler はコマンドを処理します。

完全なソースコード: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// BookRoomHandler は BookRoom コマンドを処理し、RoomBooked イベントを発行するコマンドハンドラです。
//
// CQRS では、コマンドはハンドラによって処理されなければなりません。
// このコマンドを処理するための別のハンドラを追加すると、エラーが返されます。
type BookRoomHandler struct {
	eventBus *cqrs.EventBus
}

func (b BookRoomHandler) HandlerName() string {
	return "BookRoomHandler"
}

// NewCommand はこのハンドラが処理すべきコマンドの型を返します。必ずポインタである必要があります。
func (b BookRoomHandler) NewCommand() interface{} {
	return &BookRoom{}
}

func (b BookRoomHandler) Handle(ctx context.Context, c interface{}) error {
	// c は常に `NewCommand` が返す型なので、型アサーションは常に安全です
	cmd := c.(*BookRoom)

	// 実際のプロダクションではより適切な方法で計算されるかもしれない、ランダムな価格
	price := (rand.Int63n(40) + 1) * 10

	log.Printf(
		"%s を予約しました、%s から %s まで",
		cmd.RoomId,
		cmd.GuestName,
		time.Unix(cmd.StartDate.Seconds, int64(cmd.StartDate.Nanos)),
		time.Unix(cmd.EndDate.Seconds, int64(cmd.EndDate.Nanos)),
	)

	// RoomBooked は OrderBeerOnRoomBooked イベントハンドラによって処理され、将来的には RoomBooked を複数のイベントハンドラで処理できます
	if err := b.eventBus.Publish(ctx, &RoomBooked{
		ReservationId: watermill.NewUUID(),
		RoomId:        cmd.RoomId,
		GuestName:     cmd.GuestName,
		Price:         price,
		StartDate:     cmd.StartDate,
		EndDate:       cmd.EndDate,
	}); err != nil {
		return err
	}

	return nil
}

// OrderBeerOnRoomBooked は RoomBooked イベントを処理し、OrderBeer コマンドを発行するイベントハンドラです。
// ...

イベントハンドラ

前述のように、部屋が予約されるたびにビールを注文したい(「ルームが予約されたとき」 とラベル付けされています)。これは OrderBeer コマンドを使用して実現されます。

完全なソースコード: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// OrderBeerOnRoomBooked は RoomBooked イベントを処理し、OrderBeer コマンドを発行するイベントハンドラです。
type OrderBeerOnRoomBooked struct {
	commandBus *cqrs.CommandBus
}

func (o OrderBeerOnRoomBooked) HandlerName() string {
	// この名前は EventsSubscriberConstructor に渡され、キュー名を生成するために使用されます
	return "OrderBeerOnRoomBooked"
}

func (OrderBeerOnRoomBooked) NewEvent() interface{} {
	return &RoomBooked{}
}

func (o OrderBeerOnRoomBooked) Handle(ctx context.Context, e interface{}) error {
	event := e.(*RoomBooked)

	orderBeerCmd := &OrderBeer{
		RoomId: event.RoomId,
		Count:  rand.Int63n(10) + 1,
	}

	return o.commandBus.Send(ctx, orderBeerCmd)
}

// OrderBeerHandler は BookRoomHandler に非常に似ています。唯一の違いは、ビールが足りないときにときどきエラーを返し、コマンドを再発行することです。完全な実装は [example source code](https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/basic/5-cqrs-protobuf/?utm_source=cqrs_doc) で見つけることができます。

イベントハンドラーグループ

デフォルトでは、各イベントハンドラーには別々のサブスクライバーインスタンスがあります。このアプローチは、トピックに送信されるイベントタイプが1つだけの場合は問題ありません。

トピックに複数のイベントタイプがある場合、次の2つのオプションがあります。

  1. EventConfig.AckOnUnknownEventtrue に設定すると、ハンドラーで処理されないすべてのイベントがアクノウリッジされます。
  2. イベントハンドラーグループメカニズムを使用できます。

イベントグループを使用するには、EventConfigGenerateHandlerGroupSubscribeTopic および GroupSubscriberConstructor オプションを設定する必要があります。

その後、EventProcessorAddHandlersGroup を使用できます。

完全なソースコード: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
	err = eventProcessor.AddHandlersGroup(
		"events",
		OrderBeerOnRoomBooked{commandBus},

		NewBookingsFinancialReport(),

		cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
			logger.Info("ビールが注文されました", watermill.LogFields{
				"room_id": event.RoomId,
			})
			return nil
		}),
	)
	if err != nil {
// ...

GenerateHandlerGroupSubscribeTopicGroupSubscriberConstructor の両方は、グループ名に関する情報を関数パラメーターとして受け取ります。

汎用ハンドラー

Watermill v1.3から、汎用ハンドラーを使用してコマンドやイベントを処理することができます。これは、多数のコマンド/イベントがある場合やそれぞれのためのハンドラーを作成したくない場合に非常に便利です。

完全なソースコード: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
		cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
			logger.Info("ビールが注文されました", watermill.LogFields{
				"room_id": event.RoomId,
			})
			return nil
		}),
// ...

裏側では、EventHandler または CommandHandler の実装を作成します。すべての種類のハンドラーに適しています。

完全なソースコード: github.com/ThreeDotsLabs/watermill/components/cqrs/command_handler.go

// ...
// NewCommandHandler は、提供された関数と関数パラメーターから推測されたコマンドタイプに基づいて新しい CommandHandler の実装を作成します。
func NewCommandHandler[Command any](
// ...

完全なソースコード: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go

// ...
// NewEventHandler は、提供された関数と関数パラメーターから推測されたイベントタイプに基づいて新しい EventHandler の実装を作成します。
func NewEventHandler[T any](
// ...

完全なソースコード: github.com/ThreeDotsLabs/watermill/components/cqrs/event_handler.go

// ...
// NewGroupEventHandler は、提供された関数と関数パラメーターから推測されたイベントタイプに基づいて新しい GroupEventHandler の実装を作成します。
func NewGroupEventHandler[T any](handleFunc func(ctx context.Context, event *T) error) GroupEventHandler {
// ...

イベントハンドラを使用したリードモデルの構築

完全なソースコード: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
// BookingsFinancialReportは、予約から得られる収益を計算するリードモデルです。
// RoomBookedイベントが発生した際にリッスンします。
//
// この実装では単純にメモリに書き込みます。本番環境では永続ストレージを使用することがあります。
type BookingsFinancialReport struct {
	handledBookings map[string]struct{}
	totalCharge     int64
	lock            sync.Mutex
}

func NewBookingsFinancialReport() *BookingsFinancialReport {
	return &BookingsFinancialReport{handledBookings: map[string]struct{}{}}
}

func (b BookingsFinancialReport) HandlerName() string {
	// この名前はEventsSubscriberConstructorに渡され、キューの名前を生成する際に使用されます
	return "BookingsFinancialReport"
}

func (BookingsFinancialReport) NewEvent() interface{} {
	return &RoomBooked{}
}

func (b *BookingsFinancialReport) Handle(ctx context.Context, e interface{}) error {
	// 同時に呼び出される場合があるため、スレッドセーフが必要です。
	b.lock.Lock()
	defer b.lock.Unlock()

	event := e.(*RoomBooked)

	// 正確な一度限りの配信セマンティクスを提供しないパブ/サブを使用する場合、メッセージの重複排除が必要です。
	// GoChannelパブ/サブは正確な一度限りの配信を提供しますが、他のパブ/サブ実装にも対応するためにこの例を準備しましょう。
	if _, ok := b.handledBookings[event.ReservationId]; ok {
		return nil
	}
	b.handledBookings[event.ReservationId] = struct{}{}

	b.totalCharge += event.Price

	fmt.Printf(">>> $%dで部屋が予約されました\n", b.totalCharge)
	return nil
}

var amqpAddress = "amqp://guest:guest@rabbitmq:5672/"

func main() {
// ...

すべてを接続する

CQRSアプリケーションを構築するために必要なすべてのコンポーネントを既に持っています。

メッセージブローカーとしてAMQP(RabbitMQ)を使用します。

CQRSは、Watermillのメッセージルーターを使用しています。これに慣れていない場合は、その動作原理を理解したいと思うかもしれません。初めての方向けのガイドをチェックしてみてください。通常のメッセージングパターン(メトリクス、毒メッセージキュー、レート制限、相関など)の使用方法も示されます。これらのツールは既にWatermillに組み込まれています。

CQRSに戻りましょう。CQRSは、コマンドやイベントのバス、プロセッサなどの複数のコンポーネントで構成されていることをすでに知っています。

完全なソースコード: github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go

// ...
func main() {
	logger := watermill.NewStdLogger(false, false)
	cqrsMarshaler := cqrs.ProtobufMarshaler{}

	// You can use any Pub/Sub implementation from here: https://watermill.io/pubsubs/
	// Detailed RabbitMQ implementation: https://watermill.io/pubsubs/amqp/
	// Commands will be send to queue, because they need to be consumed once.
	commandsAMQPConfig := amqp.NewDurableQueueConfig(amqpAddress)
	commandsPublisher, err := amqp.NewPublisher(commandsAMQPConfig, logger)
	if err != nil {
		panic(err)
	}
	commandsSubscriber, err := amqp.NewSubscriber(commandsAMQPConfig, logger)
	if err != nil {
		panic(err)
	}

	// Events will be published to PubSub configured Rabbit, because they may be consumed by multiple consumers.
	// (in that case BookingsFinancialReport and OrderBeerOnRoomBooked).
	eventsPublisher, err := amqp.NewPublisher(amqp.NewDurablePubSubConfig(amqpAddress, nil), logger)
	if err != nil {
		panic(err)
	}

	// CQRS is built on messages router. Detailed documentation: https://watermill.io/docs/messages-router/
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// Simple middleware which will recover panics from event or command handlers.
	// More about router middlewares you can find in the documentation:
	// https://watermill.io/docs/messages-router/#middleware
	//
	// List of available middlewares you can find in message/router/middleware.
	router.AddMiddleware(middleware.Recoverer)

	commandBus, err := cqrs.NewCommandBusWithConfig(commandsPublisher, cqrs.CommandBusConfig{
		GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) {
			// we are using queue RabbitMQ config, so we need to have topic per command type
			return params.CommandName, nil
		},
		OnSend: func(params cqrs.CommandBusOnSendParams) error {
			logger.Info("Sending command", watermill.LogFields{
				"command_name": params.CommandName,
			})

			params.Message.Metadata.Set("sent_at", time.Now().String())

			return nil
		},
		Marshaler: cqrsMarshaler,
		Logger:    logger,
	})
	if err != nil {
		panic(err)
	}

	commandProcessor, err := cqrs.NewCommandProcessorWithConfig(
		router,
		cqrs.CommandProcessorConfig{
			GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) {
				// we are using queue RabbitMQ config, so we need to have topic per command type
				return params.CommandName, nil
			},
			SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) {
				// we can reuse subscriber, because all commands have separated topics
				return commandsSubscriber, nil
			},
			OnHandle: func(params cqrs.CommandProcessorOnHandleParams) error {
				start := time.Now()

				err := params.Handler.Handle(params.Message.Context(), params.Command)

				logger.Info("Command handled", watermill.LogFields{
					"command_name": params.CommandName,
					"duration":     time.Since(start),
					"err":          err,
				})

				return err
			},
			Marshaler: cqrsMarshaler,
			Logger:    logger,
		},
	)
	if err != nil {
		panic(err)
	}

	eventBus, err := cqrs.NewEventBusWithConfig(eventsPublisher, cqrs.EventBusConfig{
		GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
			// because we are using PubSub RabbitMQ config, we can use one topic for all events
			return "events", nil

			// we can also use topic per event type
			// return params.EventName, nil
		},

		OnPublish: func(params cqrs.OnEventSendParams) error {
			logger.Info("Publishing event", watermill.LogFields{
				"event_name": params.EventName,
			})

			params.Message.Metadata.Set("published_at", time.Now().String())

			return nil
		},

		Marshaler: cqrsMarshaler,
		Logger:    logger,
	})
	if err != nil {
		panic(err)
	}

	eventProcessor, err := cqrs.NewEventGroupProcessorWithConfig(
		router,
		cqrs.EventGroupProcessorConfig{
			GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) {
				return "events", nil
			},
			SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) {
				config := amqp.NewDurablePubSubConfig(
					amqpAddress,
					amqp.GenerateQueueNameTopicNameWithSuffix(params.EventGroupName),
				)

				return amqp.NewSubscriber(config, logger)
			},

			OnHandle: func(params cqrs.EventGroupProcessorOnHandleParams) error {
				start := time.Now()

				err := params.Handler.Handle(params.Message.Context(), params.Event)

				logger.Info("Event handled", watermill.LogFields{
					"event_name": params.EventName,
					"duration":   time.Since(start),
					"err":        err,
				})

				return err
			},

			Marshaler: cqrsMarshaler,
			Logger:    logger,
		},
	)
	if err != nil {
		panic(err)
	}

	err = commandProcessor.AddHandlers(
		BookRoomHandler{eventBus},
		OrderBeerHandler{eventBus},
	)
	if err != nil {
		panic(err)
	}

	err = eventProcessor.AddHandlersGroup(
		"events",
		OrderBeerOnRoomBooked{commandBus},

		NewBookingsFinancialReport(),

		cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
			logger.Info("Beer ordered", watermill.LogFields{
				"room_id": event.RoomId,
			})
			return nil
		}),
	)
	if err != nil {
		panic(err)
	}

	// publish BookRoom commands every second to simulate incoming traffic
	go publishCommands(commandBus)

	// processors are based on router, so they will work when router will start
	if err := router.Run(context.Background()); err != nil {
		panic(err)
	}
}
// ...

それで終わりです。実行可能なCQRSアプリケーションがあります。