インストール

go get -u github.com/ThreeDotsLabs/watermill

準備

イベント駆動アプリケーションの基本的なアイデアは常に同じです:着信メッセージのリスニングとそれに対する反応です。Watermillは複数のパブリッシャーとサブスクライバーのためにこの動作を実装するのをサポートしています。

Watermillの中核部分はMessageであり、httpパッケージのhttp.Requestと同じくらい重要です。ほとんどのWatermillの機能はこの構造体をある方法で使用します。

PubSubライブラリが提供する複雑な機能にもかかわらず、Watermillではそれらを使用するためにはPublisherSubscriberの2つのインターフェースを実装するだけで十分です。

type Publisher interface {
	Publish(topic string, messages ...*Message) error
	Close() error
}

type Subscriber interface {
	Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
}

メッセージの購読

メッセージの購読から始めましょう。Subscribeはトピック名を受け取り、着信メッセージを受信するためのチャンネルを返します。トピックの具体的な意味は、PubSubの実装に依存します。

messages, err := subscriber.Subscribe(ctx, "example.topic")
if err != nil {
	panic(err)
}

for msg := range messages {
	fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))
	msg.Ack()
}

サポートされているPubSubの詳細な例については、次のコンテンツを参照してください。

Goチャンネルの例

完全な例のコード:github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

func main() {
	pubSub := gochannel.NewGoChannel(
		gochannel.Config{},
		watermill.NewStdLogger(false, false),
	)

messages, err := pubSub.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}

go process(messages)
// ...

完全な例のコード:github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
	fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))

// 受信および処理したことを確認する必要があります。そうしないと、メッセージが複数回再送されます。
msg.Ack()
}
}

Kafkaの例

完全なソースコード:github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
package main

import (
	"context"
	"log"
	"time"

	"github.com/Shopify/sarama"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
	"github.com/ThreeDotsLabs/watermill/message"
)

func main() {
	saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
	// auto.offset.reset: earliestに相当
	saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

	subscriber, err := kafka.NewSubscriber(
		kafka.SubscriberConfig{
			Brokers:               []string{"kafka:9092"},
			Unmarshaler:           kafka.DefaultMarshaler{},
			OverwriteSaramaConfig: saramaSubscriberConfig,
			ConsumerGroup:         "test_consumer_group",
		},
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example.topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...
}

完全なソースコード:github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("メッセージを受信しました: %s, ペイロード: %s", msg.UUID, string(msg.Payload))

		// 受信および処理が完了したことを確認する必要があります。さもなければ、メッセージは繰り返し送信されます。
		msg.Ack()
	}
}

RabbitMQ(AMQP)の例

完全なソースコード:github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
package main

import (
	"context"
	"log"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
	"github.com/ThreeDotsLabs/watermill/message"
)

var amqpURI = "amqp://guest:guest@rabbitmq:5672/"

func main() {
	amqpConfig := amqp.NewDurableQueueConfig(amqpURI)

	subscriber, err := amqp.NewSubscriber(
		// この構成は以下の例に基づいています: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
		// これは単純なキューとして使用されます。
		//
		// Pub/Subスタイルのサービスを実装する場合は、
		// https://watermill.io/pubsubs/amqp/#amqp-consumer-groups を参照してください
		amqpConfig,
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example.topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...
}

完全なソースコード:github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("メッセージを受信しました: %s, ペイロード: %s", msg.UUID, string(msg.Payload))

		// メッセージが受信および処理されたことを確認する必要があります。さもなければ、メッセージは繰り返し送信されます。
		msg.Ack()
	}
}

SQLの例

完全なソースコード:github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
package main

import (
	"context"
	stdSQL "database/sql"
	"log"
	"time"

	driver "github.com/go-sql-driver/mysql"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
	"github.com/ThreeDotsLabs/watermill/message"
)

func main() {
	db := createDB()
	logger := watermill.NewStdLogger(false, false)

	subscriber, err := sql.NewSubscriber(
		db,
		sql.SubscriberConfig{
			SchemaAdapter:    sql.DefaultMySQLSchema{},
			OffsetsAdapter:   sql.DefaultMySQLOffsetsAdapter{},
			InitializeSchema: true,
		},
		logger,
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example_topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...

完全なソースコード:github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("Received message: %s, payload: %s", msg.UUID, string(msg.Payload))

		// メッセージが受信され、処理されたことを確認する必要があります。
		// そうしないと、メッセージは繰り返し再配信されます。
		msg.Ack()
	}
}

メッセージの作成

Watermillはメッセージ形式を強制しません。NewMessageはペイロードをバイトスライスで期待しています。文字列、JSON、protobuf、Avro、gob、または[]byteにシリアル化できる他の形式を使用できます。

メッセージUUIDは任意ですが、デバッグに役立つため推奨されています。

msg := message.NewMessage(watermill.NewUUID(), []byte("こんにちは、世界!"))

メッセージのパブリッシュ

Publishメソッドはトピックと1つ以上のメッセージをパブリッシュする必要があります。

err := publisher.Publish("example.topic", msg)
if err != nil {
    panic(err)
}

Goチャネルの例

完全なソースコード:github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
	go process(messages)

	publishMessages(pubSub)
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("こんにちは、世界!"))

		if err := publisher.Publish("example.topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

Kafkaの例

完全なソースコード:github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
	go process(messages)

	publisher, err := kafka.NewPublisher(
		kafka.PublisherConfig{
			Brokers:   []string{"kafka:9092"},
			Marshaler: kafka.DefaultMarshaler{},
		},
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("こんにちは、世界!"))

		if err := publisher.Publish("example.topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

RabbitMQ (AMQP) Example

完全なソースコード:github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
	go process(messages)

	publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
	if err != nil {
		panic(err)
	}

	publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("こんにちは、世界!"))

		if err := publisher.Publish("example.topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

SQL Example

完全なソースコード:github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
	go process(messages)

	publisher, err := sql.NewPublisher(
		db,
		sql.PublisherConfig{
			SchemaAdapter: sql.DefaultMySQLSchema{},
		},
		logger,
	)
	if err != nil {
		panic(err)
	}

	publishMessages(publisher)
}

func createDB() *stdSQL.DB {
	conf := driver.NewConfig()
	conf.Net = "tcp"
	conf.User = "root"
	conf.Addr = "mysql"
	conf.DBName = "watermill"

	db, err := stdSQL.Open("mysql", conf.FormatDSN())
	if err != nil {
		panic(err)
	}

	err = db.Ping()
	if err != nil {
		panic(err)
	}

	return db
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "こんにちは、世界!"}`))

		if err := publisher.Publish("example_topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

メッセージルーターの使用

パブリッシャーとサブスクライバーはWatermillの低レベルな部分です。ほとんどの場合、相関、メトリクス、ポイズンキュー、リトライ、およびレート制限などのより高レベルのインターフェースと機能を使用したいと考えることが一般的です。

メッセージが正常に処理された後にメッセージを確認する場合もあります。他の場合では、すぐに確認したいと考え、その後で処理を検討したいと思うかもしれません。時には、受信したメッセージに基づいて特定のアクションを実行し、応答として別のメッセージを発行したいこともあります。

これらの要件を満たすために、ルーターと呼ばれるコンポーネントがあります。

メッセージルーターの例アプリケーション

例アプリケーションのフローは次のとおりです。

  1. 1秒ごとにincoming_messages_topicでメッセージを生成します。
  2. struct_handler リスナーは incoming_messages_topic を処理します。メッセージを受信すると、UUID を出力し、 outgoing_messages_topic に新しいメッセージを生成します。
  3. print_incoming_messages ハンドラは incoming_messages_topic を監視し、メッセージのUUID、ペイロード、およびメタデータを出力します。
  4. print_outgoing_messages ハンドラは outgoing_messages_topic を監視し、メッセージのUUID、ペイロード、メタデータを出力します。相関IDは incoming_messages_topic のメッセージと同じである必要があります。

ルーターの設定

まず、プラグインとミドルウェアを追加してルーターを構成します。次に、ルーターが使用するハンドラを設定します。各ハンドラはメッセージを個別に処理します。

完全なソースコード:github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/message/router/middleware"
	"github.com/ThreeDotsLabs/watermill/message/router/plugin"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

var (
	// この例では、シンプルなロガーの実装を使用していますが、
	// 独自の `watermill.LoggerAdapter` 実装を提供することもできます。
	logger = watermill.NewStdLogger(false, false)
)

func main() {
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// SIGTERMシグナルを受信した場合、SignalsHandlerがRouterを正常に閉じます。
	// `r.Close()` を呼び出すことでルーターを閉じることもできます。
	router.AddPlugin(plugin.SignalsHandler)

	// ルーターレベルのミドルウェアは、ルーターに送信されるすべてのメッセージに対して実行されます
	router.AddMiddleware(
		// CorrelationIDは、受信したメッセージのメタデータから相関IDを生成されたメッセージにコピーします
		middleware.CorrelationID,

		// ハンドラがエラーを返した場合、ハンドラ機能をリトライします。
		// MaxRetriesに達した後、メッセージはNackedされ、PubSubが再送信する責任を負います。
		middleware.Retry{
			MaxRetries:      3,
			InitialInterval: time.Millisecond * 100,
			Logger:          logger,
		}.Middleware,

		// Recovererはハンドラ内のパニックを処理します。
		// この場合、それらをエラーとしてRetryミドルウェアに渡します。
		middleware.Recoverer,
	)

	// 単純化のため、ここではgochannel Pub/Subを使用していますが、
	// それを任意のPub/Sub実装で置き換えることができ、効果は同じです。
	pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)

	// バックグラウンドでいくつかの受信メッセージを生成します
	go publishMessages(pubSub)

	// AddHandlerは、ハンドラレベルのミドルウェアの追加やハンドラの停止に使用できるハンドラを返します
	handlerr := router.AddHandler(
		"struct_handler",          // ハンドラ名、ユニークである必要があります
		"incoming_messages_topic", // イベントが読み取られるトピック
		pubSub,
		"outgoing_messages_topic", // イベントを送信するトピック
		pubSub,
		structHandler{}.Handler,
	)

	// ハンドラレベルのミドルウェアは特定のハンドラに対してのみ実行されます
	// この種のミドルウェアは、ルーターレベルのミドルウェアと同じ方法で追加できます
	handlerr.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			log.Println("メッセージ", message.UUID, "のハンドラ固有のミドルウェアを実行しています")

			return h(message)
		}
	})

	// デバッグ目的で、"incoming_messages_topic" で受信したすべてのメッセージを表示します
	router.AddNoPublisherHandler(
		"print_incoming_messages",
		"incoming_messages_topic",
		pubSub,
		printMessages,
	)

	// デバッグ目的で、"outgoing_messages_topic" に送信されたすべてのイベントを表示します
	router.AddNoPublisherHandler(
		"print_outgoing_messages",
		"outgoing_messages_topic",
		pubSub,
		printMessages,
	)

	// すべてのハンドラが登録されたら、ルーターを実行します。
	// 実行はルーターが動作している間ブロックします。
	ctx := context.Background()
	if err := router.Run(ctx); err != nil {
		panic(err)
	}
}
// ...

受信メッセージ

struct_handlerincoming_messages_topic からメッセージを消費するため、バックグラウンドで publishMessages() を呼び出して受信トラフィックをシミュレートします。SetCorrelationID ミドルウェアが追加されたことに注意してください。ルーターは生成されたすべてのメッセージに相関IDを追加します(メタデータに格納されます)。

完全なソースコード: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// …
func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
        middleware.SetCorrelationID(watermill.NewUUID(), msg)

        log.Printf("メッセージを送信中 %s, 相関 ID: %s\n", msg.UUID, middleware.MessageCorrelationID(msg))

        if err := publisher.Publish("incoming_messages_topic", msg); err != nil {
            panic(err)
        }

        time.Sleep(time.Second)
    }
}
// …

ハンドラー

ハンドラー関数 には、2つのタイプがあることに気付いたかもしれません:

  1. 関数 func(msg *message.Message) ([]*message.Message, error)
  2. メソッド func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error)

ハンドラーが依存関係を持たない関数である場合、最初のオプションを使用することが適しています。ハンドラーにデータベースハンドル、ロガーなどの依存関係が必要な場合は、2番目のオプションが役立ちます。

完全なソースコード: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// …
func printMessages(msg *message.Message) error {
    fmt.Printf(
        "\n> メッセージを受信:%s\n> %s\n> メタデータ:%v\n\n",
        msg.UUID, string(msg.Payload), msg.Metadata,
    )
    return nil
}

type structHandler struct {
    // ここにいくつかの依存関係を追加できます
}

func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
    log.Println("structHandler がメッセージを受け取りました", msg.UUID)

    msg = message.NewMessage(watermill.NewUUID(), []byte("structHandler によって生成されたメッセージ"))
    return message.Messages{msg}, nil
}

完了!

この例を実行するには go run main.go を実行してください。