Installation

go get -u github.com/ThreeDotsLabs/watermill

Vorbereitung

Die grundlegende Idee hinter ereignisgesteuerten Anwendungen ist immer die gleiche: das Lauschen auf eingehende Nachrichten und das Reagieren darauf. Watermill unterstützt die Implementierung dieses Verhaltens für mehrere Publisher und Subscriber.

Der Kern von Watermill ist die Message, die genauso wichtig ist wie http.Request im http-Paket. Die meisten Watermill-Funktionen verwenden diese Struktur in irgendeiner Weise.

Trotz der komplexen Funktionen, die von der PubSub-Bibliothek bereitgestellt werden, ist es für Watermill nur notwendig, zwei Schnittstellen zu implementieren, um sie zu verwenden: Publisher und Subscriber.

type Publisher interface {
	Publish(topic string, messages ...*Message) error
	Close() error
}

type Subscriber interface {
	Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
}

Abonnieren von Nachrichten

Lassen Sie uns mit dem Abonnieren beginnen. Subscribe erwartet einen Themenname und gibt einen Kanal zurück, um eingehende Nachrichten zu empfangen. Die spezifische Bedeutung des Themas hängt von der PubSub-Implementierung ab.

messages, err := subscriber.Subscribe(ctx, "example.topic")
if err != nil {
	panic(err)
}

for msg := range messages {
	fmt.Printf("Nachricht erhalten: %s, Payload: %s\n", msg.UUID, string(msg.Payload))
	msg.Ack()
}

Für ausführliche Beispiele zu unterstützten PubSub siehe den folgenden Inhalt.

Go-Channel-Beispiel

Vollständiger Beispielcode: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

func main() {
	pubSub := gochannel.NewGoChannel(
		gochannel.Config{},
		watermill.NewStdLogger(false, false),
	)

messages, err := pubSub.Subscribe(context.Background(), "example.topic")
if err != nil {
	panic(err)
}

go process(messages)
// ...

Vollständiger Beispielcode: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		fmt.Printf("Nachricht erhalten: %s, Payload: %s\n", msg.UUID, string(msg.Payload))

	// Wir müssen bestätigen, dass wir die Nachricht erhalten und verarbeitet haben,
	// ansonsten wird sie mehrmals erneut gesendet.
		msg.Ack()
	}
}

Kafka Beispiel

Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
package main

import (
	"context"
	"log"
	"time"

	"github.com/Shopify/sarama"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
	"github.com/ThreeDotsLabs/watermill/message"
)

func main() {
	saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
	// Entsprechend auto.offset.reset: earliest
	saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

	subscriber, err := kafka.NewSubscriber(
		kafka.SubscriberConfig{
			Brokers:               []string{"kafka:9092"},
			Unmarshaler:           kafka.DefaultMarshaler{},
			OverwriteSaramaConfig: saramaSubscriberConfig,
			ConsumerGroup:         "test_consumer_group",
		},
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example.topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...
}

Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("Nachricht erhalten: %s, Nutzlast: %s", msg.UUID, string(msg.Payload))

		// Wir müssen bestätigen, dass wir die Nachricht erhalten und verarbeitet haben,
		// Andernfalls wird die Nachricht wiederholt gesendet.
		msg.Ack()
	}
}

RabbitMQ (AMQP) Beispiel

Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
package main

import (
	"context"
	"log"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
	"github.com/ThreeDotsLabs/watermill/message"
)

var amqpURI = "amqp://guest:guest@rabbitmq:5672/"

func main() {
	amqpConfig := amqp.NewDurableQueueConfig(amqpURI)

	subscriber, err := amqp.NewSubscriber(
		// Diese Konfiguration basiert auf folgendem Beispiel: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
		// Es wird als einfache Warteschlange verwendet.
		//
		// Wenn Sie einen Pub/Sub-ähnlichen Dienst implementieren möchten, beachten Sie bitte
		// https://watermill.io/pubsubs/amqp/#amqp-consumer-groups
		amqpConfig,
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example.topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...
}

Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("Nachricht erhalten: %s, Nutzlast: %s", msg.UUID, string(msg.Payload))

		// Wir müssen bestätigen, dass die Nachricht erhalten und verarbeitet wurde,
		// Andernfalls wird die Nachricht wiederholt zugestellt.
		msg.Ack()
	}
}

SQL-Beispiel

Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
package main

import (
	"context"
	stdSQL "database/sql"
	"log"
	"time"

	driver "github.com/go-sql-driver/mysql"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
	"github.com/ThreeDotsLabs/watermill/message"
)

func main() {
	db := createDB()
	logger := watermill.NewStdLogger(false, false)

	subscriber, err := sql.NewSubscriber(
		db,
		sql.SubscriberConfig{
			SchemaAdapter:    sql.DefaultMySQLSchema{},
			OffsetsAdapter:   sql.DefaultMySQLOffsetsAdapter{},
			InitializeSchema: true,
		},
		logger,
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example_topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...

Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("Nachricht erhalten: %s, Payload: %s", msg.UUID, string(msg.Payload))

		// Wir müssen bestätigen, dass die Nachricht empfangen und verarbeitet wurde,
		// Andernfalls wird die Nachricht wiederholt zugestellt.
		msg.Ack()
	}
}

Nachricht erstellen

Watermill schreibt kein Nachrichtenformat vor. NewMessage erwartet, dass der Payload ein Byte-Array ist. Sie können Strings, JSON, protobuf, Avro, gob oder ein anderes Format verwenden, das in []byte serialisiert werden kann.

Die Nachrichten-UUID ist optional, aber es wird empfohlen, da sie bei der Fehlersuche hilft.

msg := message.NewMessage(watermill.NewUUID(), []byte("Hallo, Welt!"))

Nachricht veröffentlichen

Die Publish-Methode erfordert ein Thema und eine oder mehrere Nachrichten, die veröffentlicht werden sollen.

err := publisher.Publish("beispiel.thema", msg)
if err != nil {
    panic(err)
}

Go-Kanal-Beispiel

Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
	go process(messages)

	nachrichtenVeröffentlichen(pubSub)
}

func nachrichtenVeröffentlichen(verleger message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("Hallo, Welt!"))

		if err := verleger.Publish("beispiel.thema", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

Kafka-Beispiel

Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
	go process(messages)

	verleger, err := kafka.NewPublisher(
		kafka.PublisherConfig{
			Brokers:   []string{"kafka:9092"},
			Marshaler: kafka.DefaultMarshaler{},
		},
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	nachrichtenVeröffentlichen(verleger)
}

func nachrichtenVeröffentlichen(verleger message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("Hallo, Welt!"))

		if err := verleger.Publish("beispiel.thema", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

RabbitMQ (AMQP) Beispiel

Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
	go process(messages)

	publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
	if err != nil {
		panic(err)
	}

	publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("Hallo, Welt!"))

		if err := publisher.Publish("example.topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

SQL Beispiel

Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
	go process(messages)

	publisher, err := sql.NewPublisher(
		db,
		sql.PublisherConfig{
			SchemaAdapter: sql.DefaultMySQLSchema{},
		},
		logger,
	)
	if err != nil {
		panic(err)
	}

	publishMessages(publisher)
}

func createDB() *stdSQL.DB {
	conf := driver.NewConfig()
	conf.Net = "tcp"
	conf.User = "root"
	conf.Addr = "mysql"
	conf.DBName = "watermill"

	db, err := stdSQL.Open("mysql", conf.FormatDSN())
	if err != nil {
		panic(err)
	}

	err = db.Ping()
	if err != nil {
		panic(err)
	}

	return db
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "Hallo, Welt!"}`))

		if err := publisher.Publish("example_topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

Verwendung des Nachrichtenrouters

Publisher und Subscriber sind Teile des Wasserfalls mit niedrigerem Niveau. In den meisten Fällen möchten Sie wahrscheinlich höherstufige Schnittstellen und Funktionen wie Korrelationen, Metriken, Poison-Queues, Wiederholungsversuche und Rate-Limitierung verwenden.

Es kann sein, dass Sie die Nachricht erst dann bestätigen möchten, wenn sie erfolgreich verarbeitet wurde. In anderen Fällen möchten Sie sie möglicherweise sofort bestätigen und dann in Betracht ziehen, sie zu verarbeiten. Manchmal möchten Sie aufgrund der eingehenden Nachricht bestimmte Aktionen ausführen und als Antwort eine andere Nachricht veröffentlichen.

Um diese Anforderungen zu erfüllen, gibt es eine Komponente namens Router.

Beispielanwendung des Nachrichtenrouters

Der Ablauf der Beispielanwendung ist wie folgt:

  1. Generieren Sie alle Sekunde eine Nachricht auf dem incoming_messages_topic.
  2. Der Listener struct_handler behandelt das incoming_messages_topic. Nach Erhalt einer Nachricht druckt er die UUID und generiert eine neue Nachricht auf dem outgoing_messages_topic.
  3. Der Handler print_incoming_messages hört auf das incoming_messages_topic und druckt die UUID, Payload und Metadaten der Nachricht.
  4. Der Handler print_outgoing_messages hört auf das outgoing_messages_topic und druckt die UUID, Payload und Metadaten der Nachricht. Die Korrelations-ID sollte mit der Nachricht auf dem incoming_messages_topic übereinstimmen.

Router-Konfiguration

Konfigurieren Sie zuerst den Router, indem Sie Plugins und Middleware hinzufügen. Legen Sie dann die Handler fest, die der Router verwenden wird. Jeder Handler wird die Nachrichten unabhängig voneinander verarbeiten.

Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
package main

import (
	"context"
	"fmt
	"log"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/message/router/middleware"
	"github.com/ThreeDotsLabs/watermill/message/router/plugin"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

var (
	// Für dieses Beispiel verwenden wir eine einfache Logger-Implementierung,
	// Sie wollen möglicherweise Ihre eigene `watermill.LoggerAdapter`-Implementierung bereitstellen.
	logger = watermill.NewStdLogger(false, false)
)

func main() {
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// Bei Empfang des SIGTERM-Signals wird der SignalsHandler den Router sauber schließen.
	// Sie können den Router auch durch Aufruf von `r.Close()` schließen.
	router.AddPlugin(plugin.SignalsHandler)

	// Router-Level-Middleware wird für jede Nachricht, die an den Router gesendet wird, ausgeführt
	router.AddMiddleware(
		// CorrelationID kopiert die Korrelations-ID aus den Metadaten der eingehenden Nachricht in die generierte Nachricht
		middleware.CorrelationID,

		// Wenn der Handler einen Fehler zurückgibt, wird die Handlerfunktion erneut versucht.
		// Nach Erreichen von MaxRetries wird die Nachricht Nacked und PubSub ist dafür verantwortlich, sie erneut zu senden.
		middleware.Retry{
			MaxRetries:      3,
			InitialInterval: time.Millisecond * 100,
			Logger:          logger,
		}.Middleware,

		// Recoverer behandelt Paniken im Handler.
		// In diesem Fall übergibt es sie als Fehler an das Retry-Middleware.
		middleware.Recoverer,
	)

	// Für die Einfachheit verwenden wir hier das gochannel-Pub/Sub,
	// Sie können es durch jede Pub/Sub-Implementierung ersetzen, und der Effekt wird derselbe sein.
	pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)

	// Generieren Sie im Hintergrund einige eingehende Nachrichten
	go publishMessages(pubSub)

	// AddHandler gibt einen Handler zurück, der verwendet werden kann, um handler-Level-Middleware hinzuzufügen
	// oder um den Handler zu stoppen.
	handler := router.AddHandler(
		"struct_handler",          // Handlername, muss eindeutig sein
		"incoming_messages_topic", // Thema, von dem Ereignisse gelesen werden
		pubSub,
		"outgoing_messages_topic", // Thema, zu dem Ereignisse veröffentlicht werden
		pubSub,
		structHandler{}.Handler,
	)

	// Handler-Level-Middleware wird nur für einen bestimmten Handler ausgeführt
	// Diese Art von Middleware kann auf die gleiche Weise wie Router-Level-Middleware hinzugefügt werden
	handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			log.Println("Führe handler-spezifische Middleware für", message.UUID, "aus")

			return h(message)
		}
	})

	// Zum Debuggen drucken wir alle Nachrichten aus, die auf "incoming_messages_topic" erhalten wurden
	router.AddNoPublisherHandler(
		"print_incoming_messages",
		"incoming_messages_topic",
		pubSub,
		printMessages,
	)

	// Zum Debuggen drucken wir alle Ereignisse aus, die an "outgoing_messages_topic" gesendet wurden
	router.AddNoPublisherHandler(
		"print_outgoing_messages",
		"outgoing_messages_topic",
		pubSub,
		printMessages,
	)

	// Jetzt, da alle Handler registriert sind, führen wir den Router aus.
	// Run blockiert, während der Router läuft.
	ctx := context.Background()
	if err := router.Run(ctx); err != nil {
		panic(err)
	}
}
// ...

Eingehende Nachrichten

Der struct_handler verbraucht Nachrichten aus dem incoming_messages_topic, daher simulieren wir den eingehenden Verkehr, indem wir im Hintergrund publishMessages() aufrufen. Beachten Sie, dass wir das SetCorrelationID-Middleware hinzugefügt haben. Der Router fügt allen generierten Nachrichten eine Korrelations-ID hinzu (die in den Metadaten gespeichert wird).

Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// …
func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte("Hallo, Welt!"))
        middleware.SetCorrelationID(watermill.NewUUID(), msg)

        log.Printf("sende Nachricht %s, Korrelations-ID: %s\n", msg.UUID, middleware.MessageCorrelationID(msg))

        if err := publisher.Publish("incoming_messages_topic", msg); err != nil {
            panic(err)
        }

        time.Sleep(time.Second)
    }
}
// …

Handler

Sie haben vielleicht bemerkt, dass es zwei Arten von Handler-Funktionen gibt:

  1. Funktion func(msg *message.Message) ([]*message.Message, error)
  2. Methode func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error)

Wenn Ihr Handler eine Funktion ist, die nicht von Abhängigkeiten abhängt, ist die Verwendung der ersten Option in Ordnung. Wenn Ihr Handler jedoch einige Abhängigkeiten benötigt (wie z. B. Datenbankverbindungen, Logger usw.), ist die zweite Option nützlich.

Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// …
func printMessages(msg *message.Message) error {
    fmt.Printf(
        "\n> Empfangene Nachricht: %s\n> %s\n> Metadaten: %v\n\n",
        msg.UUID, string(msg.Payload), msg.Metadata,
    )
    return nil
}

type structHandler struct {
    // Hier können wir einige Abhängigkeiten hinzufügen
}

func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
    log.Println("structHandler hat die Nachricht empfangen", msg.UUID)

    msg = message.NewMessage(watermill.NewUUID(), []byte("Nachricht vom structHandler erzeugt"))
    return message.Messages{msg}, nil
}

Fertig!

Sie können dieses Beispiel mit go run main.go ausführen.