Введение
Промежуточное ПО используется для расширения фреймворка событий, добавления пользовательской функциональности и предоставления важных функций, не связанных с логикой основного обработчика. Например, повторное выполнение обработчика после возврата ошибки или восстановление после паники и захвата трассировки стека внутри обработчика.
Сигнатура функции промежуточного ПО определена следующим образом:
Полный исходный код: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerMiddleware позволяет нам создавать декораторы, аналогичные обработчику.
// Он может выполнять операции до обработчика (например, изменять полученное сообщение)
// и также выполнять операции после обработчика (изменять произведенное сообщение, подтверждать/отменять полученное сообщение, обрабатывать ошибки, вести журналирование и т. д.).
//
// Его можно присоединить к маршрутизатору, используя метод `AddMiddleware`.
//
// Пример:
//
// func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
// return func(message *message.Message) ([]*message.Message, error) {
// fmt.Println("Перед выполнением обработчика")
// произведенныеСообщения, ошибка := h(message)
// fmt.Println("После выполнения обработчика")
//
// return произведенныеСообщения, ошибка
// }
// }
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...
Использование
Промежуточное ПО может применяться ко всем обработчикам в маршрутизаторе или к конкретным обработчикам. Когда промежуточное ПО добавляется непосредственно в маршрутизатор, оно применяется ко всем обработчикам, предоставленным для маршрутизатора. Если промежуточное ПО применяется только к конкретному обработчику, его необходимо добавить к обработчику в маршрутизаторе.
Вот пример использования:
Полный исходный код: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// При получении сигнала SIGTERM SignalsHandler будет грациозно закрывать маршрутизатор.
// Вы также можете закрыть маршрутизатор, вызвав `r.Close()`.
router.AddPlugin(plugin.SignalsHandler)
// Промежуточное ПО на уровне маршрутизатора будет выполнено для каждого сообщения, отправленного в маршрутизатор
router.AddMiddleware(
// CorrelationID скопирует идентификатор корреляции из метаданных входящего сообщения в сгенерированное сообщение
middleware.CorrelationID,
// Если обработчик возвращает ошибку, она будет повторно попытана.
// Она будет повторно попытана не более MaxRetries раз, после чего сообщение будет Nacked и повторно отправлено PubSub.
middleware.Retry{
MaxRetries: 3,
InitialInterval: time.Millisecond * 100,
Logger: logger,
}.Middleware,
// Recoverer обрабатывает паники в обработчике.
// В этом случае он передает их в качестве ошибок в промежуточное ПО Retry.
middleware.Recoverer,
)
// Для упрощения здесь используется Pub/Sub gochannel,
// вы можете заменить его любой другой реализацией Pub/Sub, и это будет работать так же.
pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
// Публикуем некоторые входящие сообщения в фоне
go publishMessages(pubSub)
// AddHandler возвращает обработчик, который можно использовать для добавления промежуточного ПО на уровне обработчика
// или для остановки обработчика.
handler := router.AddHandler(
"struct_handler", // Имя обработчика, должно быть уникальным
"incoming_messages_topic", // Тема, из которой будут считываться события
pubSub,
"outgoing_messages_topic", // Тема, в которую будут опубликованы события
pubSub,
structHandler{}.Handler,
)
// Промежуточное ПО на уровне обработчика выполняется только для определенных обработчиков
// Такое промежуточное ПО можно добавить к обработчику таким же образом, как и промежуточное ПО на уровне маршрутизатора
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("Выполняется обработчик-специфичное промежуточное ПО для", message.UUID)
return h(message)
}
})
// Только для целей отладки мы печатаем все полученные сообщения на `incoming_messages_topic`
router.AddNoPublisherHandler(
"print_incoming_messages",
"incoming_messages_topic",
pubSub,
printMessages,
)
// Только для целей отладки мы печатаем все события, отправленные в `outgoing_messages_topic`
router.AddNoPublisherHandler(
"print_outgoing_messages",
"outgoing_messages_topic",
pubSub,
printMessages,
)
// Теперь, когда все обработчики зарегистрированы, мы можем запустить маршрутизатор.
// Run будет заблокирован, пока маршрутизатор не перестанет работать.
// ...
Доступное промежуточное ПО
Вот переиспользуемые промежуточные ПО, предоставляемые Watermill, и вы также легко можете реализовать свое собственное промежуточное ПО. Например, если вы хотите сохранить каждое входящее сообщение в определенном формате журнала, это лучший способ сделать это.
Прерыватель цепи
// CircuitBreaker - это промежуточное ПО, которое оборачивает обработчик в прерыватель цепи.
// В зависимости от конфигурации прерыватель цепи может быстро завершить работу, если обработчик продолжает возвращать ошибки.
// Это полезно для предотвращения каскадных сбоев.
type CircuitBreaker struct {
cb *gobreaker.CircuitBreaker
}
// NewCircuitBreaker возвращает новое промежуточное ПО прерывателя цепи.
// Для доступных настроек обратитесь к документации по gobreaker.
func NewCircuitBreaker(settings gobreaker.Settings) CircuitBreaker {
return CircuitBreaker{
cb: gobreaker.NewCircuitBreaker(settings),
}
}
// Middleware возвращает промежуточное ПО прерывателя цепи.
func (c CircuitBreaker) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
out, err := c.cb.Execute(func() (interface{}, error) {
return h(msg)
})
var result []*message.Message
if out != nil {
result = out.([]*message.Message)
}
return result, err
}
}
Корреляция
// SetCorrelationID устанавливает идентификатор корреляции для сообщения.
//
// Когда сообщение попадает в систему, должен быть вызван SetCorrelationID.
// Когда сообщение генерируется в запросе (например, HTTP), идентификатор корреляции сообщения должен совпадать с идентификатором корреляции запроса.
func SetCorrelationID(id string, msg *message.Message) {
if MessageCorrelationID(msg) != "" {
return
}
msg.Metadata.Set(CorrelationIDMetadataKey, id)
}
// MessageCorrelationID возвращает идентификатор корреляции из сообщения.
func MessageCorrelationID(message *message.Message) string {
return message.Metadata.Get(CorrelationIDMetadataKey)
}
// CorrelationID добавляет идентификатор корреляции ко всем сообщениям, сгенерированным обработчиком.
// Идентификатор основан на идентификаторе сообщения, полученном обработчиком.
//
// Для правильной работы CorrelationID вызов SetCorrelationID должен быть сделан первым для попадания сообщения в систему.
func CorrelationID(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
producedMessages, err := h(message)
correlationID := MessageCorrelationID(message)
for _, msg := range producedMessages {
SetCorrelationID(correlationID, msg)
}
return producedMessages, err
}
}
Дубликатор
// Duplicator обрабатывает сообщение дважды для обеспечения идемпотентности конечной точки.
func Duplicator(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
firstProducedMessages, firstErr := h(msg)
if firstErr != nil {
return nil, firstErr
}
secondProducedMessages, secondErr := h(msg)
if secondErr != nil {
return nil, secondErr
}
return append(firstProducedMessages, secondProducedMessages...), nil
}
}
Игнорирование ошибок
// IgnoreErrors предоставляет промежуточное ПО, которое позволяет обработчику игнорировать определенные явно определенные ошибки.
type IgnoreErrors struct {
ignoredErrors map[string]struct{}
}
// NewIgnoreErrors создает новое промежуточное ПО IgnoreErrors.
func NewIgnoreErrors(errs []error) IgnoreErrors {
errsMap := make(map[string]struct{}, len(errs))
for _, err := range errs {
errsMap[err.Error()] = struct{}{}
}
return IgnoreErrors{errsMap}
}
// Middleware возвращает промежуточное ПО IgnoreErrors.
func (i IgnoreErrors) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
events, err := h(msg)
if err != nil {
if _, ok := i.ignoredErrors[errors.Cause(err).Error()]; ok {
return events, nil
}
return events, err
}
return events, nil
}
}
Мгновенное подтверждение
// InstantAck заставляет обработчик немедленно подтвердить входящее сообщение, независимо от любых ошибок.
// Это можно использовать для увеличения пропускной способности, но компромисс состоит в том, что:
// Если вам нужно гарантировать доставку ровно один раз, вы можете получить, по меньшей мере, однократную доставку.
// Если вам необходимы упорядоченные сообщения, это может нарушить порядок.
func InstantAck(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
message.Ack()
return h(message)
}
}
Яд
// PoisonQueue предоставляет функцию промежуточного слоя для обработки недоступных сообщений и их публикации в отдельную тему.
// Затем основная цепочка промежуточных слоев продолжает выполнение, и бизнес-процесс продолжает своё выполнение как обычно.
func PoisonQueue(pub message.Publisher, topic string) (message.HandlerMiddleware, error) {
if topic == "" {
return nil, ErrInvalidPoisonQueueTopic
}
pq := poisonQueue{
topic: topic,
pub: pub,
shouldGoToPoisonQueue: func(err error) bool {
return true
},
}
return pq.Middleware, nil
}
// PoisonQueueWithFilter похож на функцию PoisonQueue, но принимает функцию для определения ошибок, соответствующих критериям очереди яда.
func PoisonQueueWithFilter(pub message.Publisher, topic string, shouldGoToPoisonQueue func(err error) bool) (message.HandlerMiddleware, error) {
if topic == "" {
return nil, ErrInvalidPoisonQueueTopic
}
pq := poisonQueue{
topic: topic,
pub: pub,
shouldGoToPoisonQueue: shouldGoToPoisonQueue,
}
return pq.Middleware, nil
}
Случайная Ошибка
// RandomFail вызывает сбой обработчика на основе случайной вероятности. Вероятность ошибки должна находиться в диапазоне (0, 1).
func RandomFail(errorProbability float32) message.HandlerMiddleware {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
if shouldFail(errorProbability) {
return nil, errors.New("произошла случайная ошибка")
}
return h(message)
}
}
}
// RandomPanic вызывает панику обработчика на основе случайной вероятности. Вероятность паники должна находиться в диапазоне (0, 1).
func RandomPanic(panicProbability float32) message.HandlerMiddleware {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
if shouldFail(panicProbability) {
panic("произошла случайная паника")
}
return h(message)
}
}
}
Восстановитель
// RecoveredPanicError содержит восстановленную ошибку паники и информацию о трассировке стека.
type RecoveredPanicError struct {
V interface{}
Stacktrace string
}
// Recoverer восстанавливает любую панику из обработчика и ставит RecoveredPanicError с трассировкой стека к любой ошибке, возвращаемой из обработчика.
func Recoverer(h message.HandlerFunc) message.HandlerFunc {
return func(event *message.Message) (events []*message.Message, err error) {
panicked := true
defer func() {
if r := recover(); r != nil || panicked {
err = errors.WithStack(RecoveredPanicError{V: r, Stacktrace: string(debug.Stack())})
}
}()
events, err = h(event)
panicked = false
return events, err
}
}
Повтор
// Retry предоставляет промежуточное ПО, которое повторяет обработчик, если возвращается ошибка.
// Поведение повтора, экспоненциальная задержка и максимальное затраченное время можно настроить.
type Retry struct {
// MaxRetries - максимальное количество попыток.
MaxRetries int
// InitialInterval - начальный интервал между повторами. Последующие интервалы будут масштабироваться Множителем.
InitialInterval time.Duration
// MaxInterval устанавливает верхний предел экспоненциальной задержки повторов.
MaxInterval time.Duration
// Multiplier - это фактор, на который будет умножаться интервал ожидания между повторами.
Multiplier float64
// MaxElapsedTime задает максимальный временной лимит для повторов. Если 0, то отключен.
MaxElapsedTime time.Duration
// RandomizationFactor случайным образом распределяет время задержки в пределах следующего диапазона:
// [текущийИнтервал * (1 - факторРандомизации), текущийИнтервал * (1 + факторРандомизации)].
RandomizationFactor float64
// OnRetryHook - это необязательная функция, которая будет выполнена на каждой попытке повтора.
// Текущий номер повтора передается через retryNum.
OnRetryHook func(retryNum int, delay time.Duration)
Logger watermill.LoggerAdapter
}
// Middleware возвращает промежуточное ПО Retry.
func (r Retry) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
producedMessages, err := h(msg)
if err == nil {
return producedMessages, nil
}
expBackoff := backoff.NewExponentialBackOff()
expBackoff.InitialInterval = r.InitialInterval
expBackoff.MaxInterval = r.MaxInterval
expBackoff.Multiplier = r.Multiplier
expBackoff.MaxElapsedTime = r.MaxElapsedTime
expBackoff.RandomizationFactor = r.RandomizationFactor
ctx := msg.Context()
if r.MaxElapsedTime > 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, r.MaxElapsedTime)
defer cancel()
}
retryNum := 1
expBackoff.Reset()
retryLoop:
for {
waitTime := expBackoff.NextBackOff()
select {
case
Ограничение
// Throttle предоставляет промежуточное ПО для ограничения количества обработанных сообщений в определенный период времени.
// Это может быть использовано для предотвращения перегрузки обработчиков, работающих с невыполненной длинной очередью.
type Throttle struct {
ticker *time.Ticker
}
// NewThrottle создает новое промежуточное ПО Throttle.
// Пример длительности и количества: NewThrottle(10, time.Second) указывает 10 сообщений в секунду.
func NewThrottle(count int64, duration time.Duration) *Throttle {
return &Throttle{
ticker: time.NewTicker(duration / time.Duration(count)),
}
}
// Middleware возвращает промежуточное ПО Throttle.
func (t Throttle) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
// Промежутки, общие для нескольких обработчиков, будут ожидать своих "тиков".
Тайм-аут
// Timeout отменяет контекст входящего сообщения после указанной продолжительности.
// Любые функции обработчика, чувствительные к тайм-ауту, должны слушать msg.Context().Done(), чтобы знать, когда завершиться.
func Timeout(timeout time.Duration) func(message.HandlerFunc) message.HandlerFunc {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
ctx, cancel := context.WithTimeout(msg.Context(), timeout)
defer func() {
cancel()
}()
msg.SetContext(ctx)
return h(msg)
}
}
}