Installation
go get -u github.com/ThreeDotsLabs/watermill
Vorbereitung
Die grundlegende Idee hinter ereignisgesteuerten Anwendungen ist immer die gleiche: das Lauschen auf eingehende Nachrichten und das Reagieren darauf. Watermill unterstützt die Implementierung dieses Verhaltens für mehrere Publisher und Subscriber.
Der Kern von Watermill ist die Message, die genauso wichtig ist wie http.Request
im http
-Paket. Die meisten Watermill-Funktionen verwenden diese Struktur in irgendeiner Weise.
Trotz der komplexen Funktionen, die von der PubSub-Bibliothek bereitgestellt werden, ist es für Watermill nur notwendig, zwei Schnittstellen zu implementieren, um sie zu verwenden: Publisher
und Subscriber
.
type Publisher interface {
Publish(topic string, messages ...*Message) error
Close() error
}
type Subscriber interface {
Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
}
Abonnieren von Nachrichten
Lassen Sie uns mit dem Abonnieren beginnen. Subscribe
erwartet einen Themenname und gibt einen Kanal zurück, um eingehende Nachrichten zu empfangen. Die spezifische Bedeutung des Themas hängt von der PubSub-Implementierung ab.
messages, err := subscriber.Subscribe(ctx, "example.topic")
if err != nil {
panic(err)
}
for msg := range messages {
fmt.Printf("Nachricht erhalten: %s, Payload: %s\n", msg.UUID, string(msg.Payload))
msg.Ack()
}
Für ausführliche Beispiele zu unterstützten PubSub siehe den folgenden Inhalt.
Go-Channel-Beispiel
Vollständiger Beispielcode: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
package main
import (
"context"
"fmt"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
func main() {
pubSub := gochannel.NewGoChannel(
gochannel.Config{},
watermill.NewStdLogger(false, false),
)
messages, err := pubSub.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
Vollständiger Beispielcode: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
fmt.Printf("Nachricht erhalten: %s, Payload: %s\n", msg.UUID, string(msg.Payload))
// Wir müssen bestätigen, dass wir die Nachricht erhalten und verarbeitet haben,
// ansonsten wird sie mehrmals erneut gesendet.
msg.Ack()
}
}
Kafka Beispiel
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
package main
import (
"context"
"log"
"time"
"github.com/Shopify/sarama"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
"github.com/ThreeDotsLabs/watermill/message"
)
func main() {
saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
// Entsprechend auto.offset.reset: earliest
saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
subscriber, err := kafka.NewSubscriber(
kafka.SubscriberConfig{
Brokers: []string{"kafka:9092"},
Unmarshaler: kafka.DefaultMarshaler{},
OverwriteSaramaConfig: saramaSubscriberConfig,
ConsumerGroup: "test_consumer_group",
},
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
}
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("Nachricht erhalten: %s, Nutzlast: %s", msg.UUID, string(msg.Payload))
// Wir müssen bestätigen, dass wir die Nachricht erhalten und verarbeitet haben,
// Andernfalls wird die Nachricht wiederholt gesendet.
msg.Ack()
}
}
RabbitMQ (AMQP) Beispiel
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
package main
import (
"context"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
"github.com/ThreeDotsLabs/watermill/message"
)
var amqpURI = "amqp://guest:guest@rabbitmq:5672/"
func main() {
amqpConfig := amqp.NewDurableQueueConfig(amqpURI)
subscriber, err := amqp.NewSubscriber(
// Diese Konfiguration basiert auf folgendem Beispiel: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
// Es wird als einfache Warteschlange verwendet.
//
// Wenn Sie einen Pub/Sub-ähnlichen Dienst implementieren möchten, beachten Sie bitte
// https://watermill.io/pubsubs/amqp/#amqp-consumer-groups
amqpConfig,
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
}
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("Nachricht erhalten: %s, Nutzlast: %s", msg.UUID, string(msg.Payload))
// Wir müssen bestätigen, dass die Nachricht erhalten und verarbeitet wurde,
// Andernfalls wird die Nachricht wiederholt zugestellt.
msg.Ack()
}
}
SQL-Beispiel
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
package main
import (
"context"
stdSQL "database/sql"
"log"
"time"
driver "github.com/go-sql-driver/mysql"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
"github.com/ThreeDotsLabs/watermill/message"
)
func main() {
db := createDB()
logger := watermill.NewStdLogger(false, false)
subscriber, err := sql.NewSubscriber(
db,
sql.SubscriberConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
OffsetsAdapter: sql.DefaultMySQLOffsetsAdapter{},
InitializeSchema: true,
},
logger,
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example_topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("Nachricht erhalten: %s, Payload: %s", msg.UUID, string(msg.Payload))
// Wir müssen bestätigen, dass die Nachricht empfangen und verarbeitet wurde,
// Andernfalls wird die Nachricht wiederholt zugestellt.
msg.Ack()
}
}
Nachricht erstellen
Watermill schreibt kein Nachrichtenformat vor. NewMessage
erwartet, dass der Payload ein Byte-Array ist. Sie können Strings, JSON, protobuf, Avro, gob oder ein anderes Format verwenden, das in []byte
serialisiert werden kann.
Die Nachrichten-UUID ist optional, aber es wird empfohlen, da sie bei der Fehlersuche hilft.
msg := message.NewMessage(watermill.NewUUID(), []byte("Hallo, Welt!"))
Nachricht veröffentlichen
Die Publish
-Methode erfordert ein Thema und eine oder mehrere Nachrichten, die veröffentlicht werden sollen.
err := publisher.Publish("beispiel.thema", msg)
if err != nil {
panic(err)
}
Go-Kanal-Beispiel
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
go process(messages)
nachrichtenVeröffentlichen(pubSub)
}
func nachrichtenVeröffentlichen(verleger message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Hallo, Welt!"))
if err := verleger.Publish("beispiel.thema", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
Kafka-Beispiel
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
go process(messages)
verleger, err := kafka.NewPublisher(
kafka.PublisherConfig{
Brokers: []string{"kafka:9092"},
Marshaler: kafka.DefaultMarshaler{},
},
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
nachrichtenVeröffentlichen(verleger)
}
func nachrichtenVeröffentlichen(verleger message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Hallo, Welt!"))
if err := verleger.Publish("beispiel.thema", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
RabbitMQ (AMQP) Beispiel
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
go process(messages)
publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Hallo, Welt!"))
if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
SQL Beispiel
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
go process(messages)
publisher, err := sql.NewPublisher(
db,
sql.PublisherConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
},
logger,
)
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func createDB() *stdSQL.DB {
conf := driver.NewConfig()
conf.Net = "tcp"
conf.User = "root"
conf.Addr = "mysql"
conf.DBName = "watermill"
db, err := stdSQL.Open("mysql", conf.FormatDSN())
if err != nil {
panic(err)
}
err = db.Ping()
if err != nil {
panic(err)
}
return db
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "Hallo, Welt!"}`))
if err := publisher.Publish("example_topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
Verwendung des Nachrichtenrouters
Publisher und Subscriber sind Teile des Wasserfalls mit niedrigerem Niveau. In den meisten Fällen möchten Sie wahrscheinlich höherstufige Schnittstellen und Funktionen wie Korrelationen, Metriken, Poison-Queues, Wiederholungsversuche und Rate-Limitierung verwenden.
Es kann sein, dass Sie die Nachricht erst dann bestätigen möchten, wenn sie erfolgreich verarbeitet wurde. In anderen Fällen möchten Sie sie möglicherweise sofort bestätigen und dann in Betracht ziehen, sie zu verarbeiten. Manchmal möchten Sie aufgrund der eingehenden Nachricht bestimmte Aktionen ausführen und als Antwort eine andere Nachricht veröffentlichen.
Um diese Anforderungen zu erfüllen, gibt es eine Komponente namens Router.
Beispielanwendung des Nachrichtenrouters
Der Ablauf der Beispielanwendung ist wie folgt:
- Generieren Sie alle Sekunde eine Nachricht auf dem
incoming_messages_topic
. - Der Listener
struct_handler
behandelt dasincoming_messages_topic
. Nach Erhalt einer Nachricht druckt er die UUID und generiert eine neue Nachricht auf demoutgoing_messages_topic
. - Der Handler
print_incoming_messages
hört auf dasincoming_messages_topic
und druckt die UUID, Payload und Metadaten der Nachricht. - Der Handler
print_outgoing_messages
hört auf dasoutgoing_messages_topic
und druckt die UUID, Payload und Metadaten der Nachricht. Die Korrelations-ID sollte mit der Nachricht auf demincoming_messages_topic
übereinstimmen.
Router-Konfiguration
Konfigurieren Sie zuerst den Router, indem Sie Plugins und Middleware hinzufügen. Legen Sie dann die Handler fest, die der Router verwenden wird. Jeder Handler wird die Nachrichten unabhängig voneinander verarbeiten.
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
package main
import (
"context"
"fmt
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
"github.com/ThreeDotsLabs/watermill/message/router/plugin"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
var (
// Für dieses Beispiel verwenden wir eine einfache Logger-Implementierung,
// Sie wollen möglicherweise Ihre eigene `watermill.LoggerAdapter`-Implementierung bereitstellen.
logger = watermill.NewStdLogger(false, false)
)
func main() {
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// Bei Empfang des SIGTERM-Signals wird der SignalsHandler den Router sauber schließen.
// Sie können den Router auch durch Aufruf von `r.Close()` schließen.
router.AddPlugin(plugin.SignalsHandler)
// Router-Level-Middleware wird für jede Nachricht, die an den Router gesendet wird, ausgeführt
router.AddMiddleware(
// CorrelationID kopiert die Korrelations-ID aus den Metadaten der eingehenden Nachricht in die generierte Nachricht
middleware.CorrelationID,
// Wenn der Handler einen Fehler zurückgibt, wird die Handlerfunktion erneut versucht.
// Nach Erreichen von MaxRetries wird die Nachricht Nacked und PubSub ist dafür verantwortlich, sie erneut zu senden.
middleware.Retry{
MaxRetries: 3,
InitialInterval: time.Millisecond * 100,
Logger: logger,
}.Middleware,
// Recoverer behandelt Paniken im Handler.
// In diesem Fall übergibt es sie als Fehler an das Retry-Middleware.
middleware.Recoverer,
)
// Für die Einfachheit verwenden wir hier das gochannel-Pub/Sub,
// Sie können es durch jede Pub/Sub-Implementierung ersetzen, und der Effekt wird derselbe sein.
pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
// Generieren Sie im Hintergrund einige eingehende Nachrichten
go publishMessages(pubSub)
// AddHandler gibt einen Handler zurück, der verwendet werden kann, um handler-Level-Middleware hinzuzufügen
// oder um den Handler zu stoppen.
handler := router.AddHandler(
"struct_handler", // Handlername, muss eindeutig sein
"incoming_messages_topic", // Thema, von dem Ereignisse gelesen werden
pubSub,
"outgoing_messages_topic", // Thema, zu dem Ereignisse veröffentlicht werden
pubSub,
structHandler{}.Handler,
)
// Handler-Level-Middleware wird nur für einen bestimmten Handler ausgeführt
// Diese Art von Middleware kann auf die gleiche Weise wie Router-Level-Middleware hinzugefügt werden
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("Führe handler-spezifische Middleware für", message.UUID, "aus")
return h(message)
}
})
// Zum Debuggen drucken wir alle Nachrichten aus, die auf "incoming_messages_topic" erhalten wurden
router.AddNoPublisherHandler(
"print_incoming_messages",
"incoming_messages_topic",
pubSub,
printMessages,
)
// Zum Debuggen drucken wir alle Ereignisse aus, die an "outgoing_messages_topic" gesendet wurden
router.AddNoPublisherHandler(
"print_outgoing_messages",
"outgoing_messages_topic",
pubSub,
printMessages,
)
// Jetzt, da alle Handler registriert sind, führen wir den Router aus.
// Run blockiert, während der Router läuft.
ctx := context.Background()
if err := router.Run(ctx); err != nil {
panic(err)
}
}
// ...
Eingehende Nachrichten
Der struct_handler
verbraucht Nachrichten aus dem incoming_messages_topic
, daher simulieren wir den eingehenden Verkehr, indem wir im Hintergrund publishMessages()
aufrufen. Beachten Sie, dass wir das SetCorrelationID
-Middleware hinzugefügt haben. Der Router fügt allen generierten Nachrichten eine Korrelations-ID hinzu (die in den Metadaten gespeichert wird).
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// …
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Hallo, Welt!"))
middleware.SetCorrelationID(watermill.NewUUID(), msg)
log.Printf("sende Nachricht %s, Korrelations-ID: %s\n", msg.UUID, middleware.MessageCorrelationID(msg))
if err := publisher.Publish("incoming_messages_topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
}
}
// …
Handler
Sie haben vielleicht bemerkt, dass es zwei Arten von Handler-Funktionen gibt:
- Funktion
func(msg *message.Message) ([]*message.Message, error)
- Methode
func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error)
Wenn Ihr Handler eine Funktion ist, die nicht von Abhängigkeiten abhängt, ist die Verwendung der ersten Option in Ordnung. Wenn Ihr Handler jedoch einige Abhängigkeiten benötigt (wie z. B. Datenbankverbindungen, Logger usw.), ist die zweite Option nützlich.
Vollständiger Quellcode: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// …
func printMessages(msg *message.Message) error {
fmt.Printf(
"\n> Empfangene Nachricht: %s\n> %s\n> Metadaten: %v\n\n",
msg.UUID, string(msg.Payload), msg.Metadata,
)
return nil
}
type structHandler struct {
// Hier können wir einige Abhängigkeiten hinzufügen
}
func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
log.Println("structHandler hat die Nachricht empfangen", msg.UUID)
msg = message.NewMessage(watermill.NewUUID(), []byte("Nachricht vom structHandler erzeugt"))
return message.Messages{msg}, nil
}
Fertig!
Sie können dieses Beispiel mit go run main.go
ausführen.