Kami merekomendasikan menggunakan alat pemantauan seperti Prometheus untuk memantau proses pekerjaan dan antrian pada lingkungan produksi.
Metrik Antrian
Jika Anda menggunakan Web UI, Anda dapat mengaktifkan integrasi dengan Prometheus dengan memberikan dua parameter:
-
--enable-metrics-exporter
: Aktifkan pengumpulan metrik antrian dan ekspor ke endpoint/metrics
. -
--prometheus-addr
: Aktifkan visualisasi metrik antrian dalam Web UI.
Halaman metrik antrian terlihat seperti ini:
Jika Anda tidak menggunakan Web UI, Asynq dilengkapi dengan file biner yang dapat Anda jalankan untuk mengekspor metrik antrian. Ini juga termasuk paket x/metrics
untuk mengumpulkan metrik antrian.
Metrik Proses Pekerjaan
Antarmuka Handler
Asynq dan ServeMux
dapat diinstrumentasi dengan metrik untuk observabilitas.
Berikut contoh pengeluaran metrik proses pekerjaan menggunakan Prometheus. Kita dapat menginstrumen kode kita dalam aplikasi untuk melacak metrik aplikasi khusus tambahan, serta metrik default (seperti memori dan CPU) yang dilacak oleh Prometheus.
Berikut adalah daftar metrik khusus aplikasi yang dilacak dalam contoh kode:
- Total jumlah tugas yang diproses oleh proses pekerjaan (termasuk tugas yang berhasil maupun yang gagal).
- Jumlah tugas gagal yang diproses oleh proses pekerjaan.
- Jumlah tugas saat ini yang sedang diproses oleh proses pekerjaan.
package main
import (
"context"
"log"
"net/http"
"os"
"os/signal"
"runtime"
"github.com/hibiken/asynq"
"github.com/hibiken/asynq/examples/tasks"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"golang.org/x/sys/unix"
)
// Variabel metrik.
var (
processedCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "processed_tasks_total",
Help: "Total jumlah tugas yang diproses",
},
[]string{"tipe_tugas"},
)
failedCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "failed_tasks_total",
Help: "Total jumlah tugas gagal yang diproses",
},
[]string{"tipe_tugas"},
)
inProgressGauge = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "in_progress_tasks",
Help: "Jumlah saat ini tugas yang sedang diproses",
},
[]string{"tipe_tugas"},
)
)
func metricsMiddleware(next asynq.Handler) asynq.Handler {
return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
inProgressGauge.WithLabelValues(t.Type()).Inc()
err := next.ProcessTask(ctx, t)
inProgressGauge.WithLabelValues(t.Type()).Dec()
if err != nil {
failedCounter.WithLabelValues(t.Type()).Inc()
}
processedCounter.WithLabelValues(t.Type()).Inc()
return err
})
}
func main() {
httpServeMux := http.NewServeMux()
httpServeMux.Handle("/metrics", promhttp.Handler())
metricsSrv := &http.Server{
Addr: ":2112",
Handler: httpServeMux,
}
done := make(chan struct{})
// Mulai server metrik.
go func() {
err := metricsSrv.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
log.Printf("Error: server metrik error: %v", err)
}
close(done)
}()
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: ":6379"},
asynq.Config{Concurrency: 20},
)
mux := asynq.NewServeMux()
mux.Use(metricsMiddleware)
mux.HandleFunc(tasks.TypeEmail, tasks.HandleEmailTask)
// Mulai server pekerja.
if err := srv.Start(mux); err != nil {
log.Fatalf("Gagal memulai server pekerja: %v", err)
}
// Tunggu sinyal terminasi.
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, unix.SIGTERM, unix.SIGINT)
}