Introdução

O middleware é usado para estender o framework de eventos, adicionar funcionalidades personalizadas e fornecer funcionalidades importantes não relacionadas à lógica do manipulador principal. Por exemplo, tentar novamente o manipulador após retornar um erro, ou recuperar de um pânico e capturar a pilha de chamadas dentro do manipulador.

A assinatura da função do middleware é definida da seguinte forma:

Código fonte completo: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// O HandlerMiddleware nos permite escrever decoradores semelhantes ao manipulador.
// Ele pode executar algumas operações antes do manipulador (por exemplo, modificar a mensagem consumida)
// e também realizar algumas operações após o manipulador (modificar a mensagem produzida, ACK/NACK da mensagem consumida, lidar com erros, fazer logging, etc.).
//
// Ele pode ser anexado ao roteador usando o método `AddMiddleware`.
//
// Exemplo:
//
//	func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
//		return func(message *message.Message) ([]*message.Message, error) {
//			fmt.Println("Antes de executar o manipulador")
//			mensagensProduzidas, err := h(message)
//			fmt.Println("Depois de executar o manipulador")
//
//			retorne mensagensProduzidas, err
//		}
//	}
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...

Uso

Os middlewares podem ser aplicados a todos os manipuladores no roteador ou a manipuladores específicos. Quando um middleware é adicionado diretamente ao roteador, ele será aplicado a todos os manipuladores fornecidos para o roteador. Se um middleware for aplicado apenas a um manipulador específico, ele precisa ser adicionado ao manipulador no roteador.

Aqui está um exemplo de uso:

Código fonte completo: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
	panic(err)
}

// Ao receber o sinal SIGTERM, o SignalsHandler fechará o roteador graciosamente.
// Você também pode fechar o roteador chamando `r.Close()`.
router.AddPlugin(plugin.SignalsHandler)

// O middleware do nível do roteador será executado em cada mensagem enviada para o roteador
router.AddMiddleware(
	// CorrelationID irá copiar o ID de correlação dos metadados da mensagem de entrada para a mensagem gerada
	middleware.CorrelationID,

	// Se o manipulador retornar um erro, ele será reenviado.
	// Ele será reenviado no máximo MaxRetries vezes, após o que a mensagem será Nacked e reenviada pelo PubSub.
	middleware.Retry{
		MaxRetries:      3,
		InitialInterval: time.Millisecond * 100,
		Logger:          logger,
	}.Middleware,

	// Recoverer lida com panics no manipulador.
	// Neste caso, ele os passa como erros para o middleware de Retransmissão.
	middleware.Recoverer,
)

// Para simplicidade, usamos o Pub/Sub gochannel aqui,
// você pode substituí-lo por qualquer implementação de Pub/Sub e funcionará da mesma forma.
pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)

// Publica algumas mensagens de entrada em segundo plano
go publishMessages(pubSub)

// AddHandler retorna um manipulador que pode ser usado para adicionar middlewares de nível de manipulador
// ou para parar o manipulador.
handler := router.AddHandler(
	"struct_handler",          // Nome do manipulador, deve ser único
	"incoming_messages_topic", // Tópico do qual os eventos serão lidos
	pubSub,
	"outgoing_messages_topic", // Tópico para o qual os eventos serão publicados
	pubSub,
	structHandler{}.Handler,
)

// Middlewares de nível de manipulador são executados apenas para manipuladores específicos
// Tais middlewares podem ser adicionados ao manipulador da mesma forma que os middlewares de nível de roteador
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
	return func(message *message.Message) ([]*message.Message, error) {
		log.Println("Executando middleware específico do manipulador para", message.UUID)

		return h(message)
	}
})

// Apenas para fins de depuração, imprimimos todas as mensagens recebidas no `incoming_messages_topic`
router.AddNoPublisherHandler(
	"print_incoming_messages",
	"incoming_messages_topic",
	pubSub,
	printMessages,
)

// Apenas para fins de depuração, imprimimos todos os eventos enviados para `outgoing_messages_topic`
router.AddNoPublisherHandler(
	"print_outgoing_messages",
	"outgoing_messages_topic",
	pubSub,
	printMessages,
)

// Agora que todos os manipuladores foram registrados, podemos executar o roteador.
// Run bloqueará até que o roteador pare de funcionar.
// ...

Middlewares Disponíveis

Aqui estão os middlewares reutilizáveis fornecidos pelo Watermill, e você também pode implementar facilmente seu próprio middleware. Por exemplo, se você deseja armazenar cada mensagem recebida em um determinado formato de log, esta é a melhor maneira de fazer isso.

Disjuntor

// CircuitBreaker é um middleware que envolve o manipulador em um disjuntor.
// Com base na configuração, o disjuntor irá falhar rapidamente se o manipulador continuar a retornar erros.
// Isso é útil para evitar falhas em cascata.
type CircuitBreaker struct {
    cb *gobreaker.CircuitBreaker
}
// NewCircuitBreaker retorna um novo middleware CircuitBreaker.
// Para configurações disponíveis, consulte a documentação do gobreaker.
func NewCircuitBreaker(configurações gobreaker.Settings) CircuitBreaker {
    return CircuitBreaker{
        cb: gobreaker.NewCircuitBreaker(configurações),
    }
}
// Middleware retorna o middleware CircuitBreaker.
func (c CircuitBreaker) Middleware(h message.HandlerFunc) message.HandlerFunc {
    return func(msg *message.Message) ([]*message.Message, error) {
        out, err := c.cb.Execute(func() (interface{}, error) {
            return h(msg)
        })

        var resultado []*message.Message
        if out != nil {
            resultado = out.([]*message.Message)
        }

        return resultado, err
    }
}

Correlação

// SetCorrelationID define o ID de correlação para a mensagem.
//
// Quando uma mensagem entra no sistema, SetCorrelationID deve ser chamado.
// Quando uma mensagem é gerada em uma solicitação (por exemplo, HTTP), o ID de correlação da mensagem deve ser o mesmo que o ID de correlação da solicitação.
func SetCorrelationID(id string, msg *message.Message) {
    if MessageCorrelationID(msg) != "" {
        return
    }

    msg.Metadata.Set(ChaveMetadadosCorrelaçãoID, id)
}
// MessageCorrelationID retorna o ID de correlação da mensagem.
func MessageCorrelationID(mensagem *message.Message) string {
    return mensagem.Metadata.Get(ChaveMetadadosCorrelaçãoID)
}
// CorrelationID adiciona um ID de correlação a todas as mensagens geradas pelo manipulador.
// O ID é baseado no ID da mensagem recebida pelo manipulador.
//
// Para que CorrelationID funcione corretamente, SetCorrelationID deve ser chamado primeiro para a mensagem entrar no sistema.
func CorrelationID(h message.HandlerFunc) message.HandlerFunc {
    return func(mensagem *message.Message) ([]*message.Message, error) {
        mensagensProduzidas, err := h(mensagem)

        idCorrelação := MessageCorrelationID(mensagem)
        for _, msg := range mensagensProduzidas {
            SetCorrelationID(idCorrelação, msg)
        }

        return mensagensProduzidas, err
    }
}

Duplicador

// Duplicador processa a mensagem duas vezes para garantir que o ponto final seja idempotente.
func Duplicador(h message.HandlerFunc) message.HandlerFunc {
    return func(msg *message.Message) ([]*message.Message, error) {
        mensagensProduzidasPrimeira, erroPrimeira := h(msg)
        if erroPrimeira != nil {
            return nil, erroPrimeira
        }

        mensagensProduzidasSegunda, erroSegunda := h(msg)
        if erroSegunda != nil {
            return nil, erroSegunda
        }

        return append(mensagensProduzidasPrimeira, mensagensProduzidasSegunda...), nil
    }
}

Ignorar Erros

// IgnoreErrors fornece um middleware que permite ao manipulador ignorar certos erros explicitamente definidos.
type IgnoreErrors struct {
    ignoredErrors map[string]struct{}
}
// NewIgnoreErrors cria um novo middleware IgnoreErrors.
func NewIgnoreErrors(errs []error) IgnoreErrors {
    errsMap := make(map[string]struct{}, len(errs))

    for _, err := range errs {
        errsMap[err.Error()] = struct{}{}
    }

    return IgnoreErrors{errsMap}
}
// Middleware retorna o middleware IgnoreErrors.
func (i IgnoreErrors) Middleware(h message.HandlerFunc) message.HandlerFunc {
    return func(msg *message.Message) ([]*message.Message, error) {
        eventos, err := h(msg)
        if err != nil {
            if _, ok := i.ignoredErrors[errors.Cause(err).Error()]; ok {
                return eventos, nil
            }

            return eventos, err
        }

        return eventos, nil
    }
}

Ack Imediato

// InstantAck faz o manipulador reconhecer imediatamente a mensagem recebida, independentemente de quaisquer erros.
// Pode ser usado para melhorar a taxa de transferência, mas o trade-off é:
// Se você precisa garantir a entrega exatamente uma vez, é possível que receba a entrega pelo menos uma vez.
// Se você precisar de mensagens ordenadas, pode quebrar a ordenação.
func InstantAck(h message.HandlerFunc) message.HandlerFunc {
	return func(message *message.Message) ([]*message.Message, error) {
		message.Ack()
		return h(message)
	}
}

Poison

// PoisonQueue fornece um recurso de middleware para lidar com mensagens impossíveis de processar e publicá-las em um tópico separado.
// Em seguida, a cadeia principal de middleware continua a ser executada, e o negócio prossegue como de costume.
func PoisonQueue(pub message.Publisher, tópico string) (message.HandlerMiddleware, error) {
	if tópico == "" {
		return nil, ErrInvalidPoisonQueueTopic
	}

	pq := poisonQueue{
		tópico: tópico,
		pub:   pub,
		shouldGoToPoisonQueue: func(err error) bool {
			return true
		},
	}

	return pq.Middleware, nil
}

// PoisonQueueWithFilter é semelhante ao PoisonQueue, mas aceita uma função para determinar quais erros atendem aos critérios da fila de mensagens impróprias.
func PoisonQueueWithFilter(pub message.Publisher, tópico string, shouldGoToPoisonQueue func(err error) bool) (message.HandlerMiddleware, error) {
	if tópico == "" {
		return nil, ErrInvalidPoisonQueueTopic
	}

	pq := poisonQueue{
		tópico: tópico,
		pub:   pub,
		shouldGoToPoisonQueue: shouldGoToPoisonQueue,
	}

	return pq.Middleware, nil
}

Random Fail

// RandomFail faz com que o manipulador falhe com base em uma probabilidade aleatória. A probabilidade de erro deve estar dentro da faixa (0, 1).
func RandomFail(probabilidadeDeErro float32) message.HandlerMiddleware {
	return func(h message.HandlerFunc) message.HandlerFunc {
		return func(mensagem *message.Message) ([]*message.Message, error) {
			if deveFalhar(probabilidadeDeErro) {
				return nil, errors.New("ocorreu um erro aleatório")
			}
			return h(mensagem)
		}
	}
}

// RandomPanic faz com que o manipulador entre em pânico com base em uma probabilidade aleatória. A probabilidade de pânico deve estar dentro da faixa (0, 1).
func RandomPanic(probabilidadeDePânico float32) message.HandlerMiddleware {
	return func(h message.HandlerFunc) message.HandlerFunc {
		return func(mensagem *message.Message) ([]*message.Message, error) {
			if deveFalhar(probabilidadeDePânico) {
				panic("ocorreu um pânico aleatório")
			}
			return h(mensagem)
		}
	}
}

Recoverer

// RecoveredPanicError guarda o erro de pânico recuperado e suas informações de rastreamento de pilha.
type RecoveredPanicError struct {
	V          interface{}
	Stacktrace string
}

// Recoverer recupera qualquer pânico do manipulador e anexa o RecoveredPanicError com rastreamento de pilha a qualquer erro retornado pelo manipulador.
func Recoverer(h message.HandlerFunc) message.HandlerFunc {
	return func(event *message.Message) (events []*message.Message, err error) {
		panicked := true

		defer func() {
			if r := recover(); r != nil || panicked {
				err = errors.WithStack(RecoveredPanicError{V: r, Stacktrace: string(debug.Stack())})
			}
		}()

		events, err = h(event)
		panicked = false
		return events, err
	}
}

Tentativa

// Retry fornece um middleware que tenta novamente o manipulador se um erro for retornado.
// O comportamento de tentativa, retrocesso exponencial e tempo decorrido máximo podem ser configurados.
type Retry struct {
	// MaxRetries é o número máximo de tentativas a serem feitas.
	MaxRetries int

	// InitialInterval é o intervalo inicial entre as tentativas. Os intervalos subsequentes serão escalonados pelo Multiplicador.
	InitialInterval time.Duration
	// MaxInterval define o limite superior para o retrocesso exponencial das tentativas.
	MaxInterval time.Duration
	// Multiplicador é o fator pelo qual o intervalo de espera entre as tentativas será multiplicado.
	Multiplicador float64
	// MaxElapsedTime define o limite de tempo máximo para tentativas. Se 0, fica desativado.
	MaxElapsedTime time.Duration
	// RandomizationFactor espalha aleatoriamente o tempo de espera dentro da seguinte faixa:
	// [currentInterval * (1 - randomization_factor), currentInterval * (1 + randomization_factor)].
	RandomizationFactor float64

	// OnRetryHook é uma função opcional a ser executada em cada tentativa de repetição.
	// O número de tentativa atual é passado através de retryNum.
	OnRetryHook func(retryNum int, delay time.Duration)

	Logger watermill.LoggerAdapter
}
// Middleware retorna o middleware Retry.
func (r Retry) Middleware(h message.HandlerFunc) message.HandlerFunc {
	return func(msg *message.Message) ([]*message.Message, error) {
		producedMessages, err := h(msg)
		if err == nil {
			return producedMessages, nil
		}

		expBackoff := backoff.NewExponentialBackOff()
		expBackoff.InitialInterval = r.InitialInterval
		expBackoff.MaxInterval = r.MaxInterval
		expBackoff.Multiplier = r.Multiplicador
		expBackoff.MaxElapsedTime = r.MaxElapsedTime
		expBackoff.RandomizationFactor = r.RandomizationFactor

		ctx := msg.Context()
		if r.MaxElapsedTime > 0 {
			var cancel func()
			ctx, cancel = context.WithTimeout(ctx, r.MaxElapsedTime)
			defer cancel()
		}

		retryNum := 1
		expBackoff.Reset()
	retryLoop:
		for {
			waitTime := expBackoff.NextBackOff()
			select {
			case 

Limitador

// Throttle fornece um middleware para limitar o número de mensagens processadas dentro de um determinado período de tempo.
// Isso pode ser usado para evitar a sobrecarga de manipuladores em uma fila longa não processada.
type Throttle struct {
	ticker *time.Ticker
}
// NewThrottle cria um novo middleware Throttle.
// Exemplo de duração e contagem: NewThrottle(10, time.Second) indica 10 mensagens por segundo.
func NewThrottle(count int64, duration time.Duration) *Throttle {
	return &Throttle{
		ticker: time.NewTicker(duration / time.Duration(count)),
	}
}
// Middleware retorna o middleware Throttle.
func (t Throttle) Middleware(h message.HandlerFunc) message.HandlerFunc {
	return func(message *message.Message) ([]*message.Message, error) {
		// Limitadores compartilhados por vários manipuladores aguardarão seus "ticks".

Tempo Limite

// Timeout cancela o contexto da mensagem de entrada após a duração especificada.
// As funcionalidades sensíveis a tempo limite do manipulador devem ouvir msg.Context().Done() para saber quando falhar.
func Timeout(timeout time.Duration) func(message.HandlerFunc) message.HandlerFunc {
	return func(h message.HandlerFunc) message.HandlerFunc {
		return func(msg *message.Message) ([]*message.Message, error) {
			ctx, cancel := context.WithTimeout(msg.Context(), timeout)
			defer func() {
				cancel()
			}()

			msg.SetContext(ctx)
			return h(msg)
		}
	}
}