はじめに

ミドルウェアはイベントフレームワークを拡張し、カスタム機能を追加し、メインハンドラのロジックとは関係のない重要な機能を提供するために使用されます。例えば、エラーを返した後にハンドラを再試行したり、パニックから回復し、ハンドラ内でスタックトレースをキャプチャしたりします。

ミドルウェアの関数シグネチャは以下のように定義されています:

ソースコード全文: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// HandlerMiddlewareを使用すると、ハンドラと同様のデコレータを記述できます。
// ハンドラの前でいくつかの操作を実行することができます(例: 受信メッセージの変更)。
// ハンドラの後にもいくつかの操作を実行することができます(生成されたメッセージの変更、受信メッセージのACK/NACK、エラーの処理、ロギングなど)。
//
// これは、`AddMiddleware`メソッドを使用してルータに追加できます。
//
// 例:
//
//	func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
//		return func(message *message.Message) ([]*message.Message, error) {
//			fmt.Println("ハンドラの実行前")
//			生成されたメッセージ, エラー := h(message)
//			fmt.Println("ハンドラの実行後")
//
//			return 生成されたメッセージ, エラー
//		}
//	}
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...

使用方法

ミドルウェアは、ルーターのすべてのハンドラまたは特定のハンドラに適用することができます。ミドルウェアをルーターに直接追加した場合、そのミドルウェアはルーターに提供されるすべてのハンドラに適用されます。ミドルウェアが特定のハンドラにのみ適用される場合、それはルーターのハンドラに追加する必要があります。

以下は使用例です:

完全なソースコード: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// SIGTERMシグナルを受信すると、SignalsHandlerはrouterをきれいに閉じます。
	// routerを閉じるには、`r.Close()`を呼び出すこともできます。
	router.AddPlugin(plugin.SignalsHandler)

	// ルーターレベルのミドルウェアは、routerに送信されるすべてのメッセージで実行されます
	router.AddMiddleware(
		// CorrelationIDは、着信メッセージのメタデータから相関IDを生成されたメッセージにコピーします
		middleware.CorrelationID,

		// ハンドラがエラーを返す場合、リトライされます。
		// 最大でMaxRetries回、その後メッセージはNackされ、PubSubによって再送信されます。
		middleware.Retry{
			MaxRetries:      3,
			InitialInterval: time.Millisecond * 100,
			Logger:          logger,
		}.Middleware,

		// Recovererはハンドラ内のパニックを処理します。
		// この場合、それらをエラーとしてRetryミドルウェアに渡します。
		middleware.Recoverer,
	)

	// 簡単のため、ここではgochannel Pub/Subを使用していますが、
	// 任意のPub/Sub実装で置き換えることができます。
	pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)

	// バックグラウンドでいくつかの着信メッセージを公開します
	go publishMessages(pubSub)

	// AddHandlerは、ハンドラレベルのミドルウェアを追加したり、ハンドラを停止したりするために使用できます
	// また、ハンドラ名、着信メッセージのトピック、Pub/Subなど必要な情報を指定します
	handler := router.AddHandler(
		"struct_handler",          // ハンドラ名、ユニークである必要があります
		"incoming_messages_topic", // イベントが読み込まれるトピック
		pubSub,
		"outgoing_messages_topic", // イベントが公開されるトピック
		pubSub,
		structHandler{}.Handler,
	)

	// ハンドラレベルのミドルウェアは特定のハンドラにのみ実行されます
	// このようなミドルウェアは、ルーターレベルのミドルウェアと同様にハンドラに追加することができます
	handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			log.Println("メッセージ", message.UUID, "に対するハンドラ固有のミドルウェアを実行します")

			return h(message)
		}
	})

	// デバッグ目的でのみ、`incoming_messages_topic`で受信したすべてのメッセージを印刷します
	router.AddNoPublisherHandler(
		"print_incoming_messages",
		"incoming_messages_topic",
		pubSub,
		printMessages,
	)

	// デバッグ目的でのみ、`outgoing_messages_topic`に送信されたすべてのイベントを印刷します
	router.AddNoPublisherHandler(
		"print_outgoing_messages",
		"outgoing_messages_topic",
		pubSub,
		printMessages,
	)

	// すべてのハンドラが登録されたので、routerを実行できます。
	// routerが実行されるまでRunはブロックされます。
// ...

利用可能なミドルウェア

ここではWatermillが提供する再利用可能なミドルウェアを紹介します。また、独自のミドルウェアを簡単に実装することもできます。例えば、特定のログ形式で各着信メッセージを保存したい場合、これが最良の方法です。

サーキットブレーカー

// CircuitBreakerはハンドラをサーキットブレーカーでラップするミドルウェアです。
// 構成に基づいて、ハンドラがエラーを続けて返す場合、サーキットブレーカーは高速に失敗します。
// これは連鎖的障害を防ぐのに役立ちます。
type CircuitBreaker struct {
    cb *gobreaker.CircuitBreaker
}
// NewCircuitBreakerは新しいCircuitBreakerミドルウェアを返します。
// 利用可能な設定については、gobreakerのドキュメントを参照してください。
func NewCircuitBreaker(settings gobreaker.Settings) CircuitBreaker {
    return CircuitBreaker{
        cb: gobreaker.NewCircuitBreaker(settings),
    }
}
// MiddlewareはCircuitBreakerミドルウェアを返します。
func (c CircuitBreaker) Middleware(h message.HandlerFunc) message.HandlerFunc {
    return func(msg *message.Message) ([]*message.Message, error) {
        out, err := c.cb.Execute(func() (interface{}, error) {
            return h(msg)
        })

        var result []*message.Message
        if out != nil {
            result = out.([]*message.Message)
        }

        return result, err
    }
}

相関

// SetCorrelationIDはメッセージの相関IDを設定します。
//
// メッセージがシステムに入るとき、SetCorrelationIDを呼び出す必要があります。
// リクエスト(例:HTTP)でメッセージが生成される場合、メッセージの相関IDはリクエストの相関IDと同じであるべきです。
func SetCorrelationID(id string, msg *message.Message) {
    if MessageCorrelationID(msg) != "" {
        return
    }

    msg.Metadata.Set(CorrelationIDMetadataKey, id)
}
// MessageCorrelationIDはメッセージから相関IDを返します。
func MessageCorrelationID(message *message.Message) string {
    return message.Metadata.Get(CorrelationIDMetadataKey)
}
// CorrelationIDはハンドラによって生成されたすべてのメッセージに相関IDを追加します。
// そのIDはハンドラが受け取ったメッセージIDに基づいています。
//
// CorrelationIDが正しく機能するためには、最初にメッセージがシステムに入る際にSetCorrelationIDを呼び出す必要があります。
func CorrelationID(h message.HandlerFunc) message.HandlerFunc {
    return func(message *message.Message) ([]*message.Message, error) {
        producedMessages, err := h(message)

        correlationID := MessageCorrelationID(message)
        for _, msg := range producedMessages {
            SetCorrelationID(correlationID, msg)
        }

        return producedMessages, err
    }
}

デュプリケータ

// Duplicatorはエンドポイントが冪等性を確保するためにメッセージを二重に処理します。
func Duplicator(h message.HandlerFunc) message.HandlerFunc {
    return func(msg *message.Message) ([]*message.Message, error) {
        firstProducedMessages, firstErr := h(msg)
        if firstErr != nil {
            return nil, firstErr
        }

        secondProducedMessages, secondErr := h(msg)
        if secondErr != nil {
            return nil, secondErr
        }

        return append(firstProducedMessages, secondProducedMessages...), nil
    }
}

エラーを無視

// IgnoreErrorsはハンドラが特定の明示的に定義されたエラーを無視することを可能にするミドルウェアを提供します。
type IgnoreErrors struct {
    ignoredErrors map[string]struct{}
}
// NewIgnoreErrorsは新しいIgnoreErrorsミドルウェアを作成します。
func NewIgnoreErrors(errs []error) IgnoreErrors {
    errsMap := make(map[string]struct{}, len(errs))

    for _, err := range errs {
        errsMap[err.Error()] = struct{}{}
    }

    return IgnoreErrors{errsMap}
}
// MiddlewareはIgnoreErrorsミドルウェアを返します。
func (i IgnoreErrors) Middleware(h message.HandlerFunc) message.HandlerFunc {
    return func(msg *message.Message) ([]*message.Message, error) {
        events, err := h(msg)
        if err != nil {
            if _, ok := i.ignoredErrors[errors.Cause(err).Error()]; ok {
                return events, nil
            }

            return events, err
        }

        return events, nil
    }
}

インスタントAck

// InstantAckはハンドラが即座にエラーにかかわらず受信メッセージを直ちに確認するようにします。
// スループットを向上させるために使用できますが、そのトレードオフは次のとおりです:
// 正確に一度だけの配信を確保する必要がある場合、少なくとも一度配信される可能性があります。
// 順序付けられたメッセージが必要な場合、順序付けが崩れる可能性があります。
func InstantAck(h message.HandlerFunc) message.HandlerFunc {
	return func(message *message.Message) ([]*message.Message, error) {
		message.Ack()
		return h(message)
	}
}

Poison(毒)

// PoisonQueueは処理できないメッセージを扱い、それらを別のトピックに公開するミドルウェア機能を提供します。
// その後、メインのミドルウェアチェーンは実行を続け、業務は通常通り進行します。
func PoisonQueue(pub message.Publisher, topic string) (message.HandlerMiddleware, error) {
	if topic == "" {
		return nil, ErrInvalidPoisonQueueTopic
	}

	pq := poisonQueue{
		topic: topic,
		pub:   pub,
		shouldGoToPoisonQueue: func(err error) bool {
			return true
		},
	}

	return pq.Middleware, nil
}

// PoisonQueueWithFilterは、PoisonQueueと似ていますが、毒キューの条件を満たすエラーを決定するための関数を受け入れます。
func PoisonQueueWithFilter(pub message.Publisher, topic string, shouldGoToPoisonQueue func(err error) bool) (message.HandlerMiddleware, error) {
	if topic == "" {
		return nil, ErrInvalidPoisonQueueTopic
	}

	pq := poisonQueue{
		topic: topic,
		pub:   pub,
		shouldGoToPoisonQueue: shouldGoToPoisonQueue,
	}

	return pq.Middleware, nil
}

Random Fail(ランダムな失敗)

// RandomFailは、ハンドラをランダムな確率で失敗させます。エラー確率は範囲(0、1)内である必要があります。
func RandomFail(errorProbability float32) message.HandlerMiddleware {
	return func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			if shouldFail(errorProbability) {
				return nil, errors.New("ランダムなエラーが発生しました")
			}
			return h(message)
		}
	}
}

// RandomPanicは、ハンドラをランダムな確率でパニックさせます。パニック確率は範囲(0、1)内である必要があります。
func RandomPanic(panicProbability float32) message.HandlerMiddleware {
	return func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			if shouldFail(panicProbability) {
				panic("ランダムなパニックが発生しました")
			}
			return h(message)
		}
	}
}

Recoverer(リカバラー)

// RecoveredPanicErrorは復帰したパニックのエラーとそのスタックトレース情報を保持します。
type RecoveredPanicError struct {
	V          interface{}
	Stacktrace string
}

// Recovererはハンドラからの任意のパニックを回復し、スタックトレース付きのRecoveredPanicErrorをハンドラから返されるエラーに添付します。
func Recoverer(h message.HandlerFunc) message.HandlerFunc {
	return func(event *message.Message) (events []*message.Message, err error) {
		panicked := true

		defer func() {
			if r := recover(); r != nil || panicked {
				err = errors.WithStack(RecoveredPanicError{V: r, Stacktrace: string(debug.Stack())})
			}
		}()

		events, err = h(event)
		panicked = false
		return events, err
	}
}

リトライ

// Retryは、エラーが返された場合にハンドラをリトライするミドルウェアを提供します。
// リトライの動作、指数バックオフ、および最大経過時間を構成できます。
type Retry struct {
	// MaxRetriesは行われる試行の最大回数です。
	MaxRetries int

	// InitialIntervalはリトライ間の最初の間隔です。後続の間隔は乗数によってスケーリングされます。
	InitialInterval time.Duration
	// MaxIntervalはリトライの指数バックオフの上限を設定します。
	MaxInterval time.Duration
	// Multiplierはリトライ間の待機間隔が乗算される係数です。
	Multiplier float64
	// MaxElapsedTimeはリトライの最大時間制限を設定します。0の場合、無効になります。
	MaxElapsedTime time.Duration
	// RandomizationFactorはバックオフ時間を次の範囲内でランダムに広げます:
	// [currentInterval * (1 - randomization_factor), currentInterval * (1 + randomization_factor)].
	RandomizationFactor float64

	// OnRetryHookは、各リトライ試行で実行されるオプションの関数です。
	// 現在のリトライ回数がretryNumを介して渡されます。
	OnRetryHook func(retryNum int, delay time.Duration)

	Logger watermill.LoggerAdapter
}
// MiddlewareはRetryミドルウェアを返します。
func (r Retry) Middleware(h message.HandlerFunc) message.HandlerFunc {
	return func(msg *message.Message) ([]*message.Message, error) {
		producedMessages, err := h(msg)
		if err == nil {
			return producedMessages, nil
		}

		expBackoff := backoff.NewExponentialBackOff()
		expBackoff.InitialInterval = r.InitialInterval
		expBackoff.MaxInterval = r.MaxInterval
		expBackoff.Multiplier = r.Multiplier
		expBackoff.MaxElapsedTime = r.MaxElapsedTime
		expBackoff.RandomizationFactor = r.RandomizationFactor

		ctx := msg.Context()
		if r.MaxElapsedTime > 0 {
			var cancel func()
			ctx, cancel = context.WithTimeout(ctx, r.MaxElapsedTime)
			defer cancel()
		}

		retryNum := 1
		expBackoff.Reset()
	retryLoop:
		for {
			waitTime := expBackoff.NextBackOff()
			select {
			case 

スロットル

// Throttleは、特定の時間内に処理されるメッセージの数を制限するミドルウェアを提供します。
// これは、未処理の長いキューで実行されるハンドラの過負荷を防ぐために使用できます。
type Throttle struct {
	ticker *time.Ticker
}
// NewThrottleは新しいThrottleミドルウェアを作成します。
// 例:NewThrottle(10, time.Second)は、1秒あたり10メッセージを示します。
func NewThrottle(count int64, duration time.Duration) *Throttle {
	return &Throttle{
		ticker: time.NewTicker(duration / time.Duration(count)),
	}
}
// MiddlewareはThrottleミドルウェアを返します。
func (t Throttle) Middleware(h message.HandlerFunc) message.HandlerFunc {
	return func(message *message.Message) ([]*message.Message, error) {
		// 複数のハンドラで共有されるスロットルは、それぞれの「チック」を待ちます。

タイムアウト

// Timeoutは、指定された期間後に着信メッセージのコンテキストをキャンセルします。
// ハンドラのタイムアウトに敏感な機能は、msg.Context().Done()を監視して失敗するタイミングを知る必要があります。
func Timeout(timeout time.Duration) func(message.HandlerFunc) message.HandlerFunc {
	return func(h message.HandlerFunc) message.HandlerFunc {
		return func(msg *message.Message) ([]*message.Message, error) {
			ctx, cancel := context.WithTimeout(msg.Context(), timeout)
			defer func() {
				cancel()
			}()

			msg.SetContext(ctx)
			return h(msg)
		}
	}
}