Zalecamy korzystanie z narzędzi monitorujących, takich jak Prometheus, do monitorowania procesów prac i kolejek w środowisku produkcyjnym.
Metryki kolejki
Jeśli korzystasz z interfejsu internetowego (Web UI), możesz włączyć integrację z Prometeuszem, dostarczając dwóch parametrów:
-
--enable-metrics-exporter
: Włącz zbieranie metryk kolejki i eksportuj je do punktu końcowego/metrics
. -
--prometheus-addr
: Włącz wizualizację metryk kolejki w interfejsie internetowym (Web UI).
Strona metryk kolejki wygląda tak:
Jeśli nie korzystasz z interfejsu internetowego (Web UI), Asynq jest dostarczane wraz z plikiem binarnym, który można uruchomić, aby eksportować metryki kolejki. Zawiera również pakiet x/metrics
do zbierania metryk kolejki.
Metryki procesu zadania
Interfejs Handler
i ServeMux
w Asynq mogą być zinstrumentowane metrykami do obserwacji.
Poniżej znajduje się przykład eksportowania metryk procesu zadania przy użyciu Prometheus. Możemy zinstrumentować nasz kod w aplikacji, aby śledzić dodatkowe metryki określone dla aplikacji oraz domyślne metryki (takie jak pamięć i CPU) śledzone przez Prometheus.
Poniżej znajduje się lista metryk określonych dla aplikacji śledzonych w przykładowym kodzie:
- Łączna liczba zadań przetworzonych przez proces zadania (obejmująca zarówno udane, jak i nieudane zadania).
- Liczba nieudanych zadań przetworzonych przez proces zadania.
- Aktualna liczba zadań przetwarzanych przez proces zadania.
package main
import (
"context"
"log"
"net/http"
"os"
"os/signal"
"runtime"
"github.com/hibiken/asynq"
"github.com/hibiken/asynq/examples/tasks"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"golang.org/x/sys/unix"
)
// Zmienne metryk.
var (
processedCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "processed_tasks_total",
Help: "Całkowita liczba przetworzonych zadań",
},
[]string{"task_type"},
)
failedCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "failed_tasks_total",
Help: "Całkowita liczba nieudanych zadań przetworzonych",
},
[]string{"task_type"},
)
inProgressGauge = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "in_progress_tasks",
Help: "Aktualna liczba zadań przetwarzanych",
},
[]string{"task_type"},
)
)
func metricsMiddleware(next asynq.Handler) asynq.Handler {
return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
inProgressGauge.WithLabelValues(t.Type()).Inc()
err := next.ProcessTask(ctx, t)
inProgressGauge.WithLabelValues(t.Type()).Dec()
if err != nil {
failedCounter.WithLabelValues(t.Type()).Inc()
}
processedCounter.WithLabelValues(t.Type()).Inc()
return err
})
}
func main() {
httpServeMux := http.NewServeMux()
httpServeMux.Handle("/metrics", promhttp.Handler())
metricsSrv := &http.Server{
Addr: ":2112",
Handler: httpServeMux,
}
done := make(chan struct{})
// Uruchomienie serwera metryk.
go func() {
err := metricsSrv.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
log.Printf("Błąd: serwer metryk zwrócił błąd: %v", err)
}
close(done)
}()
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: ":6379"},
asynq.Config{Concurrency: 20},
)
mux := asynq.NewServeMux()
mux.Use(metricsMiddleware)
mux.HandleFunc(tasks.TypeEmail, tasks.HandleEmailTask)
// Uruchomienie serwera pracowniczego.
if err := srv.Start(mux); err != nil {
log.Fatalf("Nieudane uruchomienie serwera pracowniczego: %v", err)
}
// Oczekiwanie na sygnały zakończenia.
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, unix.SIGTERM, unix.SIGINT)
}