Herramientas de Monitoreo

Se recomienda utilizar herramientas de monitoreo como Prometheus para monitorear los procesos de trabajo y las colas en el entorno de producción.

Métricas de Cola

Si estás utilizando la Interfaz de Usuario Web, puedes habilitar la integración con Prometheus proporcionando dos parámetros:

  • --enable-metrics-exporter: Habilita la recopilación de métricas de cola y las exporta al punto final /metrics.
  • --prometheus-addr: Habilita la visualización de métricas de cola dentro de la Interfaz de Usuario Web.

La página de métricas de cola luce así:

Screen Shot 2021-12-19 at 4 37 19 PM

Si no estás utilizando la Interfaz de Usuario Web, Asynq viene con un archivo binario que puedes ejecutar para exportar métricas de cola. También incluye un paquete x/metrics para recopilar métricas de cola.

Métricas de Proceso de Trabajo

La interfaz Handler de Asynq y ServeMux pueden ser instrumentados con métricas para observabilidad.

Aquí tienes un ejemplo de exportación de métricas de proceso de trabajo usando Prometheus. Podemos instrumentar nuestro código dentro de la aplicación para rastrear métricas específicas de la aplicación, así como métricas predeterminadas (como memoria y CPU) rastreadas por Prometheus.

Aquí tienes una lista de métricas específicas de la aplicación rastreadas en el código de ejemplo:

  • El número total de tareas procesadas por el proceso de trabajo (incluyendo tareas exitosas y fallidas).
  • El número de tareas fallidas procesadas por el proceso de trabajo.
  • El número actual de tareas siendo procesadas por el proceso de trabajo.
package main

import (
    "context"
    "log"
    "net/http"
    "os"
    "os/signal"
    "runtime"

    "github.com/hibiken/asynq"
    "github.com/hibiken/asynq/examples/tasks"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    "github.com/prometheus/client_golang/prometheus/promhttp"
    "golang.org/x/sys/unix"
)

// Variables de Métricas.
var (
    processedCounter = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "processed_tasks_total",
            Help: "Número total de tareas procesadas",
        },
        []string{"tipo_tarea"},
    )

    failedCounter = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "failed_tasks_total",
            Help: "Número total de tareas fallidas procesadas",
        },
        []string{"tipo_tarea"},
    )

    inProgressGauge = promauto.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "tareas_en_proceso",
            Help: "Número actual de tareas siendo procesadas",
        },
        []string{"tipo_tarea"},
    )
)

func metricsMiddleware(next asynq.Handler) asynq.Handler {
    return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
        inProgressGauge.WithLabelValues(t.Type()).Inc()
        err := next.ProcessTask(ctx, t)
        inProgressGauge.WithLabelValues(t.Type()).Dec()
        if err != nil {
            failedCounter.WithLabelValues(t.Type()).Inc()
        }
        processedCounter.WithLabelValues(t.Type()).Inc()
        return err
    })
}

func main() {
    httpServeMux := http.NewServeMux()
    httpServeMux.Handle("/metrics", promhttp.Handler())
    metricsSrv := &http.Server{
        Addr:    ":2112",
        Handler: httpServeMux,
    }
    done := make(chan struct{})

    // Inicia el servidor de métricas.
    go func() {
        err := metricsSrv.ListenAndServe()
        if err != nil && err != http.ErrServerClosed {
            log.Printf("Error: servidor de métricas con error: %v", err)
        }
        close(done)
    }()

    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: ":6379"},
        asynq.Config{Concurrency: 20},
    )

    mux := asynq.NewServeMux()
    mux.Use(metricsMiddleware)
    mux.HandleFunc(tasks.TypeEmail, tasks.HandleEmailTask)

    // Inicia el servidor de trabajador.
    if err := srv.Start(mux); err != nil {
        log.Fatalf("Fallo al iniciar el servidor de trabajador: %v", err)
    }

    // Espera señales de terminación.
    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, unix.SIGTERM, unix.SIGINT)
}