Üretim ortamında iş süreçlerinizi ve kuyruklarınızı izlemek için Prometheus gibi izleme araçlarını kullanmanızı öneririz.

Kuyruk Metrikleri

Eğer Web UI kullanıyorsanız, Prometheus entegrasyonunu sağlamak için aşağıdaki iki parametreyi sağlayarak yapılandırabilirsiniz:

  • --enable-metrics-exporter: Kuyruk metriklerinin toplanmasını ve /metrics uç noktasına aktarılmasını etkinleştirin.
  • --prometheus-addr: Web UI içinde kuyruk metriklerinin görselleştirilmesini etkinleştirin.

Kuyruk metrikleri sayfası aşağıdaki gibi görünür:

Screen Shot 2021-12-19 at 4 37 19 PM

Eğer Web UI kullanmıyorsanız, Asynq bir ikinci seçenek olarak kuyruk metriklerini aktarmak için çalıştırabileceğiniz bir ikili dosya ile gelir. Ayrıca kuyruk metriklerini toplamak için x/metrics paketini içermektedir.

İş Süreci Metrikleri

AsynqHandler arayüzü ve ServeMux metriklerle gözlemlenebilir hale getirilebilir.

Aşağıda Prometheus kullanarak iş süreci metriklerinin nasıl aktarıldığına dair bir örnek bulunmaktadır. Kodumuzu uygulama içinde enstrümantasyon ekleyerek, Prometheus tarafından izlenen varsayılan metriklerin yanı sıra ek uygulama özgü metriklerini de izleyebiliriz.

Örnek kod içinde izlenen uygulama özgü metriklerin listesi aşağıdaki gibidir:

  • İş süreci tarafından işlenen toplam görev sayısı (başarılı ve başarısız görevlerin hepsi dahil).
  • İş süreci tarafından işlenen başarısız görevlerin sayısı.
  • İş süreci tarafından işlenmekte olan mevcut görev sayısı.
package main

import (
    "context"
    "log"
    "net/http"
    "os"
    "os/signal"
    "runtime"

    "github.com/hibiken/asynq"
    "github.com/hibiken/asynq/examples/tasks"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    "github.com/prometheus/client_golang/prometheus/promhttp"
    "golang.org/x/sys/unix"
)

// Metrik değişkenleri.
var (
    processedCounter = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "processed_tasks_total",
            Help: "Toplam işlenen görev sayısı",
        },
        []string{"task_type"},
    )

    failedCounter = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "failed_tasks_total",
            Help: "İşlenen başarısız görevlerin toplam sayısı",
        },
        []string{"task_type"},
    )

    inProgressGauge = promauto.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "in_progress_tasks",
            Help: "İşlenmekte olan mevcut görev sayısı",
        },
        []string{"task_type"},
    )
)

func metricsMiddleware(next asynq.Handler) asynq.Handler {
    return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
        inProgressGauge.WithLabelValues(t.Type()).Inc()
        err := next.ProcessTask(ctx, t)
        inProgressGauge.WithLabelValues(t.Type()).Dec()
        if err != nil {
            failedCounter.WithLabelValues(t.Type()).Inc()
        }
        processedCounter.WithLabelValues(t.Type()).Inc()
        return err
    })
}

func main() {
    httpServeMux := http.NewServeMux()
    httpServeMux.Handle("/metrics", promhttp.Handler())
    metricsSrv := &http.Server{
        Addr:    ":2112",
        Handler: httpServeMux,
    }
    done := make(chan struct{})

    // Metrik sunucusunu başlat.
    go func() {
        err := metricsSrv.ListenAndServe()
        if err != nil && err != http.ErrServerClosed {
            log.Printf("Hata: Metrik sunucusu hata verdi: %v", err)
        }
        close(done)
    }()

    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: ":6379"},
        asynq.Config{Concurrency: 20},
    )

    mux := asynq.NewServeMux()
    mux.Use(metricsMiddleware)
    mux.HandleFunc(tasks.TypeEmail, tasks.HandleEmailTask)

    // İşçi sunucusunu başlat.
    if err := srv.Start(mux); err != nil {
        log.Fatalf("İşçi sunucusunu başlatma başarısız oldu: %v", err)
    }

    // Sonlandırma sinyallerini bekleyin.
    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, unix.SIGTERM, unix.SIGINT)
}