Wstęp
Middleware jest używane do rozszerzania struktury zdarzeń, dodawania niestandardowej funkcjonalności i zapewniania istotnej funkcjonalności niezwiązanej z logiką głównego obsługującego. Na przykład ponowne wykonanie obsługującego po zwróceniu błędu, lub odzyskiwanie po panice i przechwytywanie śladu stosu wewnątrz obsługującego.
Sygnatura funkcji middleware jest zdefiniowana następująco:
Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerMiddleware pozwala nam pisać dekoratory podobne do obsługującego.
// Może wykonać pewne operacje przed obsługującym (np. modyfikować pobrane wiadomości)
// oraz wykonać pewne operacje po obsługującym (modyfikować wygenerowane wiadomości, potwierdzać/blokować pobraną wiadomość, obsługiwać błędy, logować, itp.).
//
// Może być dołączony do routera za pomocą metody `AddMiddleware`.
//
// Przykład:
//
// func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
// return func(message *message.Message) ([]*message.Message, error) {
// fmt.Println("Przed wykonaniem obsługującego")
// wygenerowaneWiadomosci, err := h(message)
// fmt.Println("Po wykonaniu obsługującego")
//
// return wygenerowaneWiadomosci, err
// }
// }
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...
Użycie
Middleware można zastosować do wszystkich obsługiwanych w routerze lub do określonych obsługujących. Gdy middleware jest dodane bezpośrednio do routera, zostanie zastosowane do wszystkich obsługujących dostarczonych dla routera. Jeśli middleware jest stosowane tylko do określonego obsługującego, należy dodać je do obsługującego w routerze.
Oto przykład użycia:
Pełny kod źródłowy: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// Po otrzymaniu sygnału SIGTERM, SignalsHandler będzie grzecznie zamykać router.
// Możesz także zamknąć router, wywołując `r.Close()`.
router.AddPlugin(plugin.SignalsHandler)
// Middleware na poziomie routera zostanie wykonane dla każdej wiadomości wysłanej do routera
router.AddMiddleware(
// CorrelationID skopiuje ID korelacji z metadanych otrzymanej wiadomości do wygenerowanej wiadomości
middleware.CorrelationID,
// Jeśli obsługujący zwraca błąd, zostanie ponowiony.
// Będzie ponowiony maksymalnie MaxRetries razy, po czym wiadomość zostanie odrzucona (Nack) i ponownie wysłana przez PubSub.
middleware.Retry{
MaxRetries: 3,
InitialInterval: time.Millisecond * 100,
Logger: logger,
}.Middleware,
// Recoverer obsługuje paniki w obsługującym.
// W tym przypadku przekazuje je jako błędy do middleware Retry.
middleware.Recoverer,
)
// Dla uproszczenia, tutaj używamy Pub/Sub gochannel,
// można go zastąpić dowolną implementacją Pub/Sub, a będzie działać tak samo.
pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
// W tle publikuje niektóre przychodzące wiadomości
go publishMessages(pubSub)
// AddHandler zwraca obsługującego, który może być użyty do dodania middleware na poziomie obsługującego
// lub zatrzymania obsługującego.
handler := router.AddHandler(
"struct_handler", // Nazwa obsługującego, musi być unikalna
"incoming_messages_topic", // Temat, z którego będą czytane zdarzenia
pubSub,
"outgoing_messages_topic", // Temat, do którego będą publikowane zdarzenia
pubSub,
structHandler{}.Handler,
)
// Middleware na poziomie obsługującego jest wykonywane tylko dla konkretnych obsługujących
// Takie middleware można dodać do obsługującego w ten sam sposób, co middleware na poziomie routera
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("Wykonanie obsługującego middleware dla", message.UUID)
return h(message)
}
})
// W celach debugowania, drukujemy wszystkie otrzymane wiadomości na `incoming_messages_topic`
router.AddNoPublisherHandler(
"print_incoming_messages",
"incoming_messages_topic",
pubSub,
printMessages,
)
// W celach debugowania, drukujemy wszystkie zdarzenia wysłane do `outgoing_messages_topic`
router.AddNoPublisherHandler(
"print_outgoing_messages",
"outgoing_messages_topic",
pubSub,
printMessages,
)
// Teraz, gdy wszystkie obsługujące zostały zarejestrowane, możemy uruchomić routera.
// Uruchomienie zatrzyma się, dopóki router nie przestanie działać.
// ...
Dostępne Middleware
Oto ponownie wykorzystywane middleware dostarczane przez Watermill, oraz możesz także łatwo zaimplementować własne middleware. Na przykład, jeśli chcesz przechowywać każdą przychodzącą wiadomość w określonym formacie dziennika, jest to najlepszy sposób, aby to zrobić.
Wyłącznik obwodu
// CircuitBreaker jest middleware'em, który obudowuje obsługę w wyłącznik obwodu.
// W zależności od konfiguracji, wyłącznik obwodu będzie szybko zawodzić, jeśli obsługa będzie kontynuować zwracanie błędów.
// Jest to przydatne do zapobiegania kaskadowym awariom.
type CircuitBreaker struct {
cb *gobreaker.CircuitBreaker
}
// NewCircuitBreaker zwraca nowe middleware CircuitBreaker.
// Dostępne ustawienia można znaleźć w dokumentacji gobreaker.
func NewCircuitBreaker(settings gobreaker.Settings) CircuitBreaker {
return CircuitBreaker{
cb: gobreaker.NewCircuitBreaker(settings),
}
}
// Middleware zwraca middleware CircuitBreaker.
func (c CircuitBreaker) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
out, err := c.cb.Execute(func() (interface{}, error) {
return h(msg)
})
var result []*message.Message
if out != nil {
result = out.([]*message.Message)
}
return result, err
}
}
Korelacja
// SetCorrelationID ustawia identyfikator korelacji dla wiadomości.
//
// Gdy wiadomość wchodzi do systemu, należy wywołać SetCorrelationID.
// Gdy wiadomość jest generowana w żądaniu (np. HTTP), identyfikator korelacji wiadomości powinien być taki sam jak identyfikator korelacji żądania.
func SetCorrelationID(id string, msg *message.Message) {
if MessageCorrelationID(msg) != "" {
return
}
msg.Metadata.Set(CorrelationIDMetadataKey, id)
}
// MessageCorrelationID zwraca identyfikator korelacji z wiadomości.
func MessageCorrelationID(message *message.Message) string {
return message.Metadata.Get(CorrelationIDMetadataKey)
}
// CorrelationID dodaje identyfikator korelacji do wszystkich wiadomości generowanych przez obsługę.
// Identyfikator jest oparty na identyfikatorze wiadomości otrzymanym przez obsługę.
//
// Aby CorrelationID działał poprawnie, najpierw należy wywołać SetCorrelationID dla wiadomości wchodzącej do systemu.
func CorrelationID(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
producedMessages, err := h(message)
correlationID := MessageCorrelationID(message)
for _, msg := range producedMessages {
SetCorrelationID(correlationID, msg)
}
return producedMessages, err
}
}
Duplikator
// Duplikator przetwarza wiadomość dwukrotnie, aby zapewnić, że punkt końcowy jest idempotentny.
func Duplicator(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
firstProducedMessages, firstErr := h(msg)
if firstErr != nil {
return nil, firstErr
}
secondProducedMessages, secondErr := h(msg)
if secondErr != nil {
return nil, secondErr
}
return append(firstProducedMessages, secondProducedMessages...), nil
}
}
Ignorowanie błędów
// IgnoreErrors dostarcza middleware, które pozwala obsłudze ignorować pewne jawnie zdefiniowane błędy.
type IgnoreErrors struct {
ignoredErrors map[string]struct{}
}
// NewIgnoreErrors tworzy nowe middleware IgnoreErrors.
func NewIgnoreErrors(errs []error) IgnoreErrors {
errsMap := make(map[string]struct{}, len(errs))
for _, err := range errs {
errsMap[err.Error()] = struct{}{}
}
return IgnoreErrors{errsMap}
}
// Middleware zwraca middleware IgnoreErrors.
func (i IgnoreErrors) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
events, err := h(msg)
if err != nil {
if _, ok := i.ignoredErrors[errors.Cause(err).Error()]; ok {
return events, nil
}
return events, err
}
return events, nil
}
}
Natychmiastowe potwierdzenie
// InstantAck sprawia, że obsługa natychmiastowo potwierdza przychodzącą wiadomość, bez względu na jakiekolwiek błędy.
// Można go używać do poprawy wydajności, ale kosztem jest to:
// Jeśli wymagane jest zapewnienie dostarczenia dokładnie jeden raz, można otrzymać dostarczenie co najmniej raz.
// Jeśli wymagane są wiadomości uporządkowane, może to przerwać kolejność.
func InstantAck(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
message.Ack()
return h(message)
}
}
Trucizna
// PoisonQueue zapewnia funkcję pośredniczącą do obsługi wiadomości niemożliwych do przetworzenia i publikuje je do osobnego tematu.
// Następnie łańcuch główny funkcji pośredniczących kontynuuje wykonywanie, a biznes przebiega normalnie.
func PoisonQueue(pub message.Publisher, topic string) (message.HandlerMiddleware, error) {
if topic == "" {
return nil, ErrInvalidPoisonQueueTopic
}
pq := poisonQueue{
topic: topic,
pub: pub,
shouldGoToPoisonQueue: func(err error) bool {
return true
},
}
return pq.Middleware, nil
}
// PoisonQueueWithFilter jest podobne do funkcji PoisonQueue, ale akceptuje funkcję określającą, które błędy spełniają kryteria kolejki zatrucia.
func PoisonQueueWithFilter(pub message.Publisher, topic string, shouldGoToPoisonQueue func(err error) bool) (message.HandlerMiddleware, error) {
if topic == "" {
return nil, ErrInvalidPoisonQueueTopic
}
pq := poisonQueue{
topic: topic,
pub: pub,
shouldGoToPoisonQueue: shouldGoToPoisonQueue,
}
return pq.Middleware, nil
}
Losowa Awaria
// RandomFail powoduje awarię obsługi na podstawie losowej prawdopodobieństwa. Prawdopodobieństwo błędu powinno mieścić się w zakresie (0, 1).
func RandomFail(errorProbability float32) message.HandlerMiddleware {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
if shouldFail(errorProbability) {
return nil, errors.New("wystąpił losowy błąd")
}
return h(message)
}
}
}
// RandomPanic powoduje panikę obsługi na podstawie losowego prawdopodobieństwa. Prawdopodobieństwo paniki powinno zawierać się w zakresie (0, 1).
func RandomPanic(panicProbability float32) message.HandlerMiddleware {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
if shouldFail(panicProbability) {
panic("wystąpiła losowa panika")
}
return h(message)
}
}
}
Wyłapiarka
// RecoveredPanicError przechowuje informacje o błędzie odzyskanej paniki i jej śladzie stosu.
type RecoveredPanicError struct {
V interface{}
Stacktrace string
}
// Recoverer odzyskuje ewentualną panikę z obsługi i dołącza RecoveredPanicError z śladem stosu do każdego błędu zwróconego z obsługi.
func Recoverer(h message.HandlerFunc) message.HandlerFunc {
return func(event *message.Message) (events []*message.Message, err error) {
panicked := true
defer func() {
if r := recover(); r != nil || panicked {
err = errors.WithStack(RecoveredPanicError{V: r, Stacktrace: string(debug.Stack())})
}
}()
events, err = h(event)
panicked = false
return events, err
}
}
Ponów próbę
// Retry jest środowiskiem, które ponawia próbę obsługi, jeśli wystąpi błąd.
// Zachowanie ponowienia próby, wykładniczego opóźnienia i maksymalny czas upłynął można skonfigurować.
type Retry struct {
// MaxRetries to maksymalna liczba prób.
MaxRetries int
// InitialInterval to początkowy odstęp między próbami. Następne odstępy będą skalowane przez Mnożnik.
InitialInterval time.Duration
// MaxInterval ustawia górny limit wykładniczego opóźnienia ponownych prób.
MaxInterval time.Duration
// Multiplier to czynnik, przez który będzie mnożony odstęp oczekiwania między próbami.
Multiplier float64
// MaxElapsedTime ustawia maksymalny limit czasu na ponowienia. Jeśli wynosi 0, jest wyłączony.
MaxElapsedTime time.Duration
// RandomizationFactor losowo rozkłada czas oczekiwania według następującego zakresu:
// [currentInterval * (1 - randomization_factor), currentInterval * (1 + randomization_factor)].
RandomizationFactor float64
// OnRetryHook to opcjonalna funkcja wykonywana przy każdej próbie ponowienia.
// Aktualna liczba prób jest przekazywana jako retryNum.
OnRetryHook func(retryNum int, delay time.Duration)
Logger watermill.LoggerAdapter
}
// Middleware zwraca pośrednika Retry.
func (r Retry) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
producedMessages, err := h(msg)
if err == nil {
return producedMessages, nil
}
expBackoff := backoff.NewExponentialBackOff()
expBackoff.InitialInterval = r.InitialInterval
expBackoff.MaxInterval = r.MaxInterval
expBackoff.Multiplier = r.Multiplier
expBackoff.MaxElapsedTime = r.MaxElapsedTime
expBackoff.RandomizationFactor = r.RandomizationFactor
ctx := msg.Context()
if r.MaxElapsedTime > 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, r.MaxElapsedTime)
defer cancel()
}
retryNum := 1
expBackoff.Reset()
retryLoop:
for {
waitTime := expBackoff.NextBackOff()
select {
case
Ograniczanie przepustowości
// Throttle dostarcza pośrednika do ograniczania liczby przetwarzanych wiadomości w określonym interwale czasowym.
// Może to zapobiec przeciążeniu obsługujących wiadomości na nieprzetworzonej długiej kolejce.
type Throttle struct {
ticker *time.Ticker
}
// NewThrottle tworzy nowy pośrednik Throttle.
// Przykładowa wartość duration i count: NewThrottle(10, time.Second) oznacza 10 wiadomości na sekundę.
func NewThrottle(count int64, duration time.Duration) *Throttle {
return &Throttle{
ticker: time.NewTicker(duration / time.Duration(count)),
}
}
// Middleware zwraca pośrednika Throttle.
func (t Throttle) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
// Ograniczenia udostępniane przez wiele obsługujących wiadomości będą oczekiwać na ich "ticki".
Limit czasowy
// Timeout anuluje kontekst przychodzącej wiadomości po określonym czasie.
// Wszystkie funkcje czułe na upłynięcie czasu obsługującego powinny słuchać msg.Context().Done(), aby dowiedzieć się kiedy zakończyć.
func Timeout(timeout time.Duration) func(message.HandlerFunc) message.HandlerFunc {
return func(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
ctx, cancel := context.WithTimeout(msg.Context(), timeout)
defer func() {
cancel()
}()
msg.SetContext(ctx)
return h(msg)
}
}
}