Introduzione
Il middleware viene utilizzato per estendere il framework degli eventi, aggiungere funzionalità personalizzate e fornire funzionalità importanti non correlate alla logica dell'elaboratore principale. Ad esempio, ritentare l'elaboratore dopo aver restituito un errore o recuperare da un blocco improvviso e acquisire la traccia dello stack all'interno dell'elaboratore.
La firma della funzione del middleware è definita come segue:
Codice sorgente completo: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerMiddleware ci consente di scrivere decoratori simili all'elaboratore.
// Può eseguire alcune operazioni prima dell'elaboratore (ad esempio, modificare il messaggio consumato)
// e anche eseguire alcune operazioni dopo l'elaboratore (modificare il messaggio prodotto, ACK/NACK del messaggio consumato, gestire errori, logging, ecc.).
//
// Può essere collegato al router utilizzando il metodo `AddMiddleware`.
//
// Esempio:
//
// func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
// return func(message *message.Message) ([]*message.Message, error) {
// fmt.Println("Prima dell'esecuzione dell'elaboratore")
// messaggiProdotti, err := h(message)
// fmt.Println("Dopo l'esecuzione dell'elaboratore")
//
// return messaggiProdotti, err
// }
// }
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...
Utilizzo
I Middleware possono essere applicati a tutti gli handler nel router o a specifici handler. Quando i middleware vengono aggiunti direttamente al router, saranno applicati a tutti gli handler forniti per il router. Se un middleware viene applicato solo a un handler specifico, deve essere aggiunto all'handler nel router.
Ecco un esempio di utilizzo:
Codice Sorgente Completo: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// Quando viene ricevuto il segnale SIGTERM, SignalsHandler chiuderà il router in modo sicuro.
// È anche possibile chiudere il router chiamando `r.Close()`.
router.AddPlugin(plugin.SignalsHandler)
// I middleware a livello di router verranno eseguiti su ogni messaggio inviato al router
router.AddMiddleware(
// CorrelationID copierà l'ID di correlazione dai metadati del messaggio in ingresso al messaggio generato
middleware.CorrelationID,
// Se l'handler restituisce un errore, verrà ritentato.
// Sarà ritentato al massimo MaxRetries volte, dopodiché il messaggio verrà Nacked e re-inviato da PubSub.
middleware.Retry{
MaxRetries: 3,
InitialInterval: time.Millisecond * 100,
Logger: logger,
}.Middleware,
// Recoverer gestisce i panici nell'handler.
// In questo caso, li passa come errori al middleware Retry.
middleware.Recoverer,
)
// Per semplicità, qui usiamo il gochannel Pub/Sub,
// è possibile sostituirlo con qualsiasi implementazione di Pub/Sub e funzionerà allo stesso modo.
pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
// Pubblica alcuni messaggi in arrivo in background
go publishMessages(pubSub)
// AddHandler restituisce un handler che può essere utilizzato per aggiungere middleware a livello di handler
// o per interrompere l'handler.
handler := router.AddHandler(
"struct_handler", // Nome dell'handler, deve essere univoco
"incoming_messages_topic", // Argomento da cui verranno letti gli eventi
pubSub,
"outgoing_messages_topic", // Argomento a cui verranno pubblicati gli eventi
pubSub,
structHandler{}.Handler,
)
// I middleware a livello di handler vengono eseguiti solo per gli handler specifici
// Tali middleware possono essere aggiunti all'handler allo stesso modo dei middleware a livello di router
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("Esecuzione di middleware specifici dell'handler per", message.UUID)
return h(message)
}
})
// Solo a scopo di debug, stampiamo tutti i messaggi ricevuti su `incoming_messages_topic`
router.AddNoPublisherHandler(
"print_incoming_messages",
"incoming_messages_topic",
pubSub,
printMessages,
)
// Solo a scopo di debug, stampiamo tutti gli eventi inviati a `outgoing_messages_topic`
router.AddNoPublisherHandler(
"print_outgoing_messages",
"outgoing_messages_topic",
pubSub,
printMessages,
)
// Ora che tutti gli handler sono stati registrati, possiamo eseguire il router.
// Run bloccherà fino a quando il router non smetterà di funzionare.
// ...
Middleware Disponibili
Ecco i middleware riutilizzabili forniti da Watermill, e è anche possibile implementare facilmente il proprio middleware. Ad esempio, se si desidera memorizzare ogni messaggio in arrivo in un certo formato di log, questo è il modo migliore per farlo.
Interruttore di Circuiti
// CircuitBreaker è un middleware che avvolge l'handler in un'interruttore di circuito.
// In base alla configurazione, l'interruttore di circuito farà fail rapidamente se l'handler continua a restituire errori.
// Questo è utile per prevenire fallimenti a cascata.
type CircuitBreaker struct {
cb *gobreaker.CircuitBreaker
}
// NewCircuitBreaker restituisce un nuovo middleware CircuitBreaker.
// Per le impostazioni disponibili, consultare la documentazione di gobreaker.
func NewCircuitBreaker(settings gobreaker.Settings) CircuitBreaker {
return CircuitBreaker{
cb: gobreaker.NewCircuitBreaker(settings),
}
}
// Middleware restituisce il middleware CircuitBreaker.
func (c CircuitBreaker) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
out, err := c.cb.Execute(func() (interface{}, error) {
return h(msg)
})
var result []*message.Message
if out != nil {
result = out.([]*message.Message)
}
return result, err
}
}
Correlazione
// SetCorrelationID imposta l'ID di correlazione per il messaggio.
//
// Quando un messaggio entra nel sistema, dovrebbe essere chiamato SetCorrelationID.
// Quando un messaggio viene generato in una richiesta (ad esempio, HTTP), l'ID di correlazione del messaggio dovrebbe essere lo stesso dell'ID di correlazione della richiesta.
func SetCorrelationID(id string, msg *message.Message) {
if MessageCorrelationID(msg) != "" {
return
}
msg.Metadata.Set(CorrelationIDMetadataKey, id)
}
// MessageCorrelationID restituisce l'ID di correlazione dal messaggio.
func MessageCorrelationID(message *message.Message) string {
return message.Metadata.Get(CorrelationIDMetadataKey)
}
// CorrelationID aggiunge un ID di correlazione a tutti i messaggi generati dall'handler.
// L'ID si basa sull'ID del messaggio ricevuto dall'handler.
//
// Affinché CorrelationID funzioni correttamente, SetCorrelationID deve essere chiamato prima affinché il messaggio entri nel sistema.
func CorrelationID(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
producedMessages, err := h(message)
correlationID := MessageCorrelationID(message)
for _, msg := range producedMessages {
SetCorrelationID(correlationID, msg)
}
return producedMessages, err
}
}
Duplicatore
// Duplicator elabora il messaggio due volte per garantire che l'endpoint sia idempotente.
func Duplicator(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
firstProducedMessages, firstErr := h(msg)
if firstErr != nil {
return nil, firstErr
}
secondProducedMessages, secondErr := h(msg)
if secondErr != nil {
return nil, secondErr
}
return append(firstProducedMessages, secondProducedMessages...), nil
}
}
Ignora Errori
// IgnoreErrors fornisce un middleware che consente all'handler di ignorare determinati errori esplicitamente definiti.
type IgnoreErrors struct {
ignoredErrors map[string]struct{}
}
// NewIgnoreErrors crea un nuovo middleware IgnoreErrors.
func NewIgnoreErrors(errs []error) IgnoreErrors {
errsMap := make(map[string]struct{}, len(errs))
for _, err := range errs {
errsMap[err.Error()] = struct{}{}
}
return IgnoreErrors{errsMap}
}
// Middleware restituisce il middleware IgnoreErrors.
func (i IgnoreErrors) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
events, err := h(msg)
if err != nil {
if _, ok := i.ignoredErrors[errors.Cause(err).Error()]; ok {
return events, nil
}
return events, err
}
return events, nil
}
}
Ack Istantaneo
// InstantAck fa sì che l'handler riconosca immediatamente il messaggio in ingresso, indipendentemente da eventuali errori.
// Può essere utilizzato per migliorare il throughput, ma il compromesso è il seguente:
// Se è necessario garantire la consegna esattamente una volta, potrebbe essere garantita almeno una volta la consegna.
// Se sono richiesti messaggi ordinati, potrebbe interrompere l'ordinamento.
func InstantAck(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
message.Ack()
return h(message)
}
}
Veleno
// PoisonQueue fornisce una funzionalità middleware per gestire i messaggi non elaborabili e pubblicarli su un topic separato.
// Quindi, la catena middleware principale continua ad eseguire, e l'attività procede come al solito.
func PoisonQueue(pub message.Publisher, topic string) (message.HandlerMiddleware, error) {
if topic == "" {
return nil, ErrInvalidPoisonQueueTopic
}
pq := poisonQueue{
topic: topic,
pub: pub,
shouldGoToPoisonQueue: func(err error) bool {
return true
},
}
return pq.Middleware, nil
}
// PoisonQueueWithFilter è simile a PoisonQueue, ma accetta una funzione per determinare quali errori soddisfano i criteri della coda velenosa.
func PoisonQueueWithFilter(pub message.Publisher, topic string, shouldGoToPoisonQueue func(err error) bool) (message.HandlerMiddleware, error) {
if topic == "" {
return nil, ErrInvalidPoisonQueueTopic
}
pq := poisonQueue{
topic: topic,
pub: pub,
shouldGoToPoisonQueue: shouldGoToPoisonQueue,
}
return pq.Middleware, nil
}
Fallimento Casuale
// RandomFail fa in modo che il gestore non riesca in base a una probabilità casuale. La probabilità di errore deve essere compresa nell'intervallo (0, 1).
func RandomFail(errorProbability float32) message.HandlerMiddleware {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
if shouldFail(errorProbability) {
return nil, errors.New("si è verificato un errore casuale")
}
return h(message)
}
}
}
// RandomPanic fa in modo che il gestore entri in stato di panico sulla base di una probabilità casuale. La probabilità di panico deve essere compresa nell'intervallo (0, 1).
func RandomPanic(panicProbability float32) message.HandlerMiddleware {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
if shouldFail(panicProbability) {
panic("si è verificato un panico casuale")
}
return h(message)
}
}
}
Recupero
// RecoveredPanicError contiene l'errore del panico recuperato e le informazioni sulla traccia dello stack.
type RecoveredPanicError struct {
V interface{}
Stacktrace string
}
// Recoverer recupera eventuali panico dal gestore e allega RecoveredPanicError con traccia dello stack a qualsiasi errore restituito dal gestore.
func Recoverer(h message.HandlerFunc) message.HandlerFunc {
return func(event *message.Message) (events []*message.Message, err error) {
panicked := true
defer func() {
if r := recover(); r != nil || panicked {
err = errors.WithStack(RecoveredPanicError{V: r, Stacktrace: string(debug.Stack())})
}
}()
events, err = h(event)
panicked = false
return events, err
}
}
Ritentare
// Retry fornisce un middleware che riprova l'handler se viene restituito un errore.
// Il comportamento di retry, il ritardo esponenziale e il tempo massimo trascorso possono essere configurati.
type Retry struct {
// MaxRetries è il numero massimo di tentativi da effettuare.
MaxRetries int
// InitialInterval è l'intervallo iniziale tra i ritentativi. Gli intervalli successivi saranno scalati dal Moltiplicatore.
InitialInterval time.Duration
// MaxInterval imposta il limite superiore per il ritardo esponenziale dei ritentativi.
MaxInterval time.Duration
// Moltiplicatore è il fattore per cui l'intervallo di attesa tra i ritentativi verrà moltiplicato.
Multiplier float64
// MaxElapsedTime imposta il limite di tempo massimo per i ritentativi. Se è 0, è disabilitato.
MaxElapsedTime time.Duration
// RandomizationFactor distribuisce casualmente il tempo di attesa entro il seguente intervallo:
// [currentInterval * (1 - randomization_factor), currentInterval * (1 + randomization_factor)].
RandomizationFactor float64
// OnRetryHook è una funzione opzionale da eseguire in ogni tentativo di ritentativo.
// Il numero di ritentativi corrente viene passato attraverso retryNum.
OnRetryHook func(retryNum int, delay time.Duration)
Logger watermill.LoggerAdapter
}
// Middleware restituisce il middleware Retry.
func (r Retry) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
producedMessages, err := h(msg)
if err == nil {
return producedMessages, nil
}
expBackoff := backoff.NewExponentialBackOff()
expBackoff.InitialInterval = r.InitialInterval
expBackoff.MaxInterval = r.MaxInterval
expBackoff.Multiplier = r.Multiplier
expBackoff.MaxElapsedTime = r.MaxElapsedTime
expBackoff.RandomizationFactor = r.RandomizationFactor
ctx := msg.Context()
if r.MaxElapsedTime > 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, r.MaxElapsedTime)
defer cancel()
}
retryNum := 1
expBackoff.Reset()
retryLoop:
for {
waitTime := expBackoff.NextBackOff()
select {
case
Limitazione
// Throttle fornisce un middleware per limitare il numero di messaggi elaborati entro un certo periodo di tempo.
// Questo può essere utilizzato per evitare sovraccarichi degli handler in esecuzione su una coda lunga non elaborata.
type Throttle struct {
ticker *time.Ticker
}
// NewThrottle crea un nuovo middleware Throttle.
// Esempio di durata e conteggio: NewThrottle(10, time.Second) indica 10 messaggi al secondo.
func NewThrottle(count int64, duration time.Duration) *Throttle {
return &Throttle{
ticker: time.NewTicker(duration / time.Duration(count)),
}
}
// Middleware restituisce il middleware Throttle.
func (t Throttle) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
// I throttle condivisi da più handler attendono i loro "tick".
Timeout
// Timeout annulla il contesto del messaggio in arrivo dopo la durata specificata.
// Eventuali funzionalità sensibili al timeout dell'handler dovrebbero ascoltare msg.Context().Done() per sapere quando fallire.
func Timeout(timeout time.Duration) func(message.HandlerFunc) message.HandlerFunc {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
ctx, cancel := context.WithTimeout(msg.Context(), timeout)
defer func() {
cancel()
}()
msg.SetContext(ctx)
return h(msg)
}
}
}