Giriş

Middleware, etkinlik çerçevesini genişletmek, özel işlevsellik eklemek ve ana işleyicinin mantığıyla ilgili olmayan önemli işlevselliği sağlamak için kullanılır. Örneğin, bir hata döndükten sonra işleyiciyi tekrar denemek veya işleyici içinde panikten kurtulmak ve stack trace'ı yakalamak.

Middleware işlev imzası aşağıda tanımlanmıştır:

Tam kaynak kod: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// HandlerMiddleware, işleyiciye benzer dekoratörler yazmamıza olanak tanır.
// İşleyiciden önce bazı işlemleri gerçekleştirebilir (örneğin, tüketilen iletiyi değiştirme)
// ve işleyiciden sonra bazı işlemleri gerçekleştirebilir (üretilen iletiyi değiştirme, tüketilen iletiyi ACK/NACK etme, hataları işleme, günlüğe kaydetme vb.).
//
// `AddMiddleware` yöntemi kullanılarak yönlendiriciye eklenir.
//
// Örnek:
//
//	func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
//		return func(message *message.Message) ([]*message.Message, error) {
//			fmt.Println("İşleyiciyi çalıştırmadan önce")
//			üretilenIletiler, err := h(message)
//			fmt.Println("İşleyiciyi çalıştırdıktan sonra")
//
//			return üretilenIletiler, err
//		}
//	}
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...

Kullanım

Ara yazılım, yönlendiricideki tüm işleyicilere veya belirli işleyicilere uygulanabilir. Ara yazılım doğrudan yönlendiğinde, yönlendirici için sağlanan tüm işleyicilere uygulanacaktır. Bir ara yazılım yalnızca belirli bir işleyiciye uygulanıyorsa, yönlendiricideki işleyiciye eklenmelidir.

İşte bir kullanım örneği:

Tam Kaynak Kod: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
    panic(err)
}

// SIGTERM sinyali alındığında, SignalsHandler yönlendiriciyi düzenli bir şekilde kapatır.
// Ayrıca, yönlendiriciyi 'r.Close()' çağırarak da kapatabilirsiniz.
router.AddPlugin(plugin.SignalsHandler)

// Yönlendirici seviyesindeki ara yazılım, yönlendiriciye gönderilen her ileti için yürütülecektir
router.AddMiddleware(
    // CorrelationID, gelen mesaj meta verilerinden oluşturulan iletiye korelasyon kimliğini kopyalar
    middleware.CorrelationID,

    // İşleyici bir hata döndürürse, tekrar denenecektir.
    // En fazla MaxRetries kez tekrar denenecek, bu süre zarfında ileti Nacked olacak ve PubSub tarafından yeniden gönderilecektir.
    middleware.Retry{
        MaxRetries:      3,
        InitialInterval: time.Millisecond * 100,
        Logger:          logger,
    }.Middleware,

    // Recoverer, işleyicideki çökmelerle başa çıkar.
    // Bu durumda, onları hata olarak Retry ara yazılıma geçirir.
    middleware.Recoverer,
)

// Basitlik için, burada gochannel Pub/Sub kullanıyoruz,
// herhangi bir Pub/Sub uygulamasıyla değiştirebilir ve aynı şekilde çalışacaktır.
pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)

// Gelen bazı iletileri arka planda yayınlayın
go publishMessages(pubSub)

// AddHandler, işleyici seviyesindeki ara yazılım eklemek veya işleyiciyi durdurmak için kullanılabilecek bir işleyici döndürür.
// veya durdurmak için kullanılabilir.
handler := router.AddHandler(
    "struct_handler",          // İşleyici adı, benzersiz olmalıdır
    "incoming_messages_topic", // Olayların okunacağı konu
    pubSub,
    "outgoing_messages_topic", // Olayların yayınlanacağı konu
    pubSub,
    structHandler{}.Handler,
)

// İşleyici seviyesindeki ara yazılım yalnızca belirli işleyiciler için yürütülür
// Bu tür ara yazılım, yönlendirici seviyesindeki ara yazılıma benzer şekilde işleyiciye eklenir
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
    return func(message *message.Message) ([]*message.Message, error) {
        log.Println("Mesaj için işleyiciye özgü ara yazılım yürütülüyor", message.UUID)

        return h(message)
    }
})

// Sadece hata ayıklama amaçlı olarak, 'incoming_messages_topic' üzerinde alınan tüm iletiler yazdırılır
router.AddNoPublisherHandler(
    "print_incoming_messages",
    "incoming_messages_topic",
    pubSub,
    printMessages,
)

// Sadece hata ayıklama amaçlı olarak, 'outgoing_messages_topic' üzerine gönderilen tüm olaylar yazdırılır
router.AddNoPublisherHandler(
    "print_outgoing_messages",
    "outgoing_messages_topic",
    pubSub,
    printMessages,
)

// Tüm işleyicilerin kaydedilmesinden sonra yönlendiriciyi çalıştırabiliriz.
// Run, yönlendirici durana kadar bloke olur.
// ...

Mevcut Ara Yazılımlar

İşte Watermill tarafından sağlanan yeniden kullanılabilir ara yazılımlar ve ayrıca kolayca kendi ara yazılımınızı da uygulayabilirsiniz. Örneğin, her gelen iletiyi belirli bir türde log formatında saklamak istiyorsanız, bunu yapmanın en iyi yolu budur.

Devre Kesici (Circuit Breaker)

// CircuitBreaker, bir işleyiciyi bir devre kesiciyle saran bir ara yazılımdır.
// Yapılandırmaya bağlı olarak, işleyici hatalar vermeye devam ederse devre kesici hızlı bir şekilde başarısız olur.
// Bu, ardışık hataları önlemek için kullanışlıdır.
type CircuitBreaker struct {
    cb *gobreaker.CircuitBreaker
}
// NewCircuitBreaker, yeni bir CircuitBreaker ara yazılımı döndürür.
// Kullanılabilir ayarlar için lütfen gobreaker belgelerine başvurun.
func NewCircuitBreaker(settings gobreaker.Settings) CircuitBreaker {
    return CircuitBreaker{
        cb: gobreaker.NewCircuitBreaker(settings),
    }
}
// Middleware, Devre Kesici ara yazılımını döndürür.
func (c CircuitBreaker) Middleware(h message.HandlerFunc) message.HandlerFunc {
    return func(msg *message.Message) ([]*message.Message, error) {
        out, err := c.cb.Execute(func() (interface{}, error) {
            return h(msg)
        })

        var result []*message.Message
        if out != nil {
            result = out.([]*message.Message)
        }

        return result, err
    }
}

Korelasyon (Correlation)

// SetCorrelationID, mesaj için korelasyon kimliğini ayarlar.
//
// Bir mesaj sistemimize girdiğinde, SetCorrelationID çağrılmalıdır.
// Bir istekte (ör. HTTP) oluşturulan bir mesaj, mesajın korelasyon kimliği isteğin korelasyon kimliğiyle aynı olmalıdır.
func SetCorrelationID(id string, msg *message.Message) {
    if MessageCorrelationID(msg) != "" {
        return
    }

    msg.Metadata.Set(CorrelationIDMetadataKey, id)
}
// MessageCorrelationID, mesajdan korelasyon kimliğini döndürür.
func MessageCorrelationID(message *message.Message) string {
    return message.Metadata.Get(CorrelationIDMetadataKey)
}
// CorrelationID, işleyici tarafından üretilen tüm mesajlara bir korelasyon kimliği ekler.
// Kimlik, işleyici tarafından alınan mesaj kimliğine dayanmaktadır.
//
// CorrelationID'nin doğru çalışması için, mesajın sistemimize girmesi için önce SetCorrelationID'nin çağrılması gerekir.
func CorrelationID(h message.HandlerFunc) message.HandlerFunc {
    return func(message *message.Message) ([]*message.Message, error) {
        producedMessages, err := h(message)

        correlationID := MessageCorrelationID(message)
        for _, msg := range producedMessages {
            SetCorrelationID(correlationID, msg)
        }

        return producedMessages, err
    }
}

Çoğaltıcı (Duplicator)

// Duplicator, uç noktanın idempotent olmasını sağlamak için mesajı iki kez işler.
func Duplicator(h message.HandlerFunc) message.HandlerFunc {
    return func(msg *message.Message) ([]*message.Message, error) {
        firstProducedMessages, firstErr := h(msg)
        if firstErr != nil {
            return nil, firstErr
        }

        secondProducedMessages, secondErr := h(msg)
        if secondErr != nil {
            return nil, secondErr
        }

        return append(firstProducedMessages, secondProducedMessages...), nil
    }
}

Hataları Yoksay (Ignore Errors)

// IgnoreErrors, işleyicinin belirli açıkça tanımlanmış hataları yoksaymasına izin veren bir ara yazılım sağlar.
type IgnoreErrors struct {
    ignoredErrors map[string]struct{}
}
// NewIgnoreErrors, yeni bir IgnoreErrors ara yazılımı oluşturur.
func NewIgnoreErrors(errs []error) IgnoreErrors {
    errsMap := make(map[string]struct{}, len(errs))

    for _, err := range errs {
        errsMap[err.Error()] = struct{}{}
    }

    return IgnoreErrors{errsMap}
}
// Middleware, IgnoreErrors ara yazılımını döndürür.
func (i IgnoreErrors) Middleware(h message.HandlerFunc) message.HandlerFunc {
    return func(msg *message.Message) ([]*message.Message, error) {
        events, err := h(msg)
        if err != nil {
            if _, ok := i.ignoredErrors[errors.Cause(err).Error()]; ok {
                return events, nil
            }

            return events, err
        }

        return events, nil
    }
}

Anında Onay (Instant Ack)

// InstantAck, işleyicinin gelen mesajı herhangi bir hata olmaksızın hemen onaylamasını sağlar.
// Verimliliği artırmak için kullanılabilir ancak takas şunlardır:
// Kesinlikle bir kere teslimat sağlamanız gerekiyorsa, en az bir kez teslimat alabilirsiniz.
// Sıralı mesajlara ihtiyacınız varsa, sıralamayı bozabilir.
func InstantAck(h message.HandlerFunc) message.HandlerFunc {
    return func(message *message.Message) ([]*message.Message, error) {
        message.Ack()
        return h(message)
    }
}

Zehir

// PoisonQueue, işlenemeyen mesajları ele almak ve bunları ayrı bir konuya yayınlamak için bir ara yazılım özelliği sağlar.
// Ardından, ana ara yazılım zinciri devam eder ve iş aslında devam eder.
func PoisonQueue(pub message.Publisher, topic string) (message.HandlerMiddleware, error) {
	if topic == "" {
		return nil, ErrInvalidPoisonQueueTopic
	}

	pq := poisonQueue{
		topic: topic,
		pub:   pub,
		shouldGoToPoisonQueue: func(err error) bool {
			return true
		},
	}

	return pq.Middleware, nil
}

// PoisonQueueWithFilter, PoisonQueue'ye benzer, ancak zehir kuyruğu kriterlerine uyan hataları belirlemek için bir işlevi kabul eder.
func PoisonQueueWithFilter(pub message.Publisher, topic string, shouldGoToPoisonQueue func(err error) bool) (message.HandlerMiddleware, error) {
	if topic == "" {
		return nil, ErrInvalidPoisonQueueTopic
	}

	pq := poisonQueue{
		topic: topic,
		pub:   pub,
		shouldGoToPoisonQueue: shouldGoToPoisonQueue,
	}

	return pq.Middleware, nil
}

Rastgele Başarısızlık

// RandomFail, işleyicinin rastgele bir olasılığa dayalı olarak başarısız olmasına neden olur. Hata olasılığı (0, 1) aralığında olmalıdır.
func RandomFail(errorProbability float32) message.HandlerMiddleware {
	return func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			if shouldFail(errorProbability) {
				return nil, errors.New("rastgele bir hata oluştu")
			}
			return h(message)
		}
	}
}

// RandomPanic, işleyicinin rastgele bir olasılığa dayalı olarak paniklemesine neden olur. Panik olasılığı (0, 1) aralığında olmalıdır.
func RandomPanic(panicProbability float32) message.HandlerMiddleware {
	return func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			if shouldFail(panicProbability) {
				panic("rastgele bir panik oluştu")
			}
			return h(message)
		}
	}
}

Kurtarıcı

// RecoveredPanicError, kurtarılan panik hatasını ve stack izleme bilgilerini tutar.
type RecoveredPanicError struct {
	V          interface{}
	Stacktrace string
}

// Recoverer, işleyiciden herhangi bir panik kurtarır ve kurtarılan panik hatasıyla stack izleme bilgisini herhangi bir hata döndürür.
func Recoverer(h message.HandlerFunc) message.HandlerFunc {
	return func(event *message.Message) (events []*message.Message, err error) {
		panicked := true

		defer func() {
			if r := recover(); r != nil || panicked {
				err = errors.WithStack(RecoveredPanicError{V: r, Stacktrace: string(debug.Stack())})
			}
		}()

		events, err = h(event)
		panicked = false
		return events, err
	}
}

Yeniden Deneme

// Retry, bir hata döndürülürse işleyiciyi yeniden dener sağlayan bir ara yazılım sağlar.
// Yeniden deneme davranışı, üs tabanlı bekleme (exponential backoff) ve maksimum geçen zaman yapılandırılabilir.
type Retry struct {
	// MaxRetries, yapılacak maksimum deneme sayısını belirtir.
	MaxRetries int

	// InitialInterval, yeniden denemeler arasındaki ilk aralıktır. Sonraki aralıklar Çarpan tarafından ölçeklendirilir.
	InitialInterval time.Duration
	// MaxInterval, yeniden deneme için üs tabanlı bekleme süresinin üst sınırını belirler.
	MaxInterval time.Duration
	// Multiplier, yeniden denemeler arasındaki bekleme süresi Çarpanıdır.
	Multiplier float64
	// MaxElapsedTime, yeniden deneme için maksimum zaman sınırını belirler. 0 ise devre dışı bırakılır.
	MaxElapsedTime time.Duration
	// RandomizationFactor, bekleme süresini rastgele yaymak için kullanılır:
	// [currentInterval * (1 - randomization_factor), currentInterval * (1 + randomization_factor)].
	RandomizationFactor float64

	// OnRetryHook, her yeniden deneme girişiminde yürütülecek isteğe bağlı bir işlevdir.
	// Geçerli yeniden deneme numarası, yeniden deneme yapılacak gecikme süresiyle birlikte geçirilir.
	OnRetryHook func(retryNum int, delay time.Duration)

	Logger watermill.LoggerAdapter
}
// Middleware, Retry ara yazılımını döndürür.
func (r Retry) Middleware(h message.HandlerFunc) message.HandlerFunc {
	return func(msg *message.Message) ([]*message.Message, error) {
		produc


edMessages, err := h(msg)
		if err == nil {
			return producedMessages, nil
		}

		expBackoff := backoff.NewExponentialBackOff()
		expBackoff.InitialInterval = r.InitialInterval
		expBackoff.MaxInterval = r.MaxInterval
		expBackoff.Multiplier = r.Multiplier
		expBackoff.MaxElapsedTime = r.MaxElapsedTime
		expBackoff.RandomizationFactor = r.RandomizationFactor

		ctx := msg.Context()
		if r.MaxElapsedTime > 0 {
			var cancel func()
			ctx, cancel = context.WithTimeout(ctx, r.MaxElapsedTime)
			defer cancel()
		}

		retryNum := 1
		expBackoff.Reset()
	retryLoop:
		for {
			waitTime := expBackoff.NextBackOff()
			select {
			case 

Boğma

// Throttle, belirli bir zaman dilimi içinde işlenen mesajların sayısını sınırlandıran bir ara yazılım sağlar.
// Bu, işlenmemiş uzun bir kuyrukta çalışan işleyicilerin aşırı yüklenmesini önlemek için kullanılabilir.
type Throttle struct {
	ticker *time.Ticker
}
// NewThrottle, yeni bir Throttle ara yazılımı oluşturur.
// Örnek süre ve sayı: NewThrottle(10, time.Second), saniyede 10 mesajı gösterir.
func NewThrottle(count int64, duration time.Duration) *Throttle {
	return &Throttle{
		ticker: time.NewTicker(duration / time.Duration(count)),
	}
}
// Middleware, Throttle ara yazılımını döndürür.
func (t Throttle) Middleware(h message.HandlerFunc) message.HandlerFunc {
	return func(message *message.Message) ([]*message.Message, error) {
		// Birden çok işleyici tarafından paylaşılan boğmalar kendi "tıkaçları" için bekleyecek.

Zaman Aşımı

// ZamanAşımı, belirtilen süreden sonra gelen mesajın bağlamını iptal eder.
// İşleyicinin zaman aşımı duyarlı işlevleri başarısız olacağında msg.Context().Done() dinlenmelidir.
func Timeout(timeout time.Duration) func(message.HandlerFunc) message.HandlerFunc {
	return func(h message.HandlerFunc) message.HandlerFunc {
		return func(msg *message.Message) ([]*message.Message, error) {
			ctx, cancel := context.WithTimeout(msg.Context(), timeout)
			defer func() {
				cancel()
			}()

			msg.SetContext(ctx)
			return h(msg)
		}
	}
}