Prometheus kullanarak Watermill'in gerçek zamanlı izlenmesi

Metrikler

Watermill, yayıncılar/aboneler için dekoratörler ve işleyiciler için orta yazılımlar kullanılarak izlenebilir. Resmi Go Prometheus istemcisi kullanarak varsayılan bir uygulama sunmaktayız.

components/metrics paketi, ilgili Prometheus kayıt defterini güncellemek için yayıncıları, aboneleri ve işleyicileri sarmalamak için uygun işlevler sağlayan PrometheusMetricsBuilder'ı dışa aktarır.

Tam kaynak kodu: github.com/ThreeDotsLabs/watermill/components/metrics/builder.go

// ...
// PrometheusMetricsBuilder, yayıncıları, aboneleri ve işleyicileri dekore etmek için yöntemler sağlar.
type PrometheusMetricsBuilder struct {
    // PrometheusRegistry, mevcut bir Prometheus kayıt defterini doldurabilir veya varsayılan kayıt defterini kullanmak için boş olabilir.
    PrometheusRegistry prometheus.Registerer

    Namespace string
    Subsystem string
}

// AddPrometheusRouterMetrics, mesaj yönlendiricisindeki tüm işleyicilere metrik orta yazılım eklemek için uygun bir işlevdir. Ayrıca, işleyicilerin yayıncılarını ve abonelerini dekore eder.
func (b PrometheusMetricsBuilder) AddPrometheusRouterMetrics(r *message.Router) {
// ...

Yayıncıları, Aboneleri ve İşleyicileri Sarma

Watermill'in yönlendiricisini kullandığınızda (çoğu durumda önerilir), AddPrometheusRouterMetrics işlevini kullanarak bu yönlendiriciye eklenen tüm işleyicilerin Prometheus kayıt defterini güncellemek üzere sardıklarından emin olabilirsiniz, aynı zamanda yayıncıları ve abonelerini de:

Tam kaynak kodu: github.com/ThreeDotsLabs/watermill/components/metrics/builder.go

// ...
// AddPrometheusRouterMetrics, mesaj yönlendiricisindeki tüm işleyicilere metrik orta yazılım eklemek için uygun bir işlevdir. Ayrıca, işleyicilerin yayıncılarını ve abonelerini dekore eder.
func (b PrometheusMetricsBuilder) AddPrometheusRouterMetrics(r *message.Router) {
    r.AddPublisherDecorators(b.DecoratePublisher)
    r.AddSubscriberDecorators(b.DecorateSubscriber)
    r.AddMiddleware(b.NewRouterMiddleware().Middleware)
}
// ...

AddPrometheusRouterMetrics'in kullanım örneği:

Tam kaynak kodu: github.com/ThreeDotsLabs/watermill/_examples/basic/4-metrics/main.go

// ...
    // Namespace ve subsystem parametrelerini boş bırakıyoruz
    metricsBuilder := metrics.NewPrometheusMetricsBuilder(prometheusRegistry, "", "")
    metricsBuilder.AddPrometheusRouterMetrics(router)
// ...

Yukarıdaki kod örneğinde, namespace ve subsystem parametrelerini boş bırakıyoruz. Prometheus istemci kitaplığı bu parametreleri kullanır ve metrik adlarını öneki ile ekler. Namespace veya subsystem kullanmak isteyebilirsiniz, ancak metrik adlarını etkileyeceğinden Grafana gösterge panelinizi buna göre ayarlamanız gerekecektir.

Bağımsız yayıncılar ve aboneler ayrıca PrometheusMetricsBuilder'ın özel yöntemlerini kullanarak dekore edilebilir:

Tam kaynak kodu: github.com/ThreeDotsLabs/watermill/_examples/basic/4-metrics/main.go

// ...
    subWithMetrics, err := metricsBuilder.DecorateSubscriber(pubSub)
    if err != nil {
        panic(err)
    }
    pubWithMetrics, err := metricsBuilder.DecoratePublisher(pub)
    if err != nil {
        panic(err)
    }
// ...

/metrics Uç Noktasını Açma

Prometheus'un çalışma prensibine göre, bir hizmetin veri toplama için bir HTTP uç noktasını açması gerekir. Geleneksel olarak, bu bir GET uç noktasıdır ve genellikle yol /metrics'tir.

Bu uç noktayı sağlamak için, önceden oluşturulmuş Prometheus Kaydı'nı kullanan ve aynı anda yeni bir Kayıt oluşturan iki kullanışlı işlev bulunmaktadır:

Tam kaynak kod: github.com/ThreeDotsLabs/watermill/components/metrics/http.go

// ...
// CreateRegistryAndServeHTTP, /metrics uç noktasını Prometheus'a açmak için belirtilen adreste bir HTTP sunucusu kurar.
// Metrik kaydı için yeni bir Prometheus Kaydı döndürür ve sunucuyu kapatmak için bir iptal işlevi döndürür.
func CreateRegistryAndServeHTTP(addr string) (registry *prometheus.Registry, cancel func()) {
	registry = prometheus.NewRegistry()
	return registry, ServeHTTP(addr, registry)
}

// ServeHTTP, /metrics uç noktasını Prometheus'a açmak için belirtilen adreste bir HTTP sunucusu kurar.
// Var olan bir Prometheus Kaydı kabul eder ve sunucuyu kapatmak için bir iptal işlevi döndürür.
func ServeHTTP(addr string, registry *prometheus.Registry) (cancel func()) {
// ...

Kullanım örneği aşağıdaki gibidir:

Tam kaynak kod: github.com/ThreeDotsLabs/watermill/_examples/basic/4-metrics/main.go

// ...
	prometheusRegistry, closeMetricsServer := metrics.CreateRegistryAndServeHTTP(*metricsAddr)
	defer closeMetricsServer()

	// Namespace ve alt sistem bırakılır
	metricsBuilder := metrics.NewPrometheusMetricsBuilder(prometheusRegistry, "", "")
	metricsBuilder.AddPrometheusRouterMetrics(router)
// ...

Örnek Uygulama

Panelin pratikte nasıl çalıştığını anlamak için, metrik örneğine başvurabilirsiniz.

Örneğin README'deki talimatları takip ederek çalıştırabilir ve Grafana'ya Prometheus veri kaynağını ekleyebilirsiniz.

Grafana Paneli

Yukarıda bahsedilen metrik uygulamasıyla birlikte kullanmak üzere bir Grafana paneli hazırladık. Bu, verimlilik, hata oranı ve yayın/işleme süresi hakkında temel bilgiler sağlar.

Bu paneli yerel olarak görüntülemek istiyorsanız, örnek uygulamayı kullanabilirsiniz.

Prometheus'a aktarılan metrikler hakkında daha fazla bilgi için, Aktarılan metrikler bölümüne bakın.

Paneli İçe Aktarma

Grafana panelini içe aktarmak için, sol menüden Dashboard/Yönet'i seçin, ardından +İçe Aktara tıklayın.

Panelin URL'sini https://grafana.com/dashboards/9777 (veya sadece ID'sini, 9777) girin, ardından Yükle'ye tıklayın.

Paneli İçe Aktar

Daha sonra /metrics uç noktasını sıyırma için kullanılan Prometheus veri kaynağını seçin. İçe Aktara tıklayın ve işlem tamam!

Dışa Aktarılan Ölçümler

Aşağıdaki, PrometheusMetricsBuilder tarafından Prometheus kayıt defterine kaydedilen tüm ölçümleri listeler.

Prometheus ölçüm türleri hakkında daha fazla bilgi için lütfen Prometheus belgelerine başvurun.

Nesne Ölçüm Açıklama Etiketler/Değerler
Abone subscriber_messages_received_total Bir Prometheus sayaçıdır. Bir abone tarafından alınan mesaj sayısını sayar. acked "acked" veya "nacked" olabilir.
Eğer abone bir işleyici içerisinde çalışıyorsa, handler_name ayarlayınız; aksi halde, "<işleyici yok>".
subscriber_name, aboneyi tanımlar. Eğer fmt.Stringer arabirimini uyguluyorsa, String() sonucudur; aksi halde, package.structName'dir.
İşleyici handler_execution_time_seconds Bir Prometheus histogramdır. Ara yazılım tarafından sarmalanmış işleyici işlevinin yürütme süresini kaydeder. handler_name, işleyicinin adıdır.
success, sarmalanmış işleyici işlevinin bir hata döndürüp döndürmediğine bağlı olarak "true" veya "false" olabilir.
Yayıncı publish_time_seconds Bir Prometheus histogramdır. Yayıncının süslenmiş yayın işlevinin yürütme süresini kaydeder. success, süslenmiş yayıncının bir hata döndürüp döndürmediğine bağlı olarak "true" veya "false" olabilir.
Eğer yayıncı bir işleyici içerisinde çalışıyorsa, handler_name ayarlayınız; aksi halde, "<işleyici yok>".
publisher_name, yayıncıyı tanımlar. Eğer fmt.Stringer arabirimini uyguluyorsa, String() sonucudur; aksi halde, package.structName'dir.

Ayrıca, her ölçümün, Prometheus tarafından sağlanan bir node etiketi vardır ve değeri, ölçüm kaynağının örneğine karşılık gelir ve Prometheus yapılandırma dosyası'ndaki iş adı olarak belirtilmiş bir job vardır.

Not: Yukarıda belirtildiği gibi, boş olmayan bir namespace veya subsystem kullanmak, bir ölçüm adı öneki sonucunu doğuracaktır. Grafana gösteriminde panel tanımı gibi karşılıklı ayarlamalar yapmanız gerekebilir.

Özelleştirme

Belirli bir ölçümün göz ardı edildiğine inanıyorsanız, bu temel uygulamayı kolayca genişletebilirsiniz. En iyi yol, ServeHTTP yöntemi ile kullanılan Prometheus kayıt defterini kullanmak ve ölçümleri Prometheus istemci belgesi'nde belirtilen şekilde kaydetmektir.

Bu ölçümleri güncellemenin öz bir yöntemi dekoratörler kullanmaktır. Tüm kaynak kodu github.com/ThreeDotsLabs/watermill/message/decorator.go'da bulunabilir.

// ...
// MessageTransformSubscriberDecorator, her bir mesaj üzerinden geçen dönüşüm işlevini çağıran bir abone dekoratörü oluşturur.
func MessageTransformSubscriberDecorator(transform func(*Message)) SubscriberDecorator {
	if transform == nil {
		panic("dönüşüm işlevi nil")
	}
	return func(sub Subscriber) (Subscriber, error) {
		return &messageTransformSubscriberDecorator{
			sub:       sub,
			transform: transform,
		}, nil
	}
}

// MessageTransformPublisherDecorator, her bir mesaj üzerinden geçen dönüşüm işlevini çağıran bir yayıncı dekoratörü oluşturur.
func MessageTransformPublisherDecorator(transform func(*Message)) PublisherDecorator {
	if transform == nil {
		panic("dönüşüm işlevi nil")
	}
	return func(pub Publisher) (Publisher, error) {
		return &messageTransformPublisherDecorator{
			Publisher: pub,
			transform: transform,
		}, nil
	}
}

type messageTransformSubscriberDecorator struct {
// ...

Ve/veya yönlendiricilerdeki ara yazılımlar.

Daha basit bir yöntem, gerekli ölçümleri yalnızca işleyici işlevlerinde güncellemektir.