Introdução
O middleware é usado para estender o framework de eventos, adicionar funcionalidades personalizadas e fornecer funcionalidades importantes não relacionadas à lógica do manipulador principal. Por exemplo, tentar novamente o manipulador após retornar um erro, ou recuperar de um pânico e capturar a pilha de chamadas dentro do manipulador.
A assinatura da função do middleware é definida da seguinte forma:
Código fonte completo: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// O HandlerMiddleware nos permite escrever decoradores semelhantes ao manipulador.
// Ele pode executar algumas operações antes do manipulador (por exemplo, modificar a mensagem consumida)
// e também realizar algumas operações após o manipulador (modificar a mensagem produzida, ACK/NACK da mensagem consumida, lidar com erros, fazer logging, etc.).
//
// Ele pode ser anexado ao roteador usando o método `AddMiddleware`.
//
// Exemplo:
//
// func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
// return func(message *message.Message) ([]*message.Message, error) {
// fmt.Println("Antes de executar o manipulador")
// mensagensProduzidas, err := h(message)
// fmt.Println("Depois de executar o manipulador")
//
// retorne mensagensProduzidas, err
// }
// }
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...
Uso
Os middlewares podem ser aplicados a todos os manipuladores no roteador ou a manipuladores específicos. Quando um middleware é adicionado diretamente ao roteador, ele será aplicado a todos os manipuladores fornecidos para o roteador. Se um middleware for aplicado apenas a um manipulador específico, ele precisa ser adicionado ao manipulador no roteador.
Aqui está um exemplo de uso:
Código fonte completo: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// Ao receber o sinal SIGTERM, o SignalsHandler fechará o roteador graciosamente.
// Você também pode fechar o roteador chamando `r.Close()`.
router.AddPlugin(plugin.SignalsHandler)
// O middleware do nível do roteador será executado em cada mensagem enviada para o roteador
router.AddMiddleware(
// CorrelationID irá copiar o ID de correlação dos metadados da mensagem de entrada para a mensagem gerada
middleware.CorrelationID,
// Se o manipulador retornar um erro, ele será reenviado.
// Ele será reenviado no máximo MaxRetries vezes, após o que a mensagem será Nacked e reenviada pelo PubSub.
middleware.Retry{
MaxRetries: 3,
InitialInterval: time.Millisecond * 100,
Logger: logger,
}.Middleware,
// Recoverer lida com panics no manipulador.
// Neste caso, ele os passa como erros para o middleware de Retransmissão.
middleware.Recoverer,
)
// Para simplicidade, usamos o Pub/Sub gochannel aqui,
// você pode substituí-lo por qualquer implementação de Pub/Sub e funcionará da mesma forma.
pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
// Publica algumas mensagens de entrada em segundo plano
go publishMessages(pubSub)
// AddHandler retorna um manipulador que pode ser usado para adicionar middlewares de nível de manipulador
// ou para parar o manipulador.
handler := router.AddHandler(
"struct_handler", // Nome do manipulador, deve ser único
"incoming_messages_topic", // Tópico do qual os eventos serão lidos
pubSub,
"outgoing_messages_topic", // Tópico para o qual os eventos serão publicados
pubSub,
structHandler{}.Handler,
)
// Middlewares de nível de manipulador são executados apenas para manipuladores específicos
// Tais middlewares podem ser adicionados ao manipulador da mesma forma que os middlewares de nível de roteador
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("Executando middleware específico do manipulador para", message.UUID)
return h(message)
}
})
// Apenas para fins de depuração, imprimimos todas as mensagens recebidas no `incoming_messages_topic`
router.AddNoPublisherHandler(
"print_incoming_messages",
"incoming_messages_topic",
pubSub,
printMessages,
)
// Apenas para fins de depuração, imprimimos todos os eventos enviados para `outgoing_messages_topic`
router.AddNoPublisherHandler(
"print_outgoing_messages",
"outgoing_messages_topic",
pubSub,
printMessages,
)
// Agora que todos os manipuladores foram registrados, podemos executar o roteador.
// Run bloqueará até que o roteador pare de funcionar.
// ...
Middlewares Disponíveis
Aqui estão os middlewares reutilizáveis fornecidos pelo Watermill, e você também pode implementar facilmente seu próprio middleware. Por exemplo, se você deseja armazenar cada mensagem recebida em um determinado formato de log, esta é a melhor maneira de fazer isso.
Disjuntor
// CircuitBreaker é um middleware que envolve o manipulador em um disjuntor.
// Com base na configuração, o disjuntor irá falhar rapidamente se o manipulador continuar a retornar erros.
// Isso é útil para evitar falhas em cascata.
type CircuitBreaker struct {
cb *gobreaker.CircuitBreaker
}
// NewCircuitBreaker retorna um novo middleware CircuitBreaker.
// Para configurações disponíveis, consulte a documentação do gobreaker.
func NewCircuitBreaker(configurações gobreaker.Settings) CircuitBreaker {
return CircuitBreaker{
cb: gobreaker.NewCircuitBreaker(configurações),
}
}
// Middleware retorna o middleware CircuitBreaker.
func (c CircuitBreaker) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
out, err := c.cb.Execute(func() (interface{}, error) {
return h(msg)
})
var resultado []*message.Message
if out != nil {
resultado = out.([]*message.Message)
}
return resultado, err
}
}
Correlação
// SetCorrelationID define o ID de correlação para a mensagem.
//
// Quando uma mensagem entra no sistema, SetCorrelationID deve ser chamado.
// Quando uma mensagem é gerada em uma solicitação (por exemplo, HTTP), o ID de correlação da mensagem deve ser o mesmo que o ID de correlação da solicitação.
func SetCorrelationID(id string, msg *message.Message) {
if MessageCorrelationID(msg) != "" {
return
}
msg.Metadata.Set(ChaveMetadadosCorrelaçãoID, id)
}
// MessageCorrelationID retorna o ID de correlação da mensagem.
func MessageCorrelationID(mensagem *message.Message) string {
return mensagem.Metadata.Get(ChaveMetadadosCorrelaçãoID)
}
// CorrelationID adiciona um ID de correlação a todas as mensagens geradas pelo manipulador.
// O ID é baseado no ID da mensagem recebida pelo manipulador.
//
// Para que CorrelationID funcione corretamente, SetCorrelationID deve ser chamado primeiro para a mensagem entrar no sistema.
func CorrelationID(h message.HandlerFunc) message.HandlerFunc {
return func(mensagem *message.Message) ([]*message.Message, error) {
mensagensProduzidas, err := h(mensagem)
idCorrelação := MessageCorrelationID(mensagem)
for _, msg := range mensagensProduzidas {
SetCorrelationID(idCorrelação, msg)
}
return mensagensProduzidas, err
}
}
Duplicador
// Duplicador processa a mensagem duas vezes para garantir que o ponto final seja idempotente.
func Duplicador(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
mensagensProduzidasPrimeira, erroPrimeira := h(msg)
if erroPrimeira != nil {
return nil, erroPrimeira
}
mensagensProduzidasSegunda, erroSegunda := h(msg)
if erroSegunda != nil {
return nil, erroSegunda
}
return append(mensagensProduzidasPrimeira, mensagensProduzidasSegunda...), nil
}
}
Ignorar Erros
// IgnoreErrors fornece um middleware que permite ao manipulador ignorar certos erros explicitamente definidos.
type IgnoreErrors struct {
ignoredErrors map[string]struct{}
}
// NewIgnoreErrors cria um novo middleware IgnoreErrors.
func NewIgnoreErrors(errs []error) IgnoreErrors {
errsMap := make(map[string]struct{}, len(errs))
for _, err := range errs {
errsMap[err.Error()] = struct{}{}
}
return IgnoreErrors{errsMap}
}
// Middleware retorna o middleware IgnoreErrors.
func (i IgnoreErrors) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
eventos, err := h(msg)
if err != nil {
if _, ok := i.ignoredErrors[errors.Cause(err).Error()]; ok {
return eventos, nil
}
return eventos, err
}
return eventos, nil
}
}
Ack Imediato
// InstantAck faz o manipulador reconhecer imediatamente a mensagem recebida, independentemente de quaisquer erros.
// Pode ser usado para melhorar a taxa de transferência, mas o trade-off é:
// Se você precisa garantir a entrega exatamente uma vez, é possível que receba a entrega pelo menos uma vez.
// Se você precisar de mensagens ordenadas, pode quebrar a ordenação.
func InstantAck(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
message.Ack()
return h(message)
}
}
Poison
// PoisonQueue fornece um recurso de middleware para lidar com mensagens impossíveis de processar e publicá-las em um tópico separado.
// Em seguida, a cadeia principal de middleware continua a ser executada, e o negócio prossegue como de costume.
func PoisonQueue(pub message.Publisher, tópico string) (message.HandlerMiddleware, error) {
if tópico == "" {
return nil, ErrInvalidPoisonQueueTopic
}
pq := poisonQueue{
tópico: tópico,
pub: pub,
shouldGoToPoisonQueue: func(err error) bool {
return true
},
}
return pq.Middleware, nil
}
// PoisonQueueWithFilter é semelhante ao PoisonQueue, mas aceita uma função para determinar quais erros atendem aos critérios da fila de mensagens impróprias.
func PoisonQueueWithFilter(pub message.Publisher, tópico string, shouldGoToPoisonQueue func(err error) bool) (message.HandlerMiddleware, error) {
if tópico == "" {
return nil, ErrInvalidPoisonQueueTopic
}
pq := poisonQueue{
tópico: tópico,
pub: pub,
shouldGoToPoisonQueue: shouldGoToPoisonQueue,
}
return pq.Middleware, nil
}
Random Fail
// RandomFail faz com que o manipulador falhe com base em uma probabilidade aleatória. A probabilidade de erro deve estar dentro da faixa (0, 1).
func RandomFail(probabilidadeDeErro float32) message.HandlerMiddleware {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(mensagem *message.Message) ([]*message.Message, error) {
if deveFalhar(probabilidadeDeErro) {
return nil, errors.New("ocorreu um erro aleatório")
}
return h(mensagem)
}
}
}
// RandomPanic faz com que o manipulador entre em pânico com base em uma probabilidade aleatória. A probabilidade de pânico deve estar dentro da faixa (0, 1).
func RandomPanic(probabilidadeDePânico float32) message.HandlerMiddleware {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(mensagem *message.Message) ([]*message.Message, error) {
if deveFalhar(probabilidadeDePânico) {
panic("ocorreu um pânico aleatório")
}
return h(mensagem)
}
}
}
Recoverer
// RecoveredPanicError guarda o erro de pânico recuperado e suas informações de rastreamento de pilha.
type RecoveredPanicError struct {
V interface{}
Stacktrace string
}
// Recoverer recupera qualquer pânico do manipulador e anexa o RecoveredPanicError com rastreamento de pilha a qualquer erro retornado pelo manipulador.
func Recoverer(h message.HandlerFunc) message.HandlerFunc {
return func(event *message.Message) (events []*message.Message, err error) {
panicked := true
defer func() {
if r := recover(); r != nil || panicked {
err = errors.WithStack(RecoveredPanicError{V: r, Stacktrace: string(debug.Stack())})
}
}()
events, err = h(event)
panicked = false
return events, err
}
}
Tentativa
// Retry fornece um middleware que tenta novamente o manipulador se um erro for retornado.
// O comportamento de tentativa, retrocesso exponencial e tempo decorrido máximo podem ser configurados.
type Retry struct {
// MaxRetries é o número máximo de tentativas a serem feitas.
MaxRetries int
// InitialInterval é o intervalo inicial entre as tentativas. Os intervalos subsequentes serão escalonados pelo Multiplicador.
InitialInterval time.Duration
// MaxInterval define o limite superior para o retrocesso exponencial das tentativas.
MaxInterval time.Duration
// Multiplicador é o fator pelo qual o intervalo de espera entre as tentativas será multiplicado.
Multiplicador float64
// MaxElapsedTime define o limite de tempo máximo para tentativas. Se 0, fica desativado.
MaxElapsedTime time.Duration
// RandomizationFactor espalha aleatoriamente o tempo de espera dentro da seguinte faixa:
// [currentInterval * (1 - randomization_factor), currentInterval * (1 + randomization_factor)].
RandomizationFactor float64
// OnRetryHook é uma função opcional a ser executada em cada tentativa de repetição.
// O número de tentativa atual é passado através de retryNum.
OnRetryHook func(retryNum int, delay time.Duration)
Logger watermill.LoggerAdapter
}
// Middleware retorna o middleware Retry.
func (r Retry) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
producedMessages, err := h(msg)
if err == nil {
return producedMessages, nil
}
expBackoff := backoff.NewExponentialBackOff()
expBackoff.InitialInterval = r.InitialInterval
expBackoff.MaxInterval = r.MaxInterval
expBackoff.Multiplier = r.Multiplicador
expBackoff.MaxElapsedTime = r.MaxElapsedTime
expBackoff.RandomizationFactor = r.RandomizationFactor
ctx := msg.Context()
if r.MaxElapsedTime > 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, r.MaxElapsedTime)
defer cancel()
}
retryNum := 1
expBackoff.Reset()
retryLoop:
for {
waitTime := expBackoff.NextBackOff()
select {
case
Limitador
// Throttle fornece um middleware para limitar o número de mensagens processadas dentro de um determinado período de tempo.
// Isso pode ser usado para evitar a sobrecarga de manipuladores em uma fila longa não processada.
type Throttle struct {
ticker *time.Ticker
}
// NewThrottle cria um novo middleware Throttle.
// Exemplo de duração e contagem: NewThrottle(10, time.Second) indica 10 mensagens por segundo.
func NewThrottle(count int64, duration time.Duration) *Throttle {
return &Throttle{
ticker: time.NewTicker(duration / time.Duration(count)),
}
}
// Middleware retorna o middleware Throttle.
func (t Throttle) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
// Limitadores compartilhados por vários manipuladores aguardarão seus "ticks".
Tempo Limite
// Timeout cancela o contexto da mensagem de entrada após a duração especificada.
// As funcionalidades sensíveis a tempo limite do manipulador devem ouvir msg.Context().Done() para saber quando falhar.
func Timeout(timeout time.Duration) func(message.HandlerFunc) message.HandlerFunc {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
ctx, cancel := context.WithTimeout(msg.Context(), timeout)
defer func() {
cancel()
}()
msg.SetContext(ctx)
return h(msg)
}
}
}