Introduction

Le middleware est utilisé pour étendre le framework d'événements, ajouter des fonctionnalités personnalisées et fournir des fonctionnalités importantes indépendantes de la logique du gestionnaire principal. Par exemple, réessayer le gestionnaire après avoir renvoyé une erreur, ou récupérer d'une panique et capturer la trace de la pile dans le gestionnaire.

La signature de la fonction middleware est définie comme suit :

Code source complet : github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// HandlerMiddleware nous permet d'écrire des décorateurs similaires au gestionnaire.
// Il peut exécuter des opérations avant le gestionnaire (par exemple, modifier le message consommé)
// et également effectuer des opérations après le gestionnaire (modifier le message produit, ACK/NACK le message consommé, gérer les erreurs, les journaux, etc.).
//
// Il peut être attaché au routeur en utilisant la méthode `AddMiddleware`.
//
// Exemple :
//
//	func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
//		return func(message *message.Message) ([]*message.Message, error) {
//			fmt.Println("Avant d'exécuter le gestionnaire")
//			messagesProduits, err := h(message)
//			fmt.Println("Après avoir exécuté le gestionnaire")
//
//			return messagesProduits, err
//		}
//	}
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...

Utilisation

Les middlewares peuvent être appliqués à tous les gestionnaires du routeur ou à des gestionnaires spécifiques. Lorsqu'un middleware est ajouté directement au routeur, il sera appliqué à tous les gestionnaires fournis pour le routeur. Si un middleware n'est appliqué qu'à un gestionnaire spécifique, il doit être ajouté au gestionnaire dans le routeur.

Voici un exemple d'utilisation :

Code source complet : github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
    panic(err)
}

// Lors de la réception du signal SIGTERM, SignalsHandler fermera proprement le routeur.
// Vous pouvez également fermer le routeur en appelant `r.Close()`.
router.AddPlugin(plugin.SignalsHandler)

// Les middlewares au niveau du routeur seront exécutés sur chaque message envoyé au routeur
router.AddMiddleware(
    // CorrelationID copiera l'ID de corrélation des métadonnées du message entrant vers le message généré
    middleware.CorrelationID,

    // Si le gestionnaire renvoie une erreur, celle-ci sera réessayée.
    // Elle sera réessayée au plus MaxRetries fois, après quoi le message sera Nacked et renvoyé par PubSub.
    middleware.Retry{
        MaxRetries:      3,
        InitialInterval: time.Millisecond * 100,
        Logger:          logger,
    }.Middleware,

    // Recoverer gère les paniques dans le gestionnaire.
    // Dans ce cas, elle les transmet en tant qu'erreurs au middleware Retry.
    middleware.Recoverer,
)

// Pour simplifier, nous utilisons ici le Pub/Sub gochannel,
// vous pouvez le remplacer par n'importe quelle implémentation Pub/Sub et cela fonctionnera de la même manière.
pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)

// Publier certains messages entrants en arrière-plan
go publishMessages(pubSub)

// AddHandler renvoie un gestionnaire qui peut être utilisé pour ajouter des middlewares au niveau du gestionnaire
// ou pour arrêter le gestionnaire.
handler := router.AddHandler(
    "struct_handler",          // Nom du gestionnaire, doit être unique
    "incoming_messages_topic", // Sujet à partir duquel les événements seront lus
    pubSub,
    "outgoing_messages_topic", // Sujet vers lequel les événements seront publiés
    pubSub,
    structHandler{}.Handler,
)

// Les middlewares au niveau du gestionnaire ne sont exécutés que pour des gestionnaires spécifiques
// De tels middlewares peuvent être ajoutés au gestionnaire de la même manière qu'au niveau du routeur
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
    return func(message *message.Message) ([]*message.Message, error) {
        log.Println("Exécution des middlewares spécifiques au gestionnaire pour", message.UUID)

        return h(message)
    }
})

// Uniquement à des fins de débogage, nous imprimons tous les messages reçus sur `incoming_messages_topic`
router.AddNoPublisherHandler(
    "print_incoming_messages",
    "incoming_messages_topic",
    pubSub,
    printMessages,
)

// Uniquement à des fins de débogage, nous imprimons tous les événements envoyés à `outgoing_messages_topic`
router.AddNoPublisherHandler(
    "print_outgoing_messages",
    "outgoing_messages_topic",
    pubSub,
    printMessages,
)

// Maintenant que tous les gestionnaires ont été enregistrés, nous pouvons exécuter le routeur.
// Run bloquera jusqu'à ce que le routeur cesse de fonctionner.
// ...

Middlewares disponibles

Voici les middlewares réutilisables fournis par Watermill, et vous pouvez également implémenter facilement vos propres middlewares. Par exemple, si vous souhaitez stocker chaque message entrant dans un certain type de format de journal, c'est la meilleure façon de le faire.

Disjoncteur

// CircuitBreaker est un middleware qui enveloppe le gestionnaire dans un disjoncteur.
// En fonction de la configuration, le disjoncteur sera en panne rapide si le gestionnaire continue de renvoyer des erreurs.
// Ceci est utile pour prévenir les défaillances en cascade.
type CircuitBreaker struct {
    cb *gobreaker.CircuitBreaker
}
// NewCircuitBreaker retourne un nouveau middleware CircuitBreaker.
// Pour les paramètres disponibles, veuillez vous référer à la documentation de gobreaker.
func NewCircuitBreaker(settings gobreaker.Settings) CircuitBreaker {
    return CircuitBreaker{
        cb: gobreaker.NewCircuitBreaker(settings),
    }
}
// Middleware renvoie le middleware CircuitBreaker.
func (c CircuitBreaker) Middleware(h message.HandlerFunc) message.HandlerFunc {
    return func(msg *message.Message) ([]*message.Message, error) {
        out, err := c.cb.Execute(func() (interface{}, error) {
            return h(msg)
        })

        var result []*message.Message
        if out != nil {
            result = out.([]*message.Message)
        }

        return result, err
    }
}

Corrélation

// SetCorrelationID définit l'ID de corrélation pour le message.
//
// Lorsqu'un message entre dans le système, SetCorrelationID doit être appelé.
// Lorsqu'un message est généré dans une requête (par exemple, HTTP), l'ID de corrélation du message doit être le même que l'ID de corrélation de la requête.
func SetCorrelationID(id string, msg *message.Message) {
    if MessageCorrelationID(msg) != "" {
        return
    }

    msg.Metadata.Set(CorrelationIDMetadataKey, id)
}
// MessageCorrelationID renvoie l'ID de corrélation du message.
func MessageCorrelationID(message *message.Message) string {
    return message.Metadata.Get(CorrelationIDMetadataKey)
}
// CorrelationID ajoute un ID de corrélation à tous les messages générés par le gestionnaire.
// L'ID est basé sur l'ID de message reçu par le gestionnaire.
//
// Pour que CorrelationID fonctionne correctement, SetCorrelationID doit être appelé en premier pour que le message entre dans le système.
func CorrelationID(h message.HandlerFunc) message.HandlerFunc {
    return func(message *message.Message) ([]*message.Message, error) {
        producedMessages, err := h(message)

        correlationID := MessageCorrelationID(message)
        for _, msg := range producedMessages {
            SetCorrelationID(correlationID, msg)
        }

        return producedMessages, err
    }
}

Duplicateur

// Duplicator traite le message deux fois pour garantir que le point de terminaison est idempotent.
func Duplicator(h message.HandlerFunc) message.HandlerFunc {
    return func(msg *message.Message) ([]*message.Message, error) {
        firstProducedMessages, firstErr := h(msg)
        if firstErr != nil {
            return nil, firstErr
        }

        secondProducedMessages, secondErr := h(msg)
        if secondErr != nil {
            return nil, secondErr
        }

        return append(firstProducedMessages, secondProducedMessages...), nil
    }
}

Ignorer les erreurs

// IgnoreErrors fournit un middleware qui permet au gestionnaire d'ignorer certaines erreurs explicitement définies.
type IgnoreErrors struct {
    ignoredErrors map[string]struct{}
}
// NewIgnoreErrors crée un nouveau middleware IgnoreErrors.
func NewIgnoreErrors(errs []error) IgnoreErrors {
    errsMap := make(map[string]struct{}, len(errs))

    for _, err := range errs {
        errsMap[err.Error()] = struct{}{}
    }

    return IgnoreErrors{errsMap}
}
// Middleware renvoie le middleware IgnoreErrors.
func (i IgnoreErrors) Middleware(h message.HandlerFunc) message.HandlerFunc {
    return func(msg *message.Message) ([]*message.Message, error) {
        events, err := h(msg)
        if err != nil {
            if _, ok := i.ignoredErrors[errors.Cause(err).Error()]; ok {
                return events, nil
            }

            return events, err
        }

        return events, nil
    }
}

Acknowledge Instantané

// InstantAck fait en sorte que le gestionnaire accueille immédiatement le message entrant, indépendamment de toute erreur.
// Il peut être utilisé pour améliorer le débit, mais le compromis est :
// Si vous avez besoin de garantir une livraison exactement une fois, vous pouvez obtenir au moins une fois la livraison.
// Si vous avez besoin de messages ordonnés, cela peut rompre l'ordre.
func InstantAck(h message.HandlerFunc) message.HandlerFunc {
	return func(message *message.Message) ([]*message.Message, error) {
		message.Ack()
		return h(message)
	}
}

Poison

// PoisonQueue fournit une fonctionnalité intermédiaire pour gérer les messages non traitables et les publier sur un sujet séparé.
// Ensuite, la chaîne intermédiaire principale continue à s'exécuter et l'activité se déroule comme d'habitude.
func PoisonQueue(pub message.Publisher, topic string) (message.HandlerMiddleware, error) {
	if topic == "" {
		return nil, ErrInvalidPoisonQueueTopic
	}

	pq := poisonQueue{
		topic: topic,
		pub:   pub,
		shouldGoToPoisonQueue: func(err error) bool {
			return true
		},
	}

	return pq.Middleware, nil
}

// PoisonQueueWithFilter est similaire à PoisonQueue, mais accepte une fonction pour déterminer quelles erreurs répondent aux critères de la file empoisonnée.
func PoisonQueueWithFilter(pub message.Publisher, topic string, shouldGoToPoisonQueue func(err error) bool) (message.HandlerMiddleware, error) {
	if topic == "" {
		return nil, ErrInvalidPoisonQueueTopic
	}

	pq := poisonQueue{
		topic: topic,
		pub:   pub,
		shouldGoToPoisonQueue: shouldGoToPoisonQueue,
	}

	return pq.Middleware, nil
}

Random Fail

// RandomFail provoque l'échec du gestionnaire en fonction d'une probabilité aléatoire. La probabilité d'erreur doit être comprise entre 0 et 1.
func RandomFail(errorProbability float32) message.HandlerMiddleware {
	return func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			if shouldFail(errorProbability) {
				return nil, errors.New("une erreur aléatoire s'est produite")
			}
			return h(message)
		}
	}
}

// RandomPanic provoque le gestionnaire à planter en fonction d'une probabilité aléatoire. La probabilité de panique doit être comprise entre 0 et 1.
func RandomPanic(panicProbability float32) message.HandlerMiddleware {
	return func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			if shouldFail(panicProbability) {
				panic("une panique aléatoire s'est produite")
			}
			return h(message)
		}
	}
}

Recoverer

// RecoveredPanicError contient l'erreur de panique récupérée et les informations de trace de pile associées.
type RecoveredPanicError struct {
	V          interface{}
	Stacktrace string
}

// Recoverer récupère toute panique du gestionnaire et attache RecoveredPanicError avec la trace de pile à toute erreur renvoyée par le gestionnaire.
func Recoverer(h message.HandlerFunc) message.HandlerFunc {
	return func(event *message.Message) (events []*message.Message, err error) {
		panicked := true

		defer func() {
			if r := recover(); r != nil || panicked {
				err = errors.WithStack(RecoveredPanicError{V: r, Stacktrace: string(debug.Stack())})
			}
		}()

		events, err = h(event)
		panicked = false
		return events, err
	}
}

Retry

// Retry fournit un middleware qui réessaie le gestionnaire si une erreur est renvoyée.
// Le comportement de réessai, le délai d'attente exponentiel et le temps écoulé maximum peuvent être configurés.
type Retry struct {
	// MaxRetries est le nombre maximum de tentatives à effectuer.
	MaxRetries int

	// InitialInterval est l'intervalle initial entre les réessais. Les intervalles suivants seront multipliés par le Multiplicateur.
	InitialInterval time.Duration
	// MaxInterval définit la limite supérieure du délai d'attente exponentiel des réessais.
	MaxInterval time.Duration
	// Multiplicateur est le facteur par lequel l'intervalle d'attente entre les réessais sera multiplié.
	Multiplier float64
	// MaxElapsedTime définit la limite de temps maximale pour les réessais. Si 0, il est désactivé.
	MaxElapsedTime time.Duration
	// RandomizationFactor étend de manière aléatoire le temps d'attente dans la plage suivante :
	// [currentInterval * (1 - randomization_factor), currentInterval * (1 + randomization_factor)].
	RandomizationFactor float64

	// OnRetryHook est une fonction optionnelle à exécuter à chaque tentative de réessai.
	// Le numéro de réessai actuel est passé via retryNum.
	OnRetryHook func(retryNum int, delay time.Duration)

	Logger watermill.LoggerAdapter
}
// Middleware renvoie le middleware Retry.
func (r Retry) Middleware(h message.HandlerFunc) message.HandlerFunc {
	return func(msg *message.Message) ([]*message.Message, error) {
		producedMessages, err := h(msg)
		if err == nil {
			return producedMessages, nil
		}

		expBackoff := backoff.NewExponentialBackOff()
		expBackoff.InitialInterval = r.InitialInterval
		expBackoff.MaxInterval = r.MaxInterval
		expBackoff.Multiplier = r.Multiplier
		expBackoff.MaxElapsedTime = r.MaxElapsedTime
		expBackoff.RandomizationFactor = r.RandomizationFactor

		ctx := msg.Context()
		if r.MaxElapsedTime > 0 {
			var cancel func()
			ctx, cancel = context.WithTimeout(ctx, r.MaxElapsedTime)
			defer cancel()
		}

		retryNum := 1
		expBackoff.Reset()
	retryLoop:
		for {
			waitTime := expBackoff.NextBackOff()
			select {
			case 

Throttle

// Throttle fournit un middleware pour limiter le nombre de messages traités dans une certaine période de temps.
// Cela peut être utilisé pour empêcher la surcharge des gestionnaires s'exécutant sur une longue file non traitée.
type Throttle struct {
	ticker *time.Ticker
}
// NewThrottle crée un nouveau middleware Throttle.
// Exemple de durée et de comptage : NewThrottle(10, time.Second) indique 10 messages par seconde.
func NewThrottle(count int64, duration time.Duration) *Throttle {
	return &Throttle{
		ticker: time.NewTicker(duration / time.Duration(count)),
	}
}
// Middleware renvoie le middleware Throttle.
func (t Throttle) Middleware(h message.HandlerFunc) message.HandlerFunc {
	return func(message *message.Message) ([]*message.Message, error) {
		// Les limites partagées par plusieurs gestionnaires attendront leurs "tics".

Timeout

// Timeout annule le contexte du message entrant après la durée spécifiée.
// Toutes les fonctionnalités sensibles au délai d'exécution du gestionnaire doivent écouter msg.Context().Done() pour savoir quand échouer.
func Timeout(timeout time.Duration) func(message.HandlerFunc) message.HandlerFunc {
	return func(h message.HandlerFunc) message.HandlerFunc {
		return func(msg *message.Message) ([]*message.Message, error) {
			ctx, cancel := context.WithTimeout(msg.Context(), timeout)
			defer func() {
				cancel()
			}()

			msg.SetContext(ctx)
			return h(msg)
		}
	}
}