Введение

Промежуточное ПО используется для расширения фреймворка событий, добавления пользовательской функциональности и предоставления важных функций, не связанных с логикой основного обработчика. Например, повторное выполнение обработчика после возврата ошибки или восстановление после паники и захвата трассировки стека внутри обработчика.

Сигнатура функции промежуточного ПО определена следующим образом:

Полный исходный код: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// HandlerMiddleware позволяет нам создавать декораторы, аналогичные обработчику.
// Он может выполнять операции до обработчика (например, изменять полученное сообщение)
// и также выполнять операции после обработчика (изменять произведенное сообщение, подтверждать/отменять полученное сообщение, обрабатывать ошибки, вести журналирование и т. д.).
//
// Его можно присоединить к маршрутизатору, используя метод `AddMiddleware`.
//
// Пример:
//
//	func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
//		return func(message *message.Message) ([]*message.Message, error) {
//			fmt.Println("Перед выполнением обработчика")
//			произведенныеСообщения, ошибка := h(message)
//			fmt.Println("После выполнения обработчика")
//
//			return произведенныеСообщения, ошибка
//		}
//	}
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...

Использование

Промежуточное ПО может применяться ко всем обработчикам в маршрутизаторе или к конкретным обработчикам. Когда промежуточное ПО добавляется непосредственно в маршрутизатор, оно применяется ко всем обработчикам, предоставленным для маршрутизатора. Если промежуточное ПО применяется только к конкретному обработчику, его необходимо добавить к обработчику в маршрутизаторе.

Вот пример использования:

Полный исходный код: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// При получении сигнала SIGTERM SignalsHandler будет грациозно закрывать маршрутизатор.
	// Вы также можете закрыть маршрутизатор, вызвав `r.Close()`.
	router.AddPlugin(plugin.SignalsHandler)

	// Промежуточное ПО на уровне маршрутизатора будет выполнено для каждого сообщения, отправленного в маршрутизатор
	router.AddMiddleware(
		// CorrelationID скопирует идентификатор корреляции из метаданных входящего сообщения в сгенерированное сообщение
		middleware.CorrelationID,

		// Если обработчик возвращает ошибку, она будет повторно попытана.
		// Она будет повторно попытана не более MaxRetries раз, после чего сообщение будет Nacked и повторно отправлено PubSub.
		middleware.Retry{
			MaxRetries:      3,
			InitialInterval: time.Millisecond * 100,
			Logger:          logger,
		}.Middleware,

		// Recoverer обрабатывает паники в обработчике.
		// В этом случае он передает их в качестве ошибок в промежуточное ПО Retry.
		middleware.Recoverer,
	)

	// Для упрощения здесь используется Pub/Sub gochannel,
	// вы можете заменить его любой другой реализацией Pub/Sub, и это будет работать так же.
	pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)

	// Публикуем некоторые входящие сообщения в фоне
	go publishMessages(pubSub)

	// AddHandler возвращает обработчик, который можно использовать для добавления промежуточного ПО на уровне обработчика
	// или для остановки обработчика.
	handler := router.AddHandler(
		"struct_handler",          // Имя обработчика, должно быть уникальным
		"incoming_messages_topic", // Тема, из которой будут считываться события
		pubSub,
		"outgoing_messages_topic", // Тема, в которую будут опубликованы события
		pubSub,
		structHandler{}.Handler,
	)

	// Промежуточное ПО на уровне обработчика выполняется только для определенных обработчиков
	// Такое промежуточное ПО можно добавить к обработчику таким же образом, как и промежуточное ПО на уровне маршрутизатора
	handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			log.Println("Выполняется обработчик-специфичное промежуточное ПО для", message.UUID)

			return h(message)
		}
	})

	// Только для целей отладки мы печатаем все полученные сообщения на `incoming_messages_topic`
	router.AddNoPublisherHandler(
		"print_incoming_messages",
		"incoming_messages_topic",
		pubSub,
		printMessages,
	)

	// Только для целей отладки мы печатаем все события, отправленные в `outgoing_messages_topic`
	router.AddNoPublisherHandler(
		"print_outgoing_messages",
		"outgoing_messages_topic",
		pubSub,
		printMessages,
	)

	// Теперь, когда все обработчики зарегистрированы, мы можем запустить маршрутизатор.
	// Run будет заблокирован, пока маршрутизатор не перестанет работать.
// ...

Доступное промежуточное ПО

Вот переиспользуемые промежуточные ПО, предоставляемые Watermill, и вы также легко можете реализовать свое собственное промежуточное ПО. Например, если вы хотите сохранить каждое входящее сообщение в определенном формате журнала, это лучший способ сделать это.

Прерыватель цепи

// CircuitBreaker - это промежуточное ПО, которое оборачивает обработчик в прерыватель цепи.
// В зависимости от конфигурации прерыватель цепи может быстро завершить работу, если обработчик продолжает возвращать ошибки.
// Это полезно для предотвращения каскадных сбоев.
type CircuitBreaker struct {
    cb *gobreaker.CircuitBreaker
}
// NewCircuitBreaker возвращает новое промежуточное ПО прерывателя цепи.
// Для доступных настроек обратитесь к документации по gobreaker.
func NewCircuitBreaker(settings gobreaker.Settings) CircuitBreaker {
    return CircuitBreaker{
        cb: gobreaker.NewCircuitBreaker(settings),
    }
}
// Middleware возвращает промежуточное ПО прерывателя цепи.
func (c CircuitBreaker) Middleware(h message.HandlerFunc) message.HandlerFunc {
    return func(msg *message.Message) ([]*message.Message, error) {
        out, err := c.cb.Execute(func() (interface{}, error) {
            return h(msg)
        })

        var result []*message.Message
        if out != nil {
            result = out.([]*message.Message)
        }

        return result, err
    }
}

Корреляция

// SetCorrelationID устанавливает идентификатор корреляции для сообщения.
//
// Когда сообщение попадает в систему, должен быть вызван SetCorrelationID.
// Когда сообщение генерируется в запросе (например, HTTP), идентификатор корреляции сообщения должен совпадать с идентификатором корреляции запроса.
func SetCorrelationID(id string, msg *message.Message) {
    if MessageCorrelationID(msg) != "" {
        return
    }

    msg.Metadata.Set(CorrelationIDMetadataKey, id)
}
// MessageCorrelationID возвращает идентификатор корреляции из сообщения.
func MessageCorrelationID(message *message.Message) string {
    return message.Metadata.Get(CorrelationIDMetadataKey)
}
// CorrelationID добавляет идентификатор корреляции ко всем сообщениям, сгенерированным обработчиком.
// Идентификатор основан на идентификаторе сообщения, полученном обработчиком.
//
// Для правильной работы CorrelationID вызов SetCorrelationID должен быть сделан первым для попадания сообщения в систему.
func CorrelationID(h message.HandlerFunc) message.HandlerFunc {
    return func(message *message.Message) ([]*message.Message, error) {
        producedMessages, err := h(message)

        correlationID := MessageCorrelationID(message)
        for _, msg := range producedMessages {
            SetCorrelationID(correlationID, msg)
        }

        return producedMessages, err
    }
}

Дубликатор

// Duplicator обрабатывает сообщение дважды для обеспечения идемпотентности конечной точки.
func Duplicator(h message.HandlerFunc) message.HandlerFunc {
    return func(msg *message.Message) ([]*message.Message, error) {
        firstProducedMessages, firstErr := h(msg)
        if firstErr != nil {
            return nil, firstErr
        }

        secondProducedMessages, secondErr := h(msg)
        if secondErr != nil {
            return nil, secondErr
        }

        return append(firstProducedMessages, secondProducedMessages...), nil
    }
}

Игнорирование ошибок

// IgnoreErrors предоставляет промежуточное ПО, которое позволяет обработчику игнорировать определенные явно определенные ошибки.
type IgnoreErrors struct {
    ignoredErrors map[string]struct{}
}
// NewIgnoreErrors создает новое промежуточное ПО IgnoreErrors.
func NewIgnoreErrors(errs []error) IgnoreErrors {
    errsMap := make(map[string]struct{}, len(errs))

    for _, err := range errs {
        errsMap[err.Error()] = struct{}{}
    }

    return IgnoreErrors{errsMap}
}
// Middleware возвращает промежуточное ПО IgnoreErrors.
func (i IgnoreErrors) Middleware(h message.HandlerFunc) message.HandlerFunc {
    return func(msg *message.Message) ([]*message.Message, error) {
        events, err := h(msg)
        if err != nil {
            if _, ok := i.ignoredErrors[errors.Cause(err).Error()]; ok {
                return events, nil
            }

            return events, err
        }

        return events, nil
    }
}

Мгновенное подтверждение

// InstantAck заставляет обработчик немедленно подтвердить входящее сообщение, независимо от любых ошибок.
// Это можно использовать для увеличения пропускной способности, но компромисс состоит в том, что:
// Если вам нужно гарантировать доставку ровно один раз, вы можете получить, по меньшей мере, однократную доставку.
// Если вам необходимы упорядоченные сообщения, это может нарушить порядок.
func InstantAck(h message.HandlerFunc) message.HandlerFunc {
	return func(message *message.Message) ([]*message.Message, error) {
		message.Ack()
		return h(message)
	}
}

Яд

// PoisonQueue предоставляет функцию промежуточного слоя для обработки недоступных сообщений и их публикации в отдельную тему.
// Затем основная цепочка промежуточных слоев продолжает выполнение, и бизнес-процесс продолжает своё выполнение как обычно.
func PoisonQueue(pub message.Publisher, topic string) (message.HandlerMiddleware, error) {
	if topic == "" {
		return nil, ErrInvalidPoisonQueueTopic
	}

	pq := poisonQueue{
		topic: topic,
		pub:   pub,
		shouldGoToPoisonQueue: func(err error) bool {
			return true
		},
	}

	return pq.Middleware, nil
}

// PoisonQueueWithFilter похож на функцию PoisonQueue, но принимает функцию для определения ошибок, соответствующих критериям очереди яда.
func PoisonQueueWithFilter(pub message.Publisher, topic string, shouldGoToPoisonQueue func(err error) bool) (message.HandlerMiddleware, error) {
	if topic == "" {
		return nil, ErrInvalidPoisonQueueTopic
	}

	pq := poisonQueue{
		topic: topic,
		pub:   pub,
		shouldGoToPoisonQueue: shouldGoToPoisonQueue,
	}

	return pq.Middleware, nil
}

Случайная Ошибка

// RandomFail вызывает сбой обработчика на основе случайной вероятности. Вероятность ошибки должна находиться в диапазоне (0, 1).
func RandomFail(errorProbability float32) message.HandlerMiddleware {
	return func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			if shouldFail(errorProbability) {
				return nil, errors.New("произошла случайная ошибка")
			}
			return h(message)
		}
	}
}

// RandomPanic вызывает панику обработчика на основе случайной вероятности. Вероятность паники должна находиться в диапазоне (0, 1).
func RandomPanic(panicProbability float32) message.HandlerMiddleware {
	return func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			if shouldFail(panicProbability) {
				panic("произошла случайная паника")
			}
			return h(message)
		}
	}
}

Восстановитель

// RecoveredPanicError содержит восстановленную ошибку паники и информацию о трассировке стека.
type RecoveredPanicError struct {
	V          interface{}
	Stacktrace string
}

// Recoverer восстанавливает любую панику из обработчика и ставит RecoveredPanicError с трассировкой стека к любой ошибке, возвращаемой из обработчика.
func Recoverer(h message.HandlerFunc) message.HandlerFunc {
	return func(event *message.Message) (events []*message.Message, err error) {
		panicked := true

		defer func() {
			if r := recover(); r != nil || panicked {
				err = errors.WithStack(RecoveredPanicError{V: r, Stacktrace: string(debug.Stack())})
			}
		}()

		events, err = h(event)
		panicked = false
		return events, err
	}
}

Повтор

// Retry предоставляет промежуточное ПО, которое повторяет обработчик, если возвращается ошибка.
// Поведение повтора, экспоненциальная задержка и максимальное затраченное время можно настроить.
type Retry struct {
	// MaxRetries - максимальное количество попыток.
	MaxRetries int

	// InitialInterval - начальный интервал между повторами. Последующие интервалы будут масштабироваться Множителем.
	InitialInterval time.Duration
	// MaxInterval устанавливает верхний предел экспоненциальной задержки повторов.
	MaxInterval time.Duration
	// Multiplier - это фактор, на который будет умножаться интервал ожидания между повторами.
	Multiplier float64
	// MaxElapsedTime задает максимальный временной лимит для повторов. Если 0, то отключен.
	MaxElapsedTime time.Duration
	// RandomizationFactor случайным образом распределяет время задержки в пределах следующего диапазона:
	// [текущийИнтервал * (1 - факторРандомизации), текущийИнтервал * (1 + факторРандомизации)].
	RandomizationFactor float64

	// OnRetryHook - это необязательная функция, которая будет выполнена на каждой попытке повтора.
	// Текущий номер повтора передается через retryNum.
	OnRetryHook func(retryNum int, delay time.Duration)

	Logger watermill.LoggerAdapter
}
// Middleware возвращает промежуточное ПО Retry.
func (r Retry) Middleware(h message.HandlerFunc) message.HandlerFunc {
	return func(msg *message.Message) ([]*message.Message, error) {
		producedMessages, err := h(msg)
		if err == nil {
			return producedMessages, nil
		}

		expBackoff := backoff.NewExponentialBackOff()
		expBackoff.InitialInterval = r.InitialInterval
		expBackoff.MaxInterval = r.MaxInterval
		expBackoff.Multiplier = r.Multiplier
		expBackoff.MaxElapsedTime = r.MaxElapsedTime
		expBackoff.RandomizationFactor = r.RandomizationFactor

		ctx := msg.Context()
		if r.MaxElapsedTime > 0 {
			var cancel func()
			ctx, cancel = context.WithTimeout(ctx, r.MaxElapsedTime)
			defer cancel()
		}

		retryNum := 1
		expBackoff.Reset()
	retryLoop:
		for {
			waitTime := expBackoff.NextBackOff()
			select {
			case 

Ограничение

// Throttle предоставляет промежуточное ПО для ограничения количества обработанных сообщений в определенный период времени.
// Это может быть использовано для предотвращения перегрузки обработчиков, работающих с невыполненной длинной очередью.
type Throttle struct {
	ticker *time.Ticker
}
// NewThrottle создает новое промежуточное ПО Throttle.
// Пример длительности и количества: NewThrottle(10, time.Second) указывает 10 сообщений в секунду.
func NewThrottle(count int64, duration time.Duration) *Throttle {
	return &Throttle{
		ticker: time.NewTicker(duration / time.Duration(count)),
	}
}
// Middleware возвращает промежуточное ПО Throttle.
func (t Throttle) Middleware(h message.HandlerFunc) message.HandlerFunc {
	return func(message *message.Message) ([]*message.Message, error) {
		// Промежутки, общие для нескольких обработчиков, будут ожидать своих "тиков".

Тайм-аут

// Timeout отменяет контекст входящего сообщения после указанной продолжительности.
// Любые функции обработчика, чувствительные к тайм-ауту, должны слушать msg.Context().Done(), чтобы знать, когда завершиться.
func Timeout(timeout time.Duration) func(message.HandlerFunc) message.HandlerFunc {
	return func(h message.HandlerFunc) message.HandlerFunc {
		return func(msg *message.Message) ([]*message.Message, error) {
			ctx, cancel := context.WithTimeout(msg.Context(), timeout)
			defer func() {
				cancel()
			}()

			msg.SetContext(ctx)
			return h(msg)
		}
	}
}