PublisherSubscriber は、Watermill の下位レベルのパーツです。実際のアプリケーションでは、通常、関連付け、メトリクス、ポイズンメッセージキュー、リトライ、レート制限など、高レベルのインターフェースや機能を使用したいと思うでしょう。

時々、処理が成功した場合に Ack を送信したくないことがあります。また、あるメッセージが処理された後に別のメッセージを送信したい場合もあります。

これらの要件を満たすために、Router と呼ばれるコンポーネントがあります。

Watermill Router

設定

完全なソースコード: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
type RouterConfig struct {
	// CloseTimeout は、ルーターがハンドラーのクローズ時に作業する時間を決定します。
	CloseTimeout time.Duration
}

func (c *RouterConfig) setDefaults() {
	if c.CloseTimeout == 0 {
		c.CloseTimeout = time.Second * 30
	}
}

// Validate は、ルーターの構成にエラーがあるかどうかをチェックします。
func (c RouterConfig) Validate() error {
	return nil
}
// ...

ハンドラ

まず、HandlerFunc 関数を実装する必要があります:

完全なソースコード: github.com/ThreeDotsLabs/watermill/message/router.go

// ...

// HandlerFunc はメッセージを受信したときに呼び出される関数です。
// 
// HandlerFunc がエラーを返さないときは、msg.Ack() が自動的に呼び出されます。
// 
// HandlerFunc がエラーを返すと、msg.Nack() が呼び出されます。
// 
// ハンドラ内で msg.Ack() が呼び出され、かつ HandlerFunc がエラーを返したとき、
// すでに Ack が送信されているため、msg.Nack() は送信されません。
// 
// 複数のメッセージを受信するとき(HandlerFunc で msg.Ack() が送信されるか、
// 複数のコンシューマをサポートする Subscriber のため)、
// HandlerFunc は並行して実行されます。
type HandlerFunc func(msg *Message) ([]*Message, error)

// ...

次に、Router.AddHandler を使用して新しいハンドラを追加する必要があります:

完全なソースコード: github.com/ThreeDotsLabs/watermill/message/router.go

// ...

// AddHandler は新しいハンドラを追加します。

// handlerName はユニークである必要があります。現在はデバッグ目的でのみ使用されます。

// subscribeTopic はハンドラがメッセージを受信するトピックです。

// publishTopic はハンドラから返されたメッセージが Router によって生成されるトピックです。

// ハンドラが複数のトピックに公開する必要がある場合は、
// ハンドラに Publisher を注入するか、メタデータに基づいてメッセージをキャプチャし、特定のトピックに公開するミドルウェアを実装することを推奨します。

// ルータがすでに実行中の状態でハンドラを追加する場合は、RunHandlers() を明示的に呼び出す必要があります。

func (r *Router) AddHandler(

	handlerName string,

	subscribeTopic string,

	subscriber Subscriber,

	publishTopic string,

	publisher Publisher,

	handlerFunc HandlerFunc,

) *Handler {

	r.logger.Info("ハンドラを追加中", watermill.LogFields{

		"ハンドラ名": handlerName,

		"トピック":        subscribeTopic,

	})

	r.handlersLock.Lock()

	defer r.handlersLock.Unlock()

	if _, ok := r.handlers[handlerName]; ok {

		panic(DuplicateHandlerNameError{handlerName})

	}

	publisherName, subscriberName := internal.StructName(publisher), internal.StructName(subscriber)

	newHandler := &handler{

		name:   handlerName,

		logger: r.logger,

		subscriber:     subscriber,

		subscribeTopic: subscribeTopic,

		subscriberName: subscriberName,

		publisher:     publisher,

		publishTopic:  publishTopic,

		publisherName: publisherName,

		handlerFunc: handlerFunc,

		runningHandlersWg:     r.runningHandlersWg,

		runningHandlersWgLock: r.runningHandlersWgLock,

		messagesCh:     nil,

		routersCloseCh: r.closingInProgressCh,

		startedCh: make(chan struct{}),

	}

	r.handlersWg.Add(1)

	r.handlers[handlerName] = newHandler

	select {

	case r.handlerAdded struct{}{}:

	default:

		// closeWhenAllHandlersStopped は常に handlerAdded を待っていない

	}

	return &Handler{

		router:  r,

		handler: newHandler,

	}

}

// AddNoPublisherHandler は新しいハンドラを追加します。

// このハンドラはメッセージを返すことができません。

// メッセージが返された場合、エラーが発生し、Nack が送信されます。

//

// handlerName はユニークである必要があります。現在はデバッグ目的でのみ使用されます。

// subscribeTopic はハンドラがメッセージを受信するトピックです。

// subscriber はメッセージを消費するために使用される Subscriber です。

// ルータがすでに実行中の状態でハンドラを追加する場合は、RunHandlers() を明示的に呼び出す必要があります。

func (r *Router) AddNoPublisherHandler(

	handlerName string,

	subscribeTopic string,

	subscriber Subscriber,

handlerFunc NoPublishHandlerFunc,

) *Handler {

handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...

"はじめての使い方" の使用例を参照してください。 完全なソースコード: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
	// AddHandlerは、ハンドラレベルのミドルウェアを追加したり、ハンドラを停止したりするために使用できるハンドラを返します。
	handler := router.AddHandler(
		"struct_handler",          // ハンドラ名、ユニークである必要があります
		"incoming_messages_topic", // イベントが読み込まれるトピック
		pubSub,
		"outgoing_messages_topic", // イベントを公開するトピック
		pubSub,
		structHandler{}.Handler,
	)

	// ハンドラレベルのミドルウェアは特定のハンドラにのみ実行されます
	// このミドルウェアは、ルーターレベルのミドルウェアと同じ方法で追加することができます
	handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			log.Println("ハンドラ固有のミドルウェアを実行中、メッセージUUID: ", message.UUID)

			return h(message)
		}
	})
// ...

パブリッシャーハンドラーなし

すべてのハンドラーが新しいメッセージを生成するわけではありません。Router.AddNoPublisherHandler を使用して、この種のハンドラーを追加できます。

完全なソースコード:github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// AddNoPublisherHandler は新しいハンドラーを追加します。
// このハンドラーはメッセージを返すことができません。
// メッセージを返すとエラーが発生し、Nack が送信されます。
//
// handlerName はユニークでなければなりません。現在はデバッグ目的でのみ使用されます。
//
// subscribeTopic はハンドラーがメッセージを受け取るトピックです。
//
// subscriber はメッセージを消費するために使用されます。
//
// 既に実行中のルーターにハンドラーを追加する場合は、明示的に RunHandlers() を呼び出す必要があります。
func (r *Router) AddNoPublisherHandler(
	handlerName string,
	subscribeTopic string,
	subscriber Subscriber,
	handlerFunc NoPublishHandlerFunc,
) *Handler {
	handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...
}

アクノリジメント

デフォルトでは、HanderFunc がエラーを返さない場合は msg.Ack() が呼び出されます。エラーが返されると msg.Nack() が呼び出されます。つまり、メッセージの処理が終わった後は、msg.Ack()msg.Nack() を呼び出す必要はありません(もちろん、必要に応じて呼び出すこともできます)。

メッセージの生成

ハンドラーが複数のメッセージを返す場合、ほとんどのパブリッシャーの実装はメッセージの一括配信をサポートしていません。ブローカーやストレージが利用できない場合、一部のメッセージのみが生成され、msg.Nack() が送信されます。

これが問題である場合は、各ハンドラーが1つのメッセージのみを配信するように考慮してください。

ルーターの実行

ルーターを実行するには、Run() を呼び出す必要があります。

完全なソースコード:github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Run はすべてのプラグインとハンドラーを実行し、指定されたトピックに対して購読を開始します。
// この呼び出しは、ルーターが実行中の間ブロックします。
//
// すべてのハンドラーが停止したとき(たとえば購読が終了された場合)、ルーターも停止します。
//
// Run() を停止するには、ルーターで Close() を呼び出す必要があります。
//
// ctx はすべてのサブスクライバーに伝播されます。
//
// すべてのハンドラーが停止したとき(たとえば接続が閉じられた場合)、Run() も停止します。
func (r *Router) Run(ctx context.Context) (err error) {
// ...
}

ルーターの実行を確認する

ルーターが実行中かどうかを理解することは役立ちます。これは Running() メソッドを使用して実現できます。

完全なソースコード:github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Running はルーターが実行中のときに閉じます。
// 言い換えると、次のようにしてルーターが実行中であることを待つことができます:

// 	fmt.Println("ルーターを起動中")
//	go r.Run(ctx)
//	//	fmt.Println("ルーターが実行されています")

// 注意: 歴史的な理由から、このチャンネルはルーターのシャットダウンについては知りません - ルーターが動作し続けてからシャットダウンする場合、チャンネルは閉じられます。
func (r *Router) Running() chan struct{} {
// ...
}

また、ブール値を返す IsRunning 関数を使用することもできます。

完全なソースコード:github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// IsRunning は、ルーターが実行中の場合に true を返します。
//
// 注意: 歴史的な理由から、このメソッドはルーターが終了したかどうかを把握しません。
// ルーターが閉じられたかどうかを知りたい場合は、IsClosed を使用してください。
func (r *Router) IsRunning() bool {
// ...
}

ルーターのシャットダウン

ルーターをシャットダウンするには、Close() を呼び出す必要があります。

完全なソースコード: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Close は設定で提供されたタイムアウトでルーターを優雅に閉じます。
func (r *Router) Close() error {
	r.closedLock.Lock()
// ...

Close() はすべてのパブリッシャーとサブスクライバーをシャットダウンし、すべてのハンドラーが完了するのを待ちます。

Close() は設定で設定されたタイムアウト(RouterConfig.CloseTimeout)の時間を待ちます。タイムアウトに達すると、Close() はエラーを返します。

ルーターの起動後にハンドラーを追加する

ルーターが既に起動している場合は、新しいハンドラーを追加できます。これには、AddNoPublisherHandler または AddHandler を呼び出してから RunHandlers を呼び出す必要があります。

完全なソースコード: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// RunHandlers は Run() の後に追加されたすべてのハンドラーを実行します。
// RunHandlers は冪等性があり、複数回安全に呼び出すことができます。
func (r *Router) RunHandlers(ctx context.Context) error {
// ...

実行中のハンドラーの停止

Stop() を呼び出すことで、実行中のハンドラーを1つだけ停止することができます。

ルーターに実行中のハンドラーがない場合、ルーターはシャットダウンします。

完全なソースコード: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Stop はハンドラーを停止します。
// Stop は非同期です。
// ハンドラーが停止したかどうかは Stopped() 関数で確認できます。
func (h *Handler) Stop() {
// ...

実行モデル

サブスクライバー は単一のメッセージを順次消費するか、複数のメッセージを並列で消費することができます。

  • 単一メッセージフロー は、msg.Ack() が呼び出されるまでサブスクライバーが新しいメッセージを受信しない最も単純な方法です。
  • 複数メッセージフロー は特定のサブスクライバーのみがサポートしています。複数のトピックパーティションに同時にサブスクライブすることで、複数のメッセージを並列で消費することができます。また、以前に確認されていないメッセージ(たとえば Kafka サブスクライバーの動作)さえも消費できます。ルーターはこのモデルをHandlerFunc を並列で実行することで処理します。

サポートされる実行モデルを理解するために、選択した Pub/Sub のドキュメントを参照してください。

ミドルウェア

完全なソースコード: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// HandlerMiddleware は HandlerFunc のデコレータのようなものを書くことを可能にします。
// ハンドラーの前でいくつかの操作を実行したり(受信メッセージの修正など)、ハンドラーの後で(生成されたメッセージの修正、受信メッセージの Ack/Nack、エラーの処理、ログなど)を実行できます。
//
// これは `AddMiddleware` メソッドを使用してルーターにアタッチできます。
//
// 例:
//
// 	func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
// 		return func(message *message.Message) ([]*message.Message, error) {
// 			fmt.Println("handler の前に実行")
// 			producedMessages, err := h(message)
// 			fmt.Println("handler の後に実行")
//
// 			return producedMessages, err
// 		}
// 	}
type HandlerMiddleware func(h HandlerFunc) HandlerFunc

// ...

標準のミドルウェアの完全なリストは Middlewares で見つけることができます。

プラグイン

完全なソースコード: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// RouterPlugin はルーターの起動時に実行される関数です。
type RouterPlugin func(*Router) error

// ...

標準のプラグインの完全なリストは message/router/plugin で見つけることができます。

コンテキスト

ハンドラーによって受信された各メッセージの contextには、いくつかの有用な値が格納されています。

完全なソースコード:github.com/ThreeDotsLabs/watermill/message/router_context.go

// ...
// HandlerNameFromCtxは、コンテキストからメッセージハンドラーの名前を取得します。
func HandlerNameFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, handlerNameKey)
}

// PublisherNameFromCtxは、コンテキストからルーター内のメッセージパブリッシャータイプの名前を返します。
// たとえば、Kafkaの場合、`kafka.Publisher`になります。
func PublisherNameFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, publisherNameKey)
}

// SubscriberNameFromCtxは、コンテキストからルーター内のメッセージサブスクライバータイプの名前を返します。
// たとえば、Kafkaの場合、`kafka.Subscriber`になります。
func SubscriberNameFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, subscriberNameKey)
}

// SubscribeTopicFromCtxは、コンテキストからメッセージが受信されたトピックを返します。
func SubscribeTopicFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, subscribeTopicKey)
}

// PublishTopicFromCtxは、コンテキストからメッセージが公開されるトピックを返します。
func PublishTopicFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, publishTopicKey)
}
// ...