Einführung

Das Middleware wird verwendet, um das Ereignis-Framework zu erweitern, benutzerdefinierte Funktionalität hinzuzufügen und wichtige Funktionen bereitzustellen, die nichts mit der Logik des Hauptverarbeiters zu tun haben. Zum Beispiel das erneute Ausführen des Verarbeiters nach einer Fehlermeldung oder das Wiederherstellen nach einem Absturz und das Erfassen des Stapeltraces innerhalb des Verarbeiters.

Die Signatur der Middleware-Funktion ist wie folgt definiert:

Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// HandlerMiddleware ermöglicht es uns, Dekorierer ähnlich wie den Handler zu schreiben.
// Es kann einige Operationen vor dem Handler ausführen (z. B. die konsumierte Nachricht ändern)
// und auch einige Operationen nach dem Handler ausführen (Ändern der produzierten Nachricht, ACK/NACK der konsumierten Nachricht, Fehlerbehandlung, Protokollierung usw.).
//
// Es kann dem Router durch Verwendung der Methode `AddMiddleware` angehängt werden.
//
// Beispiel:
//
//	func ExampleMiddleware(h HandlerFunc) HandlerFunc {
//		return func(message *message.Message) ([]*message.Message, error) {
//			fmt.Println("Vor der Ausführung des Handlers")
//			producedMessages, err := h(message)
//			fmt.Println("Nach der Ausführung des Handlers")
//
//			return producedMessages, err
//		}
//	}
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...

Verwendung

Middleware kann auf alle Handler im Router oder auf bestimmte Handler angewendet werden. Wenn Middleware direkt zum Router hinzugefügt wird, wird sie auf alle für den Router bereitgestellten Handler angewendet. Wenn eine Middleware nur auf einen bestimmten Handler angewendet wird, muss sie zum Handler im Router hinzugefügt werden.

Hier ist ein Anwendungsbeispiel:

Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
    router, err := message.NewRouter(message.RouterConfig{}, logger)
    if err != nil {
        panic(err)
    }

    // Wenn das SIGTERM-Signal empfangen wird, wird SignalsHandler den Router ordnungsgemäß schließen.
    // Sie können den Router auch durch Aufruf von `r.Close()` schließen.
    router.AddPlugin(plugin.SignalsHandler)

    // Router-Level-Middleware wird für jede Nachricht, die an den Router gesendet wird, ausgeführt
    router.AddMiddleware(
        // CorrelationID kopiert die Korrelations-ID aus den Metadaten der eingehenden Nachricht in die generierte Nachricht
        middleware.CorrelationID,

        // Wenn der Handler einen Fehler zurückgibt, wird er erneut versucht.
        // Es wird höchstens MaxRetries Mal wiederholt, danach wird die Nachricht von PubSub Nacked und erneut gesendet.
        middleware.Retry{
            MaxRetries:      3,
            InitialInterval: time.Millisecond * 100,
            Logger:          logger,
        }.Middleware,

        // Recoverer behandelt Paniken im Handler.
        // In diesem Fall leitet es sie als Fehler an das Retry-Middleware weiter.
        middleware.Recoverer,
    )

    // Aus Gründen der Vereinfachung verwenden wir hier das gochannel Pub/Sub,
    // Sie können es durch eine beliebige Pub/Sub-Implementierung ersetzen, und es wird genauso funktionieren.
    pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)

    // Hintergrundaktivität: Einige eingehende Nachrichten veröffentlichen
    go publishMessages(pubSub)

    // AddHandler gibt einen Handler zurück, der verwendet werden kann, um Handler-Level-Middleware hinzuzufügen
    // oder um den Handler zu stoppen.
    handler := router.AddHandler(
        "struct_handler",          // Handler-Name, muss eindeutig sein
        "incoming_messages_topic", // Thema, von dem Ereignisse gelesen werden
        pubSub,
        "outgoing_messages_topic", // Thema, zu dem Ereignisse veröffentlicht werden
        pubSub,
        structHandler{}.Handler,
    )

    // Handler-Level-Middleware wird nur für bestimmte Handler ausgeführt
    // Eine solche Middleware kann auf die gleiche Weise wie Router-Level-Middleware zum Handler hinzugefügt werden
    handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
        return func(message *message.Message) ([]*message.Message, error) {
            log.Println("Ausführung von handler-spezifischer Middleware für", message.UUID)

            return h(message)
        }
    })

    // Nur für Debugging-Zwecke, wir drucken alle auf `incoming_messages_topic` empfangenen Nachrichten aus
    router.AddNoPublisherHandler(
        "print_incoming_messages",
        "incoming_messages_topic",
        pubSub,
        printMessages,
    )

    // Nur für Debugging-Zwecke, wir drucken alle Ereignisse, die an `outgoing_messages_topic` gesendet werden
    router.AddNoPublisherHandler(
        "print_outgoing_messages",
        "outgoing_messages_topic",
        pubSub,
        printMessages,
    )

    // Jetzt, da alle Handler registriert wurden, können wir den Router ausführen.
    // Run wird blockieren, bis der Router aufhört zu laufen.
// ...

Verfügbare Middleware

Hier sind die wiederverwendbaren Middleware, die von Watermill bereitgestellt werden, und Sie können auch ganz einfach Ihre eigene Middleware implementieren. Wenn Sie beispielsweise jede eingehende Nachricht in einem bestimmten Log-Format speichern möchten, ist dies der beste Weg, um es zu tun.

Schutzschalter

// CircuitBreaker ist ein Middleware, das den Handler in einen Schutzschalter einhüllt.
// Basierend auf der Konfiguration schaltet der Schutzschalter schnell ab, wenn der Handler weiterhin Fehler zurückgibt.
// Dies ist nützlich, um Kaskadenfehler zu verhindern.
type CircuitBreaker struct {
    cb *gobreaker.CircuitBreaker
}
// NewCircuitBreaker gibt eine neue CircuitBreaker-Middleware zurück.
// Für verfügbare Einstellungen siehe die gobreaker-Dokumentation.
func NewCircuitBreaker(settings gobreaker.Settings) CircuitBreaker {
    return CircuitBreaker{
        cb: gobreaker.NewCircuitBreaker(settings),
    }
}
// Middleware gibt die CircuitBreaker-Middleware zurück.
func (c CircuitBreaker) Middleware(h message.HandlerFunc) message.HandlerFunc {
    return func(msg *message.Message) ([]*message.Message, error) {
        out, err := c.cb.Execute(func() (interface{}, error) {
            return h(msg)
        })

        var result []*message.Message
        if out != nil {
            result = out.([]*message.Message)
        }

        return result, err
    }
}

Korrelation

// SetCorrelationID legt die Korrelations-ID für die Nachricht fest.
//
// Wenn eine Nachricht in das System gelangt, sollte SetCorrelationID aufgerufen werden.
// Wenn eine Nachricht bei einer Anfrage generiert wird (z. B. HTTP), sollte die Korrelations-ID der Nachricht mit der Korrelations-ID der Anfrage übereinstimmen.
func SetCorrelationID(id string, msg *message.Message) {
    if MessageCorrelationID(msg) != "" {
        return
    }

    msg.Metadata.Set(CorrelationIDMetadataKey, id)
}
// MessageCorrelationID gibt die Korrelations-ID aus der Nachricht zurück.
func MessageCorrelationID(message *message.Message) string {
    return message.Metadata.Get(CorrelationIDMetadataKey)
}
// CorrelationID fügt allen vom Handler generierten Nachrichten eine Korrelations-ID hinzu.
// Die ID basiert auf der vom Handler erhaltenen Nachrichten-ID.
//
// Damit CorrelationID korrekt funktioniert, muss zuerst SetCorrelationID für die Nachricht aufgerufen werden, um in das System einzutreten.
func CorrelationID(h message.HandlerFunc) message.HandlerFunc {
    return func(message *message.Message) ([]*message.Message, error) {
        producedMessages, err := h(message)

        correlationID := MessageCorrelationID(message)
        for _, msg := range producedMessages {
            SetCorrelationID(correlationID, msg)
        }

        return producedMessages, err
    }
}

Duplikator

// Duplicator bearbeitet die Nachricht zweimal, um sicherzustellen, dass der Endpunkt idempotent ist.
func Duplicator(h message.HandlerFunc) message.HandlerFunc {
    return func(msg *message.Message) ([]*message.Message, error) {
        firstProducedMessages, firstErr := h(msg)
        if firstErr != nil {
            return nil, firstErr
        }

        secondProducedMessages, secondErr := h(msg)
        if secondErr != nil {
            return nil, secondErr
        }

        return append(firstProducedMessages, secondProducedMessages...), nil
    }
}

Fehler ignorieren

// IgnoreErrors bietet eine Middleware, dass es dem Handler ermöglicht, bestimmte explizit definierte Fehler zu ignorieren.
type IgnoreErrors struct {
    ignoredErrors map[string]struct{}
}
// NewIgnoreErrors erstellt eine neue IgnoreErrors-Middleware.
func NewIgnoreErrors(errs []error) IgnoreErrors {
    errsMap := make(map[string]struct{}, len(errs))

    for _, err := range errs {
        errsMap[err.Error()] = struct{}{}
    }

    return IgnoreErrors{errsMap}
}
// Middleware gibt die IgnoreErrors-Middleware zurück.
func (i IgnoreErrors) Middleware(h message.HandlerFunc) message.HandlerFunc {
    return func(msg *message.Message) ([]*message.Message, error) {
        events, err := h(msg)
        if err != nil {
            if _, ok := i.ignoredErrors[errors.Cause(err).Error()]; ok {
                return events, nil
            }

            return events, err
        }

        return events, nil
    }
}

Sofortige Bestätigung

// InstantAck bewirkt, dass der Handler die eingehende Nachricht sofort bestätigt, unabhängig von Fehlern.
// Es kann verwendet werden, um die Durchsatzleistung zu verbessern, jedoch ist der Kompromiss:
// Wenn Sie eine genau-once-Zustellung sicherstellen müssen, erhalten Sie mindestens einmal Zustellung.
// Wenn eine geordnete Nachrichtenfolge erforderlich ist, kann dies die Reihenfolge unterbrechen.
func InstantAck(h message.HandlerFunc) message.HandlerFunc {
	return func(message *message.Message) ([]*message.Message, error) {
		message.Ack()
		return h(message)
	}
}
// PoisonQueue bietet eine Middleware-Funktion zur Behandlung von nicht verarbeitbaren Nachrichten und veröffentlicht sie in ein separates Thema.
// Anschließend wird die Haupt-Middleware-Kette fortgesetzt, und das Geschäft wird wie gewohnt fortgesetzt.
func PoisonQueue(pub message.Publisher, topic string) (message.HandlerMiddleware, error) {
	if topic == "" {
		return nil, ErrInvalidPoisonQueueTopic
	}

	pq := poisonQueue{
		topic: topic,
		pub:   pub,
		shouldGoToPoisonQueue: func(err error) bool {
			return true
		},
	}

	return pq.Middleware, nil
}

// PoisonQueueWithFilter ist ähnlich wie PoisonQueue, akzeptiert jedoch eine Funktion, um zu bestimmen, welche Fehler die Kriterien der Giftschlange erfüllen.
func PoisonQueueWithFilter(pub message.Publisher, topic string, shouldGoToPoisonQueue func(err error) bool) (message.HandlerMiddleware, error) {
	if topic == "" {
		return nil, ErrInvalidPoisonQueueTopic
	}

	pq := poisonQueue{
		topic: topic,
		pub:   pub,
		shouldGoToPoisonQueue: shouldGoToPoisonQueue,
	}

	return pq.Middleware, nil
}

Zufälliger Fehler

// RandomFail führt dazu, dass der Handler basierend auf einer zufälligen Wahrscheinlichkeit einen Fehler generiert. Die Fehlerwahrscheinlichkeit sollte im Bereich (0, 1) liegen.
func RandomFail(errorProbability float32) message.HandlerMiddleware {
	return func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			if shouldFail(errorProbability) {
				return nil, errors.New("ein zufälliger Fehler ist aufgetreten")
			}
			return h(message)
		}
	}
}

// RandomPanic führt dazu, dass der Handler basierend auf einer zufälligen Wahrscheinlichkeit in Panik gerät. Die Panikwahrscheinlichkeit sollte im Bereich (0, 1) liegen.
func RandomPanic(panicProbability float32) message.HandlerMiddleware {
	return func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			if shouldFail(panicProbability) {
				panic("eine zufällige Panik ist aufgetreten")
			}
			return h(message)
		}
	}
}

Recoverer

// RecoveredPanicError enthält den Fehler der wiederhergestellten Panik und seine Stack-Trace-Informationen.
type RecoveredPanicError struct {
	V          interface{}
	Stacktrace string
}

// Recoverer stellt jeden Panikfall des Handlers wieder her und fügt jedem vom Handler zurückgegebenen Fehler einen RecoveredPanicError mit Stack-Trace hinzu.
func Recoverer(h message.HandlerFunc) message.HandlerFunc {
	return func(event *message.Message) (events []*message.Message, err error) {
		panicked := true

		defer func() {
			if r := recover(); r != nil || panicked {
				err = errors.WithStack(RecoveredPanicError{V: r, Stacktrace: string(debug.Stack())})
			}
		}()

		events, err = h(event)
		panicked = false
		return events, err
	}
}

Wiederholung

// Retry stellt ein Middleware bereit, die den Handler erneut versucht, wenn ein Fehler auftritt.
// Das Wiederholungsverhalten, die exponentielle Rückversicherung und die maximale verstrichene Zeit können konfiguriert werden.
type Retry struct {
	// MaxRetries ist die maximale Anzahl von Versuchen, die unternommen werden sollen.
	MaxRetries int

	// InitialInterval ist das anfängliche Intervall zwischen Wiederholungen. Nachfolgende Intervalle werden durch den Multiplikator skaliert.
	InitialInterval time.Duration
	// MaxInterval setzt die obere Grenze für die exponentielle Rückversicherung von Wiederholungen.
	MaxInterval time.Duration
	// Multiplier ist der Faktor, um den das Warteintervall zwischen Wiederholungen multipliziert wird.
	Multiplier float64
	// MaxElapsedTime setzt das maximale Zeitlimit für Wiederholungen. Wenn 0, ist es deaktiviert.
	MaxElapsedTime time.Duration
	// RandomizationFactor verteilt zufällig die Rückversicherungszeit innerhalb des folgenden Bereichs:
	// [currentInterval * (1 - randomization_factor), currentInterval * (1 + randomization_factor)].
	RandomizationFactor float64

	// OnRetryHook ist eine optionale Funktion, die bei jedem Wiederholungsversuch ausgeführt werden soll.
	// Die aktuelle Wiederholungsnummer wird über retryNum übergeben.
	OnRetryHook func(retryNum int, delay time.Duration)

	Logger watermill.LoggerAdapter
}
// Middleware gibt die Retry-Middleware zurück.
func (r Retry) Middleware(h message.HandlerFunc) message.HandlerFunc {
	return func(msg *message.Message) ([]*message.Message, error) {
		producedMessages, err := h(msg)
		if err == nil {
			return producedMessages, nil
		}

		expBackoff := backoff.NewExponentialBackOff()
		expBackoff.InitialInterval = r.InitialInterval
		expBackoff.MaxInterval = r.MaxInterval
		expBackoff.Multiplier = r.Multiplier
		expBackoff.MaxElapsedTime = r.MaxElapsedTime
		expBackoff.RandomizationFactor = r.RandomizationFactor

		ctx := msg.Context()
		if r.MaxElapsedTime > 0 {
			var cancel func()
			ctx, cancel = context.WithTimeout(ctx, r.MaxElapsedTime)
			defer cancel()
		}

		retryNum := 1
		expBackoff.Reset()
	retryLoop:
		for {
			waitTime := expBackoff.NextBackOff()
			select {
			case 

Drossel

// Throttle bietet eine Middleware, um die Anzahl der innerhalb eines bestimmten Zeitraums verarbeiteten Nachrichten zu begrenzen.
// Dies kann verwendet werden, um eine Überlastung von Handlern zu verhindern, die in einer nicht verarbeiteten Warteschlange ausgeführt werden.
type Throttle struct {
	ticker *time.Ticker
}
// NewThrottle erstellt eine neue Throttle-Middleware.
// Beispiel Dauer und Anzahl: NewThrottle(10, time.Second) gibt an, dass 10 Nachrichten pro Sekunde verarbeitet werden.
func NewThrottle(count int64, duration time.Duration) *Throttle {
	return &Throttle{
		ticker: time.NewTicker(duration / time.Duration(count)),
	}
}
// Middleware gibt die Throttle-Middleware zurück.
func (t Throttle) Middleware(h message.HandlerFunc) message.HandlerFunc {
	return func(message *message.Message) ([]*message.Message, error) {
		// Drosseln, die von mehreren Handlern gemeinsam genutzt werden, warten auf ihre "Ticks".

Zeitüberschreitung

// Timeout bricht den Kontext der eingehenden Nachricht nach der angegebenen Dauer ab.
// Jede zeitkritische Funktionalität des Handlers sollte msg.Context().Done() überwachen, um zu wissen, wann ein Fehler aufgetreten ist.
func Timeout(timeout time.Duration) func(message.HandlerFunc) message.HandlerFunc {
	return func(h message.HandlerFunc) message.HandlerFunc {
		return func(msg *message.Message) ([]*message.Message, error) {
			ctx, cancel := context.WithTimeout(msg.Context(), timeout)
			defer func() {
				cancel()
			}()

			msg.SetContext(ctx)
			return h(msg)
		}
	}
}