Publisher ve Abone (Subscriber), Watermill'in alt seviye parçalarıdır. Pratik uygulamalarda genellikle ilişkiler, metrikler, zehirli mesaj kuyrukları, yeniden deneme, hız sınırlama vb. gibi yüksek düzeyli arabirimler ve işlevleri kullanmak istersiniz.
Bazen, işleme başarılı olduğunda Onay göndermek istemeyebilirsiniz. Bazen, başka bir mesaj işlendikten sonra bir mesaj göndermek isteyebilirsiniz.
Bu gereksinimleri karşılamak için Yönlendirici (Router) adında bir bileşen bulunmaktadır.
Yapılandırma
Tam kaynak kod: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
type RouterConfig struct {
// CloseTimeout, yönlendiricinin kapatılırken işleyiciler için ne kadar çalışması gerektiğini belirler.
CloseTimeout time.Duration
}
func (c *RouterConfig) setDefaults() {
if c.CloseTimeout == 0 {
c.CloseTimeout = time.Second * 30
}
}
// Validate, yönlendirici yapılandırmasında herhangi bir hata olup olmadığını kontrol eder.
func (c RouterConfig) Validate() error {
return nil
}
// ...
Handler
Öncelikle, HandlerFunc
fonksiyonunu uygulamanız gerekmektedir:
Tam kaynak kod: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerFunc, bir mesaj alındığında çağrılan fonksiyondur.
//
// HandlerFunc bir hata döndürmediğinde, msg.Ack() otomatik olarak çağrılacaktır.
//
// HandlerFunc bir hata döndürdüğünde, msg.Nack() çağrılacaktır.
//
// HandlerFunc içinde msg.Ack() çağrıldığında ve HandlerFunc bir hata döndürdüğünde,
// Ack zaten gönderildiği için msg.Nack() gönderilmeyecektir.
//
// Birden fazla mesaj alındığında (HandlerFunc'te msg.Ack() gönderildiği veya Abone birden fazla tüketiciyi desteklediğinde),
// HandlerFunc eş zamanlı olarak yürütülecektir.
type HandlerFunc func(msg *Message) ([]*Message, error)
// ...
Daha sonra, yeni bir işleyici eklemek için Router.AddHandler
kullanmanız gerekmektedir:
Tam kaynak kod: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// AddHandler, yeni bir işleyici ekler.
// handlerName benzersiz olmalıdır. Şu anda yalnızca hata ayıklama için kullanılır.
// subscribeTopic, işleyicinin mesajları alacağı konudur.
// publishTopic, işleyicinin döndürdüğü mesajların Router tarafından oluşturulacağı konudur.
// İşleyicinin birden fazla konuya yayınlaması gerektiğinde,
// önerilen, yalnızca Publisher'ın işleyiciye enjekte edilmesi veya middleware'ın uygulanmasıdır,
// böylece metaveriye bağlı olarak mesajları yakalayabilir ve belirli konulara yayınlayabilir.
// Router zaten çalışırken bir işleyici eklenirse, RunHandlers() açıkça çağrılmalıdır.
func (r *Router) AddHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
publishTopic string,
publisher Publisher,
handlerFunc HandlerFunc,
) *Handler {
r.logger.Info("İşleyici ekleniyor", watermill.LogFields{
"handler_name": handlerName,
"topic": subscribeTopic,
})
r.handlersLock.Lock()
defer r.handlersLock.Unlock()
if _, ok := r.handlers[handlerName]; ok {
panic(DuplicateHandlerNameError{handlerName})
}
publisherName, subscriberName := internal.StructName(publisher), internal.StructName(subscriber)
newHandler := &handler{
name: handlerName,
logger: r.logger,
subscriber: subscriber,
subscribeTopic: subscribeTopic,
subscriberName: subscriberName,
publisher: publisher,
publishTopic: publishTopic,
publisherName: publisherName,
handlerFunc: handlerFunc,
runningHandlersWg: r.runningHandlersWg,
runningHandlersWgLock: r.runningHandlersWgLock,
messagesCh: nil,
routersCloseCh: r.closingInProgressCh,
startedCh: make(chan struct{}),
}
r.handlersWg.Add(1)
r.handlers[handlerName] = newHandler
select {
case r.handlerAdded struct{}{}:
default:
// closeWhenAllHandlersStopped her zaman handlerAdded'ı beklemiyor
}
return &Handler{
router: r,
handler: newHandler,
}
}
// AddNoPublisherHandler, yeni bir işleyici ekler.
// Bu işleyici mesaj döndüremez.
// Bir mesaj döndürdüğünde hata oluşur ve bir Nack gönderilir.
//
// handlerName benzersiz olmalıdır. Şu anda yalnızca hata ayıklama için kullanılır.
// subscribeTopic, işleyicinin mesajları alacağı konudur.
// subscriber, mesajları tüketmek için kullanılan abonedir.
// Router zaten çalışırken bir işleyici eklenirse, RunHandlers() açıkça çağrılmalıdır.
func (r *Router) AddNoPublisherHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
handlerFunc NoPublishHandlerFunc,
) *Handler {
handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...
"Başlarken" bölümündeki örnek kullanımı inceleyiniz. Tam kaynak kod: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
// AddHandler, işleyici düzeyinde middleware eklemek veya işleyicileri durdurmak için kullanılabilen bir işleyici döndürür.
handler := router.AddHandler(
"struct_handler", // işleyici adı, benzersiz olmalıdır
"incoming_messages_topic", // olayların okunduğu konu
pubSub,
"outgoing_messages_topic", // olayların yayınlandığı konu
pubSub,
structHandler{}.Handler,
)
// İşleyici düzeyindeki middleware yalnızca belirli işleyiciler için yürütülür
// Bu middleware, router düzeyindeki middleware'lerle aynı şekilde eklenir
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("İşleyiciye özgü middleware'i uygulama, mesaj UUID'si: ", message.UUID)
return h(message)
}
})
// ...
Yayıncı İşleyici Yok
Her işleyici yeni bir mesaj oluşturmayabilir. Böyle bir işleyici eklemek için Router.AddNoPublisherHandler
'ı kullanabilirsiniz:
Tam kaynak kod: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// AddNoPublisherHandler yeni bir işleyici ekler.
// Bu işleyici mesaj döndüremez.
// Mesaj döndürdüğünde bir hata oluşacak ve Nack gönderilecek.
//
// handlerName benzersiz olmalıdır ve şu anda yalnızca hata ayıklama amaçları için kullanılır.
//
// subscribeTopic, işleyicinin mesajları alacağı konudur.
//
// subscriber, mesajları tüketmek için kullanılır.
//
// Çalışmakta olan bir yönlendirmeye işleyici eklerseniz, açıkça RunHandlers() çağrısı yapmanız gerekir.
func (r *Router) AddNoPublisherHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
handlerFunc NoPublishHandlerFunc,
) *Handler {
handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...
}
Onaylama
Varsayılan olarak, HanderFunc
bir hata döndürmediğinde, msg.Ack()
çağrılacaktır. Bir hata döndürüldüğünde, msg.Nack()
çağrılacaktır. Dolayısıyla, mesajı işledikten sonra msg.Ack()
veya msg.Nack
'ı çağırmak zorunda değilsiniz (tabii ki isterseniz çağırabilirsiniz).
Mesaj Üretme
İşleyici tarafından birden fazla mesaj döndürüldüğünde, çoğu Yayıncı uygulamasının mesajların atomik olarak yayınlanmasını desteklemediğine dikkat edin. Eğer broker veya depo kullanılamaz durumdaysa, sadece bazı mesajlar oluşturulabilir ve msg.Nack()
gönderilecektir.
Bu bir problemse, her işleyicinin yalnızca bir mesaj yayınlamasını düşünün.
Yönlendiriciyi Çalıştırma
Yönlendiriciyi çalıştırmak için Run()
çağrısı yapmalısınız.
Tam kaynak kod: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Run tüm eklentileri ve işleyicileri çalıştırır ve verilen konulara abone olmayı başlatır.
// Bu çağrı, yönlendirici çalışırken bekler.
//
// Tüm işleyiciler durduğunda (örneğin abonelik kapatıldığı için), yönlendirici de duracaktır.
//
// Run() durdurmak için yönlendirici üzerinde Close() çağrısı yapmalısınız.
//
// ctx, tüm abonelere iletilir.
//
// Tüm işleyiciler durduğunda (örneğin bağlantılar kapandığında), Run() de duracaktır.
func (r *Router) Run(ctx context.Context) (err error) {
// ...
}
Yönlendiricinin Çalıştığından Emin Olma
Yönlendiricinin çalışıp çalışmadığını anlamak faydalı olabilir. Bunu Running()
yöntemini kullanarak başarabilirsiniz.
Tam kaynak kod: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Running, yönlendirici çalıştığında kapanır.
// Başka bir deyişle, yönlendiricinin çalışmasını şu şekilde bekleyebilirsiniz:
// fmt.Println("Yönlendirici başlatılıyor")
// go r.Run(ctx)
// // fmt.Println("Yönlendirici çalışıyor")
// Uyarı: Tarihsel nedenlerden dolayı, bu kanal yönlendiricinin kapanma durumunun farkında değildir - eğer yönlendirici çalışmaya devam ederse ve ardından kapanırsa kapanacaktır.
func (r *Router) Running() chan struct{} {
// ...
}
Ayrıca IsRunning
işlevini kullanarak bir boolean değeri döndüren fonksiyonu da kullanabilirsiniz:
Tam kaynak kod: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// IsRunning, yönlendirici çalıştığında true döndürür.
//
// Uyarı: Tarihsel nedenlerden dolayı, bu yöntem yönlendiricinin kapalı durumu hakkında bilgi sahibi değildir.
// Yönlendiricinin kapatılıp kapatılmadığını bilmek istiyorsanız, IsClosed'ı kullanın.
func (r *Router) IsRunning() bool {
// ...
}
Yönlendiriciyi kapatma
Yönlendiriciyi kapatmak için Close()
metodunu çağırmanız gerekmektedir.
Tam kaynak kod: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Close yönlendiriciyi yapılandırmada belirtilen bir zaman aşımı ile kapatır.
func (r *Router) Close() error {
r.closedLock.Lock()
// ...
Close()
tüm yayıncıları ve aboneleri kapatır ve tüm işleyicilerin tamamlanmasını bekler.
Close()
yapılandırmadaki RouterConfig.CloseTimeout
içinde belirlenen zaman aşımını bekler. Zaman aşımına ulaşıldığında Close()
bir hata dönecektir.
Yönlendiriciyi başlattıktan sonra işleyici eklemek
Yönlendirici zaten çalışır durumdayken yeni bir işleyici ekleyebilirsiniz. Bunun için AddNoPublisherHandler
veya AddHandler
metodunu çağırmanız ve ardından RunHandlers
metodunu çağırmanız gerekmektedir.
Tam kaynak kod: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// RunHandlers, Run() metodundan sonra eklenen tüm işleyicileri çalıştırır.
// RunHandlers idempotent olduğu için birden fazla kez güvenle çağrılabilir.
func (r *Router) RunHandlers(ctx context.Context) error {
// ...
Çalışan işleyicileri durdurma
Yalnızca bir çalışan işleyiciyi durdurabilirsiniz, bunun için Stop()
metodunu çağırmanız gerekmektedir.
Lütfen dikkat edin, yönlendirici çalışan işleyiciler kalmadığında kapanacaktır.
Tam kaynak kod: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Stop işleyiciyi durdurur.
// Stop asenkron olarak çalışır.
// İşleyicinin durdurulup durdurulmadığını Stopped() fonksiyonu ile kontrol edebilirsiniz.
func (h *Handler) Stop() {
// ...
Yürütme modeli
Aboneler, tek bir iletiyi sıralı olarak tüketebilir veya çoklu iletileri paralel olarak tüketebilir.
-
Tek ileti akışı en basit yöntemdir, bu durumda aboneler
msg.Ack()
çağrılana kadar yeni iletiler almayacaktır. -
Çoklu ileti akışı sadece belirli aboneler tarafından desteklenmektedir. Aynı anda birden fazla konu bölümüne abone olarak, hatta önceden tanımlanmış olmayan iletiler de dahil olmak üzere çoklu iletiler paralel bir şekilde tüketilebilir (ör. Kafka aboneleri nasıl çalışır). Yönlendirici bu modeli
HandlerFunc
ı paralel olarak çalıştırarak işler.
Desteklenen yürütme modellerini anlamak için seçilen Yaygın/Yayın belgelemesine başvurun.
Ara yazılım
Tam kaynak kod: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerMiddleware, HandlerFunc için bir dekoratör gibi bir şey yazmamızı sağlayan bir öğedir.
// Bu, işleyici öncesi (ör. tüketilen iletiyi değiştirme) veya işleyici sonrası işlemleri (oluşturulan iletiyi değiştirme, tüketilen iletiyi onaylama/reddetme, hataları işleme, günlüğe kaydetme vb.) gerçekleştirebilir.
//
// Router'a `AddMiddleware` metoduyla eklenebilir.
//
// Örnek:
//
// func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
// return func(message *message.Message) ([]*message.Message, error) {
// fmt.Println("işleyici öncesi çalıştırıldı")
// üretilenIletiler, err := h(message)
// fmt.Println("işleyici sonrası çalıştırıldı")
//
// return üretilenIletiler, err
// }
// }
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...
Standart ara yazılımın tam listesi Middlewares içinde bulunabilir.
Eklentiler
Tam kaynak kod: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// RouterPlugin, yönlendirici başlatıldığında çalıştırılan bir fonksiyondur.
type RouterPlugin func(*Router) error
// ...
Standart eklentilerin tam listesi message/router/plugin içinde bulunabilir.
Bağlam
Her işleyici tarafından alınan her ileti için context
içinde bazı kullanışlı değerler saklanır:
Tam kaynak kod: github.com/ThreeDotsLabs/watermill/message/router_context.go
// ...
// HandlerNameFromCtx, mesajı bağlamdan tüketen yönlendiricideki işleyicinin adını döndürür.
func HandlerNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, handlerNameKey)
}
// PublisherNameFromCtx, mesaj yayıncısının adını döndürür.
// Örneğin, Kafka için `kafka.Publisher` olacaktır.
func PublisherNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, publisherNameKey)
}
// SubscriberNameFromCtx, mesaj abonesinin adını döndürür.
// Örneğin, Kafka için `kafka.Subscriber` olacaktır.
func SubscriberNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, subscriberNameKey)
}
// SubscribeTopicFromCtx, iletişim kurulan konuyu döndürür.
func SubscribeTopicFromCtx(ctx context.Context) string {
return valFromCtx(ctx, subscribeTopicKey)
}
// PublishTopicFromCtx, mesajın yayınlanacağı konuyu döndürür.
func PublishTopicFromCtx(ctx context.Context) string {
return valFromCtx(ctx, publishTopicKey)
}
// ...