Prometheusを使用したWatermillのリアルタイム監視

メトリクス

Watermillは、パブリッシャー/サブスクライバーのデコレータやハンドラのためのミドルウェアを使用して監視できます。公式のPrometheus client for Goを使用したデフォルトの実装を提供しています。

components/metricsパッケージはPrometheusMetricsBuilderをエクスポートし、関連するPrometheusレジストリを更新するための便利な機能を提供します。

フルソースコード:github.com/ThreeDotsLabs/watermill/components/metrics/builder.go

// ...
// PrometheusMetricsBuilderは、パブリッシャー、サブスクライバー、ハンドラをデコレートするためのメソッドを提供します。
type PrometheusMetricsBuilder struct {
    // PrometheusRegistryは既存のPrometheusレジストリを埋めることができます。またはデフォルトのレジストリを使用するために空にすることができます。
    PrometheusRegistry prometheus.Registerer

    Namespace string
    Subsystem string
}

// AddPrometheusRouterMetricsは、メッセージルーター上のすべてのハンドラにメトリックミドルウェアを追加するための便利な機能です。また、ハンドラのパブリッシャーやサブスクライバーをデコレートします。
func (b PrometheusMetricsBuilder) AddPrometheusRouterMetrics(r *message.Router) {
// ...

パブリッシャー、サブスクライバー、およびハンドラのデコレーション

Watermillのルーターを使用している場合(ほとんどの場合に推奨されます)、便利な関数AddPrometheusRouterMetricsを使用して、このルーターに追加されたすべてのハンドラがPrometheusレジストリを更新するためにラップされるようにすることができます。さらに、それらのパブリッシャーやサブスクライバーも同様にデコレートされます。

フルソースコード:github.com/ThreeDotsLabs/watermill/components/metrics/builder.go

// ...
// AddPrometheusRouterMetricsは、メッセージルーター上のすべてのハンドラにメトリックミドルウェアを追加するための便利な機能です。また、ハンドラのパブリッシャーやサブスクライバーをデコレートします。
func (b PrometheusMetricsBuilder) AddPrometheusRouterMetrics(r *message.Router) {
    r.AddPublisherDecorators(b.DecoratePublisher)
    r.AddSubscriberDecorators(b.DecorateSubscriber)
    r.AddMiddleware(b.NewRouterMiddleware().Middleware)
}
// ...

AddPrometheusRouterMetricsの使用例:

フルソースコード:github.com/ThreeDotsLabs/watermill/_examples/basic/4-metrics/main.go

// ...
    // namespaceとsubsystemのパラメータを空にしておきます
    metricsBuilder := metrics.NewPrometheusMetricsBuilder(prometheusRegistry, "", "")
    metricsBuilder.AddPrometheusRouterMetrics(router)
// ...

上記のコードスニペットでは、namespaceおよびsubsystemパラメータを空にしています。Prometheusクライアントライブラリは、これらのパラメータを使用してメトリック名にプレフィックスを追加します。namespaceやsubsystemを使用する必要があるかもしれませんが、これによってメトリック名が影響を受けるため、Grafanaダッシュボードを適切に調整する必要があります。

独立したパブリッシャーやサブスクライバーも、PrometheusMetricsBuilderの専用メソッドを使用してデコレートすることができます:

フルソースコード:github.com/ThreeDotsLabs/watermill/_examples/basic/4-metrics/main.go

// ...
    subWithMetrics, err := metricsBuilder.DecorateSubscriber(pubSub)
    if err != nil {
        panic(err)
    }
    pubWithMetrics, err := metricsBuilder.DecoratePublisher(pub)
    if err != nil {
        panic(err)
    }
// ...

/metricsエンドポイントの公開

Prometheusの動作原理によると、データスクレイピングのためにサービスはHTTPエンドポイントを公開する必要があります。通常、これはGETエンドポイントであり、パスは通常/metricsです。

このエンドポイントを提供するためには、以前に作成したPrometheus Registryを使用する関数と、新しいRegistryを同時に作成する別の関数があります。

完全なソースコード:github.com/ThreeDotsLabs/watermill/components/metrics/http.go

// ...
// CreateRegistryAndServeHTTPは、指定されたアドレスでHTTPサーバーを確立し、Prometheusに/metricsエンドポイントを公開します。
// これはメトリックの登録のための新しいPrometheus Registryと、サーバーをシャットダウンするためのキャンセル関数を返します。
func CreateRegistryAndServeHTTP(addr string) (registry *prometheus.Registry, cancel func()) {
	registry = prometheus.NewRegistry()
	return registry, ServeHTTP(addr, registry)
}

// ServeHTTPは、指定されたアドレスでHTTPサーバーを確立し、Prometheusに/metricsエンドポイントを公開します。
// 既存のPrometheus Registryを受け入れ、サーバーをシャットダウンするためのキャンセル関数を返します。
func ServeHTTP(addr string, registry *prometheus.Registry) (cancel func()) {
// ...

以下は使用例です:

完全なソースコード:github.com/ThreeDotsLabs/watermill/_examples/basic/4-metrics/main.go

// ...
	prometheusRegistry, closeMetricsServer := metrics.CreateRegistryAndServeHTTP(*metricsAddr)
	defer closeMetricsServer()

	// namespaceとsubsystemを空のままにします
	metricsBuilder := metrics.NewPrometheusMetricsBuilder(prometheusRegistry, "", "")
	metricsBuilder.AddPrometheusRouterMetrics(router)
// ...

サンプルアプリケーション

ダッシュボードが実際にどのように動作するかを理解するためには、メトリクスの例を参照できます。

この例のREADMEの手順に従って実行し、GrafanaにPrometheusのデータソースを追加してください。

Grafanaダッシュボード

前述のメトリクス実装で使用するGrafanaダッシュボードを準備しています。スループット、障害率、および公開/処理の期間に関する基本的な情報を提供します。

このダッシュボードをローカルで表示したい場合は、例のアプリケーションを使用できます。

Prometheusにエクスポートされるメトリクスの詳細については、「エクスポートされるメトリクス」を参照してください。

ダッシュボードのインポート

Grafanaダッシュボードをインポートするには、左メニューから[ダッシュボード/管理]を選択し、次に +インポート をクリックします。

ダッシュボードのURL https://grafana.com/dashboards/9777(またはIDである9777)を入力し、[ロード]をクリックします。

ダッシュボードのインポート

次に、/metricsエンドポイントをスクレイプするために使用されるPrometheusデータソースを選択します。[インポート]をクリックして完了です!

Exported Metrics(エクスポートされたメトリクス)

以下は、PrometheusMetricsBuilderによってPrometheusレジストリに登録されたすべてのメトリクスをリストしたものです。

Prometheusメトリックの種類についての詳細は、Prometheusのドキュメントを参照してください。

オブジェクト メトリック 説明 ラベル/値
Subscriber subscriber_messages_received_total Prometheusのカウンター。サブスクライバーが受信したメッセージの数をカウントします。 acked は "acked" または "nacked" です。
サブスクライバーがハンドラ内で動作する場合は、handler_nameを設定してください。それ以外の場合は、「」です。
subscriber_name はサブスクライバーを識別します。fmt.Stringerインタフェースを実装していれば、String()の結果です。そうでなければ、package.structNameです。
Handler handler_execution_time_seconds Prometheusのヒストグラム。ミドルウェアによってラップされたハンドラ関数の実行時間を記録します。 handler_name はハンドラの名前です。
success は、ラップされたハンドラ関数がエラーを返すかどうかに応じて "true" または "false" です。
Publisher publish_time_seconds Prometheusのヒストグラム。パブリッシャーのデコレートされたパブリッシュ関数の実行時間を記録します。 success は、デコレートされたパブリッシャーがエラーを返すかどうかに応じて "true" または "false" です。
パブリッシャーがハンドラ内で動作する場合は、handler_nameを設定してください。それ以外の場合は、「」です。
publisher_name はパブリッシャーを識別します。fmt.Stringerインタフェースを実装していれば、String()の結果です。そうでなければ、package.structNameです。

さらに、各メトリックにはPrometheusによって提供されたnodeラベルがあり、その値はメトリックソースのインスタンスに対応し、jobPrometheusの構成ファイルで指定されたジョブ名です。

注意: 前述のように、namespaceまたはsubsystemに空でない値を使用すると、メトリック名のプレフィックスが付加されます。それに伴うGrafanaダッシュボードのパネル定義など、対応する調整が必要になる場合があります。

カスタマイズ(Customization)

特定のメトリックが見落とされたと考える場合は、この基本実装を簡単に拡張できます。最良の方法は、ServeHTTPメソッドで使用されるPrometheusレジストリを使用して、Prometheusクライアントのドキュメントに従ってメトリックを登録することです。詳細はこちらを参照してください。

これらのメトリックを更新する簡潔な方法は、デコレータを使用することです。完全なソースコードはgithub.com/ThreeDotsLabs/watermill/message/decorator.goで見つけることができます。

// ...
// MessageTransformSubscriberDecoratorは、サブスクライバーをデコレートし、各メッセージに対してtransform関数を呼び出すサブスクライバーデコレータを作成します。
func MessageTransformSubscriberDecorator(transform func(*Message)) SubscriberDecorator {
	if transform == nil {
		panic("transform function is nil")
	}
	return func(sub Subscriber) (Subscriber, error) {
		return &messageTransformSubscriberDecorator{
			sub:       sub,
			transform: transform,
		}, nil
	}
}

// MessageTransformPublisherDecoratorは、パブリッシャーをデコレートし、各メッセージに対してtransform関数を呼び出すパブリッシャーデコレータを作成します。
func MessageTransformPublisherDecorator(transform func(*Message)) PublisherDecorator {
	if transform == nil {
		panic("transform function is nil")
	}
	return func(pub Publisher) (Publisher, error) {
		return &messageTransformPublisherDecorator{
			Publisher: pub,
			transform: transform,
		}, nil
	}
}

type messageTransformSubscriberDecorator struct {
// ...

または、ルーター内のミドルウェアでも可能です。

より簡単な方法は、必要なメトリックをハンドラ関数内でのみ更新することです。