お勧めしますが、本番環境でジョブプロセスやキューを監視するためには、Prometheusなどのモニタリングツールの使用をお勧めします。
キューメトリクス
Web UIを使用している場合、次の2つのパラメータを指定してPrometheusとの統合を有効にできます。
-
--enable-metrics-exporter
: キューメトリクスの収集を有効にし、それらを/metrics
エンドポイントにエクスポートします。 -
--prometheus-addr
: Web UI内でキューメトリクスの可視化を有効にします。
キューメトリクスページは以下のように表示されます:
Web UIを使用していない場合、Asynqにはキューメトリクスをエクスポートするために実行できるバイナリファイルが付属しています。また、キューメトリクスを収集するためのx/metrics
パッケージも含まれています。
ジョブプロセスメトリクス
AsynqのHandler
インターフェースとServeMux
は、可観測性のためにメトリクスを取得することができます。
以下は、Prometheusを使用してジョブプロセスメトリクスをエクスポートする例です。アプリケーション内でコードを計器化して、Prometheusによって追跡されるデフォルトメトリクス(メモリやCPUなど)に加えて、追加のアプリケーション固有のメトリクスを追跡することができます。
次の例コードでは、以下のアプリケーション固有のメトリクスが追跡されています:
- ジョブプロセスによって処理されたタスクの合計数(成功したタスクと失敗したタスクの両方を含む)。
- ジョブプロセスによって処理された失敗したタスクの数。
- 現在ジョブプロセスで処理中のタスクの数。
package main
import (
"context"
"log"
"net/http"
"os"
"os/signal"
"runtime"
"github.com/hibiken/asynq"
"github.com/hibiken/asynq/examples/tasks"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"golang.org/x/sys/unix"
)
// メトリクス変数。
var (
processedCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "processed_tasks_total",
Help: "処理されたタスクの合計数",
},
[]string{"task_type"},
)
failedCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "failed_tasks_total",
Help: "処理された失敗タスクの合計数",
},
[]string{"task_type"},
)
inProgressGauge = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "in_progress_tasks",
Help: "現在処理中のタスクの数",
},
[]string{"task_type"},
)
)
func metricsMiddleware(next asynq.Handler) asynq.Handler {
return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
inProgressGauge.WithLabelValues(t.Type()).Inc()
err := next.ProcessTask(ctx, t)
inProgressGauge.WithLabelValues(t.Type()).Dec()
if err != nil {
failedCounter.WithLabelValues(t.Type()).Inc()
}
processedCounter.WithLabelValues(t.Type()).Inc()
return err
})
}
func main() {
httpServeMux := http.NewServeMux()
httpServeMux.Handle("/metrics", promhttp.Handler())
metricsSrv := &http.Server{
Addr: ":2112",
Handler: httpServeMux,
}
done := make(chan struct{})
// メトリクスサーバーを開始します。
go func() {
err := metricsSrv.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
log.Printf("エラー: メトリクスサーバーでエラーが発生しました: %v", err)
}
close(done)
}()
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: ":6379"},
asynq.Config{Concurrency: 20},
)
mux := asynq.NewServeMux()
mux.Use(metricsMiddleware)
mux.HandleFunc(tasks.TypeEmail, tasks.HandleEmailTask)
// ワーカーサーバーを開始します。
if err := srv.Start(mux); err != nil {
log.Fatalf("ワーカーサーバーの開始に失敗しました: %v", err)
}
// 終了シグナルを待機します。
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, unix.SIGTERM, unix.SIGINT)
}