Introducción
El middleware se utiliza para extender el marco de eventos, agregar funcionalidades personalizadas y proporcionar funcionalidades importantes no relacionadas con la lógica del manejador principal. Por ejemplo, reintentar el manejador después de devolver un error, o recuperarse de un pánico y capturar la pila de llamadas dentro del manejador.
La firma de la función de middleware se define de la siguiente manera:
Código fuente completo: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerMiddleware nos permite escribir decoradores similares al manejador.
// Puede ejecutar algunas operaciones antes del manejador (por ejemplo, modificar el mensaje consumido)
// y también realizar algunas operaciones después del manejador (modificar el mensaje producido, ACK/NACK del mensaje consumido, manejar errores, registro, etc.).
//
// Se puede adjuntar al enrutador utilizando el método `AddMiddleware`.
//
// Ejemplo:
//
// func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
// return func(message *message.Message) ([]*message.Message, error) {
// fmt.Println("Antes de ejecutar el manejador")
// producedMessages, err := h(message)
// fmt.Println("Después de ejecutar el manejador")
//
// return producedMessages, err
// }
// }
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...
Uso
El middleware se puede aplicar a todos los controladores en el enrutador o a controladores específicos. Cuando se agrega middleware directamente al enrutador, se aplicará a todos los controladores proporcionados para el enrutador. Si un middleware solo se aplica a un controlador específico, debe agregarse al controlador en el enrutador.
Aquí tienes un ejemplo de uso:
Código fuente completo: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// Cuando se reciba la señal SIGTERM, SignalsHandler cerrará el enrutador de forma elegante.
// También puedes cerrar el enrutador llamando a `r.Close()`.
router.AddPlugin(plugin.SignalsHandler)
// El middleware a nivel de enrutador se ejecutará en cada mensaje enviado al enrutador
router.AddMiddleware(
// CorrelationID copiará el ID de correlación de los metadatos del mensaje entrante al mensaje generado
middleware.CorrelationID,
// Si el controlador devuelve un error, se intentará de nuevo.
// Se intentará como máximo MaxRetries veces, después de las cuales el mensaje será Nacked y reenviado por PubSub.
middleware.Retry{
MaxRetries: 3,
InitialInterval: time.Millisecond * 100,
Logger: logger,
}.Middleware,
// Recoverer maneja las fallas en el controlador.
// En este caso, las pasa como errores al middleware Retry.
middleware.Recoverer,
)
// Por simplicidad, usamos gochannel Pub/Sub aquí,
// puedes reemplazarlo con cualquier implementación de Pub/Sub y funcionará de la misma manera.
pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
// Publica algunos mensajes entrantes en segundo plano
go publishMessages(pubSub)
// AddHandler devuelve un controlador que se puede usar para agregar middleware a nivel de controlador
// o para detener el controlador.
handler := router.AddHandler(
"struct_handler", // Nombre del controlador, debe ser único
"incoming_messages_topic", // Tema del que se leerán los eventos
pubSub,
"outgoing_messages_topic", // Tema al que se publicarán los eventos
pubSub,
structHandler{}.Handler,
)
// El middleware a nivel de controlador solo se ejecuta para controladores específicos
// Dicho middleware se puede agregar al controlador de la misma manera que el middleware a nivel de enrutador
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("Ejecutando middleware específico del controlador para", message.UUID)
return h(message)
}
})
// Solo con fines de depuración, imprimimos todos los mensajes recibidos en `incoming_messages_topic`
router.AddNoPublisherHandler(
"print_incoming_messages",
"incoming_messages_topic",
pubSub,
printMessages,
)
// Solo con fines de depuración, imprimimos todos los eventos enviados a `outgoing_messages_topic`
router.AddNoPublisherHandler(
"print_outgoing_messages",
"outgoing_messages_topic",
pubSub,
printMessages,
)
// Ahora que todos los controladores han sido registrados, podemos ejecutar el enrutador.
// Run bloqueará hasta que el enrutador deje de ejecutarse.
// ...
Middleware Disponible
Aquí están los middlewares reutilizables proporcionados por Watermill, y también puedes implementar fácilmente tu propio middleware. Por ejemplo, si deseas almacenar cada mensaje entrante en un cierto tipo de formato de registro, esta es la mejor manera de hacerlo.
Interruptor de circuito
// CircuitBreaker es un middleware que envuelve el controlador en un interruptor de circuito.
// Según la configuración, el interruptor de circuito fallará rápidamente si el controlador continúa devolviendo errores.
// Esto es útil para prevenir fallas en cascada.
type CircuitBreaker struct {
cb *gobreaker.CircuitBreaker
}
// NewCircuitBreaker devuelve un nuevo middleware CircuitBreaker.
// Para ver la configuración disponible, consulte la documentación de gobreaker.
func NewCircuitBreaker(settings gobreaker.Settings) CircuitBreaker {
return CircuitBreaker{
cb: gobreaker.NewCircuitBreaker(settings),
}
}
// Middleware devuelve el middleware CircuitBreaker.
func (c CircuitBreaker) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
out, err := c.cb.Execute(func() (interface{}, error) {
return h(msg)
})
var result []*message.Message
if out != nil {
result = out.([]*message.Message)
}
return result, err
}
}
Correlación
// SetCorrelationID establece el ID de correlación para el mensaje.
//
// Cuando un mensaje entra en el sistema, se debe llamar a SetCorrelationID.
// Cuando se genera un mensaje en una solicitud (por ejemplo, HTTP), el ID de correlación del mensaje debe ser el mismo que el ID de correlación de la solicitud.
func SetCorrelationID(id string, msg *message.Message) {
if MessageCorrelationID(msg) != "" {
return
}
msg.Metadata.Set(CorrelationIDMetadataKey, id)
}
// MessageCorrelationID devuelve el ID de correlación del mensaje.
func MessageCorrelationID(message *message.Message) string {
return message.Metadata.Get(CorrelationIDMetadataKey)
}
// CorrelationID agrega un ID de correlación a todos los mensajes generados por el controlador.
// El ID se basa en el ID de mensaje recibido por el controlador.
//
// Para que CorrelationID funcione correctamente, primero se debe llamar a SetCorrelationID para que el mensaje entre en el sistema.
func CorrelationID(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
producedMessages, err := h(message)
correlationID := MessageCorrelationID(message)
for _, msg := range producedMessages {
SetCorrelationID(correlationID, msg)
}
return producedMessages, err
}
}
Duplicador
// Duplicador procesa el mensaje dos veces para garantizar que el punto de conexión sea idempotente.
func Duplicator(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
firstProducedMessages, firstErr := h(msg)
if firstErr != nil {
return nil, firstErr
}
secondProducedMessages, secondErr := h(msg)
if secondErr != nil {
return nil, secondErr
}
return append(firstProducedMessages, secondProducedMessages...), nil
}
}
Ignorar Errores
// IgnoreErrors proporciona un middleware que permite al controlador ignorar ciertos errores definidos explícitamente.
type IgnoreErrors struct {
ignoredErrors map[string]struct{}
}
// NewIgnoreErrors crea un nuevo middleware IgnoreErrors.
func NewIgnoreErrors(errs []error) IgnoreErrors {
errsMap := make(map[string]struct{}, len(errs))
for _, err := range errs {
errsMap[err.Error()] = struct{}{}
}
return IgnoreErrors{errsMap}
}
// Middleware devuelve el middleware IgnoreErrors.
func (i IgnoreErrors) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
events, err := h(msg)
if err != nil {
if _, ok := i.ignoredErrors[errors.Cause(err).Error()]; ok {
return events, nil
}
return events, err
}
return events, nil
}
}
Aprobación Instantánea
// InstantAck hace que el controlador apruebe inmediatamente el mensaje entrante, independientemente de cualquier error.
// Puede usarse para mejorar el rendimiento, pero el inconveniente es:
// Si necesita asegurar la entrega exactamente una vez, puede obtener al menos una vez la entrega.
// Si requiere mensajes ordenados, puede romper el orden.
func InstantAck(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
message.Ack()
return h(message)
}
}
Veneno
// PoisonQueue proporciona una función intermedia para manejar mensajes no procesables y publicarlos en un tema separado.
// Luego, la cadena principal de funciones intermedias continúa ejecutándose y el negocio continúa como de costumbre.
func PoisonQueue(pub message.Publisher, topic string) (message.HandlerMiddleware, error) {
if topic == "" {
return nil, ErrInvalidPoisonQueueTopic
}
pq := poisonQueue{
topic: topic,
pub: pub,
shouldGoToPoisonQueue: func(err error) bool {
return true
},
}
return pq.Middleware, nil
}
// PoisonQueueWithFilter es similar a PoisonQueue, pero acepta una función para determinar qué errores cumplen con los criterios de la cola de veneno.
func PoisonQueueWithFilter(pub message.Publisher, topic string, shouldGoToPoisonQueue func(err error) bool) (message.HandlerMiddleware, error) {
if topic == "" {
return nil, ErrInvalidPoisonQueueTopic
}
pq := poisonQueue{
topic: topic,
pub: pub,
shouldGoToPoisonQueue: shouldGoToPoisonQueue,
}
return pq.Middleware, nil
}
Fallo Aleatorio
// RandomFail hace que el manejador falle basado en una probabilidad aleatoria. La probabilidad de error debe estar dentro del rango (0, 1).
func RandomFail(probabilidadDeError float32) message.HandlerMiddleware {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(mensaje *message.Message) ([]*message.Message, error) {
if shouldFail(probabilidadDeError) {
return nil, errors.New("ocurrió un error aleatorio")
}
return h(mensaje)
}
}
}
// RandomPanic hace que el manejador genere un pánico basado en una probabilidad aleatoria. La probabilidad de pánico debe estar dentro del rango (0, 1).
func RandomPanic(probabilidadDePánico float32) message.HandlerMiddleware {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(mensaje *message.Message) ([]*message.Message, error) {
if shouldFail(probabilidadDePánico) {
panic("ocurrió un pánico aleatorio")
}
return h(mensaje)
}
}
}
Recuperador
// RecoveredPanicError contiene el error recuperado del pánico y su información de traza de pila.
type RecoveredPanicError struct {
V interface{}
Stacktrace string
}
// Recoverer recupera cualquier pánico del manejador y adjunta RecoveredPanicError con la traza de pila al error devuelto por el manejador.
func Recoverer(h message.HandlerFunc) message.HandlerFunc {
return func(event *message.Message) (events []*message.Message, err error) {
panicked := true
defer func() {
if r := recover(); r != nil || panicked {
err = errors.WithStack(RecoveredPanicError{V: r, Stacktrace: string(debug.Stack())})
}
}()
events, err = h(event)
panicked = false
return events, err
}
}
Reintentar
// Retry proporciona un middleware que reintenta el controlador si se devuelve un error.
// El comportamiento de reintento, la atenuación exponencial y el tiempo transcurrido máximo se pueden configurar.
type Retry struct {
// MaxRetries es el número máximo de intentos a realizar.
MaxRetries int
// InitialInterval es el intervalo inicial entre reintentos. Los intervalos subsiguientes se escalarán por el Multiplicador.
InitialInterval time.Duration
// MaxInterval establece el límite superior para la atenuación exponencial de los reintentos.
MaxInterval time.Duration
// El Multiplicador es el factor por el cual se multiplicará el intervalo de espera entre reintentos.
Multiplier float64
// MaxElapsedTime establece el límite de tiempo máximo para los reintentos. Si es 0, está deshabilitado.
MaxElapsedTime time.Duration
// RandomizationFactor esparce aleatoriamente el tiempo de espera dentro del siguiente rango:
// [currentInterval * (1 - randomization_factor), currentInterval * (1 + randomization_factor)].
RandomizationFactor float64
// OnRetryHook es una función opcional que se ejecutará en cada intento de reintento.
// El número de reintento actual se pasa a través de retryNum.
OnRetryHook func(retryNum int, delay time.Duration)
Logger watermill.LoggerAdapter
}
// Middleware devuelve el middleware Retry.
func (r Retry) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
producedMessages, err := h(msg)
if err == nil {
return producedMessages, nil
}
expBackoff := backoff.NewExponentialBackOff()
expBackoff.InitialInterval = r.InitialInterval
expBackoff.MaxInterval = r.MaxInterval
expBackoff.Multiplier = r.Multiplier
expBackoff.MaxElapsedTime = r.MaxElapsedTime
expBackoff.RandomizationFactor = r.RandomizationFactor
ctx := msg.Context()
if r.MaxElapsedTime > 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, r.MaxElapsedTime)
defer cancel()
}
retryNum := 1
expBackoff.Reset()
retryLoop:
for {
waitTime := expBackoff.NextBackOff()
select {
case
Limitador
// Throttle proporciona un middleware para limitar la cantidad de mensajes procesados dentro de un cierto período de tiempo.
// Esto se puede utilizar para evitar la sobrecarga de controladores que se ejecutan en una larga cola no procesada.
type Throttle struct {
ticker *time.Ticker
}
// NewThrottle crea un nuevo middleware Throttle.
// Ejemplo de duración y recuento: NewThrottle(10, time.Second) indica 10 mensajes por segundo.
func NewThrottle(count int64, duration time.Duration) *Throttle {
return &Throttle{
ticker: time.NewTicker(duration / time.Duration(count)),
}
}
// Middleware devuelve el middleware Throttle.
func (t Throttle) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
// Los limitadores compartidos por varios controladores esperarán sus "ticks".
Tiempo de espera
// Timeout cancela el contexto del mensaje entrante después de la duración especificada.
// Cualquier funcionalidad sensible al tiempo de espera del controlador debería escuchar msg.Context().Done() para saber cuándo fallar.
func Timeout(timeout time.Duration) func(message.HandlerFunc) message.HandlerFunc {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
ctx, cancel := context.WithTimeout(msg.Context(), timeout)
defer func() {
cancel()
}()
msg.SetContext(ctx)
return h(msg)
}
}
}