Kami merekomendasikan menggunakan alat pemantauan seperti Prometheus untuk memantau proses pekerjaan dan antrian pada lingkungan produksi.

Metrik Antrian

Jika Anda menggunakan Web UI, Anda dapat mengaktifkan integrasi dengan Prometheus dengan memberikan dua parameter:

  • --enable-metrics-exporter: Aktifkan pengumpulan metrik antrian dan ekspor ke endpoint /metrics.
  • --prometheus-addr: Aktifkan visualisasi metrik antrian dalam Web UI.

Halaman metrik antrian terlihat seperti ini:

Screen Shot 2021-12-19 at 4 37 19 PM

Jika Anda tidak menggunakan Web UI, Asynq dilengkapi dengan file biner yang dapat Anda jalankan untuk mengekspor metrik antrian. Ini juga termasuk paket x/metrics untuk mengumpulkan metrik antrian.

Metrik Proses Pekerjaan

Antarmuka Handler Asynq dan ServeMux dapat diinstrumentasi dengan metrik untuk observabilitas.

Berikut contoh pengeluaran metrik proses pekerjaan menggunakan Prometheus. Kita dapat menginstrumen kode kita dalam aplikasi untuk melacak metrik aplikasi khusus tambahan, serta metrik default (seperti memori dan CPU) yang dilacak oleh Prometheus.

Berikut adalah daftar metrik khusus aplikasi yang dilacak dalam contoh kode:

  • Total jumlah tugas yang diproses oleh proses pekerjaan (termasuk tugas yang berhasil maupun yang gagal).
  • Jumlah tugas gagal yang diproses oleh proses pekerjaan.
  • Jumlah tugas saat ini yang sedang diproses oleh proses pekerjaan.
package main

import (
    "context"
    "log"
    "net/http"
    "os"
    "os/signal"
    "runtime"

    "github.com/hibiken/asynq"
    "github.com/hibiken/asynq/examples/tasks"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    "github.com/prometheus/client_golang/prometheus/promhttp"
    "golang.org/x/sys/unix"
)

// Variabel metrik.
var (
    processedCounter = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "processed_tasks_total",
            Help: "Total jumlah tugas yang diproses",
        },
        []string{"tipe_tugas"},
    )

    failedCounter = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "failed_tasks_total",
            Help: "Total jumlah tugas gagal yang diproses",
        },
        []string{"tipe_tugas"},
    )

    inProgressGauge = promauto.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "in_progress_tasks",
            Help: "Jumlah saat ini tugas yang sedang diproses",
        },
        []string{"tipe_tugas"},
    )
)

func metricsMiddleware(next asynq.Handler) asynq.Handler {
    return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
        inProgressGauge.WithLabelValues(t.Type()).Inc()
        err := next.ProcessTask(ctx, t)
        inProgressGauge.WithLabelValues(t.Type()).Dec()
        if err != nil {
            failedCounter.WithLabelValues(t.Type()).Inc()
        }
        processedCounter.WithLabelValues(t.Type()).Inc()
        return err
    })
}

func main() {
    httpServeMux := http.NewServeMux()
    httpServeMux.Handle("/metrics", promhttp.Handler())
    metricsSrv := &http.Server{
        Addr:    ":2112",
        Handler: httpServeMux,
    }
    done := make(chan struct{})

    // Mulai server metrik.
    go func() {
        err := metricsSrv.ListenAndServe()
        if err != nil && err != http.ErrServerClosed {
            log.Printf("Error: server metrik error: %v", err)
        }
        close(done)
    }()

    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: ":6379"},
        asynq.Config{Concurrency: 20},
    )

    mux := asynq.NewServeMux()
    mux.Use(metricsMiddleware)
    mux.HandleFunc(tasks.TypeEmail, tasks.HandleEmailTask)

    // Mulai server pekerja.
    if err := srv.Start(mux); err != nil {
        log.Fatalf("Gagal memulai server pekerja: %v", err)
    }

    // Tunggu sinyal terminasi.
    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, unix.SIGTERM, unix.SIGINT)
}