package main
import (
"context"
"log"
"net/http"
"os"
"os/signal"
"runtime"
"github.com/hibiken/asynq"
"github.com/hibiken/asynq/examples/tasks"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"golang.org/x/sys/unix"
)
// Metrikvariablen.
var (
processedCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "processed_tasks_total",
Help: "Gesamtanzahl der verarbeiteten Aufgaben",
},
[]string{"task_type"},
)
failedCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "failed_tasks_total",
Help: "Gesamtanzahl der fehlgeschlagenen verarbeiteten Aufgaben",
},
[]string{"task_type"},
)
inProgressGauge = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "in_progress_tasks",
Help: "Aktuelle Anzahl der verarbeiteten Aufgaben",
},
[]string{"task_type"},
)
)
func metricsMiddleware(next asynq.Handler) asynq.Handler {
return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
inProgressGauge.WithLabelValues(t.Type()).Inc()
err := next.ProcessTask(ctx, t)
inProgressGauge.WithLabelValues(t.Type()).Dec()
if err != nil {
failedCounter.WithLabelValues(t.Type()).Inc()
}
processedCounter.WithLabelValues(t.Type()).Inc()
return err
})
}
func main() {
httpServeMux := http.NewServeMux()
httpServeMux.Handle("/metrics", promhttp.Handler())
metricsSrv := &http.Server{
Addr: ":2112",
Handler: httpServeMux,
}
done := make(chan struct{})
// Starte den Metrikserver.
go func() {
err := metricsSrv.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
log.Printf("Fehler: Metrikserverfehler: %v", err)
}
close(done)
}()
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: ":6379"},
asynq.Config{Concurrency: 20},
)
mux := asynq.NewServeMux()
mux.Use(metricsMiddleware)
mux.HandleFunc(tasks.TypeEmail, tasks.HandleEmailTask)
// Starte den Worker-Server.
if err := srv.Start(mux); err != nil {
log.Fatalf("Fehler beim Starten des Worker-Servers: %v", err)
}
// Warte auf Beendigungssignale.
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, unix.SIGTERM, unix.SIGINT)
}