Installation

go get -u github.com/ThreeDotsLabs/watermill

Preparation

The fundamental idea behind event-driven applications is always the same: listening for incoming messages and reacting to them. Watermill supports implementing this behavior for multiple publishers and subscribers.

The core type in Watermill is Message, which is as central to the library as http.Request is to the net/http package. Most Watermill features use this struct in some way.

PubSub libraries offer complex features, but to start using one with Watermill you only need to implement two interfaces: Publisher and Subscriber.

type Publisher interface {
    Publish(topic string, messages ...*Message) error
    Close() error
}

type Subscriber interface {
    Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
    Close() error
}

Subscribing to Messages

Let’s start with subscribing. Subscribe expects a topic name and returns a channel to receive incoming messages. The specific meaning of topic depends on the PubSub implementation.

messages, err := subscriber.Subscribe(ctx, "example.topic")
if err != nil {
    panic(err)
}

for msg := range messages {
    fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))
    msg.Ack()
}

Detailed examples for each supported PubSub implementation follow.

Go Channel Example

Complete example code: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill/message"
    "github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

func main() {
    pubSub := gochannel.NewGoChannel(
        gochannel.Config{},
        watermill.NewStdLogger(false, false),
    )

    messages, err := pubSub.Subscribe(context.Background(), "example.topic")
    if err != nil {
        panic(err)
    }

    go process(messages)
// ...

// ...
func process(messages <-chan *message.Message) {
    for msg := range messages {
        fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))

        // We need to acknowledge that we have received and processed the message,
        // otherwise it will be resent multiple times.
        msg.Ack()
    }
}

Kafka Example

Complete source code: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
package main

import (
    "context"
    "log"
    "time"

    "github.com/Shopify/sarama"

    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
    "github.com/ThreeDotsLabs/watermill/message"
)

func main() {
    saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
    // Equivalent to auto.offset.reset: earliest
    saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

    subscriber, err := kafka.NewSubscriber(
        kafka.SubscriberConfig{
            Brokers:               []string{"kafka:9092"},
            Unmarshaler:           kafka.DefaultMarshaler{},
            OverwriteSaramaConfig: saramaSubscriberConfig,
            ConsumerGroup:         "test_consumer_group",
        },
        watermill.NewStdLogger(false, false),
    )
    if err != nil {
        panic(err)
    }

    messages, err := subscriber.Subscribe(context.Background(), "example.topic")
    if err != nil {
        panic(err)
    }

    go process(messages)
// ...
}

// ...
func process(messages <-chan *message.Message) {
    for msg := range messages {
        log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))

        // We need to acknowledge that we have received and processed the message,
        // otherwise the message will be resent repeatedly.
        msg.Ack()
    }
}

RabbitMQ (AMQP) Example

Complete source code: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
package main

import (
    "context"
    "log"
    "time"

    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
    "github.com/ThreeDotsLabs/watermill/message"
)

var amqpURI = "amqp://guest:guest@rabbitmq:5672/"

func main() {
    amqpConfig := amqp.NewDurableQueueConfig(amqpURI)

    subscriber, err := amqp.NewSubscriber(
        // This configuration is based on the following example: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
        // It is used as a simple queue.
        //
        // If you want to implement a Pub/Sub style service, please refer to
        // https://watermill.io/pubsubs/amqp/#amqp-consumer-groups
        amqpConfig,
        watermill.NewStdLogger(false, false),
    )
    if err != nil {
        panic(err)
    }

    messages, err := subscriber.Subscribe(context.Background(), "example.topic")
    if err != nil {
        panic(err)
    }

    go process(messages)
// ...
}

// ...
func process(messages <-chan *message.Message) {
    for msg := range messages {
        log.Printf("Received message: %s, payload: %s", msg.UUID, string(msg.Payload))

        // We need to acknowledge that the message has been received and processed,
        // otherwise, the message will be redelivered repeatedly.
        msg.Ack()
    }
}

SQL Example

Complete source code: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
package main

import (
    "context"
    stdSQL "database/sql"
    "log"
    "time"

    driver "github.com/go-sql-driver/mysql"

    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
    "github.com/ThreeDotsLabs/watermill/message"
)

func main() {
    db := createDB()
    logger := watermill.NewStdLogger(false, false)

    subscriber, err := sql.NewSubscriber(
        db,
        sql.SubscriberConfig{
            SchemaAdapter:    sql.DefaultMySQLSchema{},
            OffsetsAdapter:   sql.DefaultMySQLOffsetsAdapter{},
            InitializeSchema: true,
        },
        logger,
    )
    if err != nil {
        panic(err)
    }

    messages, err := subscriber.Subscribe(context.Background(), "example_topic")
    if err != nil {
        panic(err)
    }

    go process(messages)
// ...

// ...
func process(messages <-chan *message.Message) {
    for msg := range messages {
        log.Printf("Received message: %s, payload: %s", msg.UUID, string(msg.Payload))

        // We need to acknowledge that the message has been received and processed,
        // otherwise, the message will be redelivered repeatedly.
        msg.Ack()
    }
}

Creating Messages

Watermill does not enforce any message format. NewMessage expects the payload to be a byte slice, so you can use strings, JSON, protobuf, Avro, gob, or any other format that can be serialized into []byte.

The message UUID is optional, but setting it is recommended because it helps with debugging.

msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
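As an illustration of a structured payload, the sketch below serializes a small event to JSON with the standard library. The orderPlaced type and the encodePayload and decodePayload helpers are hypothetical names invented for this example; only the resulting []byte is what a message payload needs.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// orderPlaced is a hypothetical event type used only for this example.
type orderPlaced struct {
	OrderID string `json:"order_id"`
	Total   int    `json:"total"`
}

// encodePayload turns an event into the []byte that a message payload expects.
func encodePayload(event orderPlaced) ([]byte, error) {
	return json.Marshal(event)
}

// decodePayload is the inverse step, as a subscriber might apply it to msg.Payload.
func decodePayload(payload []byte) (orderPlaced, error) {
	var event orderPlaced
	err := json.Unmarshal(payload, &event)
	return event, err
}

func main() {
	payload, err := encodePayload(orderPlaced{OrderID: "o-1", Total: 42})
	if err != nil {
		panic(err)
	}
	// payload could now be passed as the second argument to message.NewMessage.
	fmt.Println(string(payload)) // prints {"order_id":"o-1","total":42}

	event, err := decodePayload(payload)
	if err != nil {
		panic(err)
	}
	fmt.Println(event.OrderID, event.Total) // prints "o-1 42"
}
```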

Publishing Messages

The Publish method requires a topic and one or more messages to publish.

err := publisher.Publish("example.topic", msg)
if err != nil {
    panic(err)
}

Go Channel Example

Full Source Code: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
    go process(messages)

    publishMessages(pubSub)
}

func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))

        if err := publisher.Publish("example.topic", msg); err != nil {
            panic(err)
        }

        time.Sleep(time.Second)
// ...

Kafka Example

Full Source Code: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
    go process(messages)

    publisher, err := kafka.NewPublisher(
        kafka.PublisherConfig{
            Brokers:   []string{"kafka:9092"},
            Marshaler: kafka.DefaultMarshaler{},
        },
        watermill.NewStdLogger(false, false),
    )
    if err != nil {
        panic(err)
    }

    publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))

        if err := publisher.Publish("example.topic", msg); err != nil {
            panic(err)
        }

        time.Sleep(time.Second)
// ...

RabbitMQ (AMQP) Example

Full Source Code: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
    go process(messages)

    publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
    if err != nil {
        panic(err)
    }

    publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))

        if err := publisher.Publish("example.topic", msg); err != nil {
            panic(err)
        }

        time.Sleep(time.Second)
// ...

SQL Example

Full Source Code: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
    go process(messages)

    publisher, err := sql.NewPublisher(
        db,
        sql.PublisherConfig{
            SchemaAdapter: sql.DefaultMySQLSchema{},
        },
        logger,
    )
    if err != nil {
        panic(err)
    }

    publishMessages(publisher)
}

func createDB() *stdSQL.DB {
    conf := driver.NewConfig()
    conf.Net = "tcp"
    conf.User = "root"
    conf.Addr = "mysql"
    conf.DBName = "watermill"

    db, err := stdSQL.Open("mysql", conf.FormatDSN())
    if err != nil {
        panic(err)
    }

    err = db.Ping()
    if err != nil {
        panic(err)
    }

    return db
}

func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "Hello, world!"}`))

        if err := publisher.Publish("example_topic", msg); err != nil {
            panic(err)
        }

        time.Sleep(time.Second)
// ...

Using Message Router

Publishers and Subscribers are the lower-level parts of Watermill. In most cases, you will want higher-level interfaces and features such as correlation IDs, metrics, poison queues, retries, and rate limiting.

You may want to acknowledge a message only after it has been processed successfully. In other cases, you might want to acknowledge it immediately and deal with processing afterwards. Sometimes you may want to act on an incoming message and publish another message in response.

To meet these requirements, there is a component called Router.

Example Application of Message Router

The flow of the example application is as follows:

  1. Generate a message on the incoming_messages_topic every second.
  2. The struct_handler handler listens on the incoming_messages_topic. Upon receiving a message, it prints the UUID and produces a new message on the outgoing_messages_topic.
  3. The print_incoming_messages handler listens on the incoming_messages_topic and prints the UUID, payload, and metadata of the message.
  4. The print_outgoing_messages handler listens on the outgoing_messages_topic and prints the UUID, payload, and metadata of the message. The correlation ID should be the same as the message on the incoming_messages_topic.

Router Configuration

First, configure the router by adding plugins and middleware. Then, set the handlers that the router will use. Each handler will independently process the messages.

Complete source code: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill/message"
    "github.com/ThreeDotsLabs/watermill/message/router/middleware"
    "github.com/ThreeDotsLabs/watermill/message/router/plugin"
    "github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

var (
    // For this example, we use a simple logger implementation,
    // you may want to provide your own `watermill.LoggerAdapter` implementation.
    logger = watermill.NewStdLogger(false, false)
)

func main() {
    router, err := message.NewRouter(message.RouterConfig{}, logger)
    if err != nil {
        panic(err)
    }

    // SignalsHandler gracefully closes the Router when a SIGTERM signal is received.
    // You can also close the router by calling `router.Close()`.
    router.AddPlugin(plugin.SignalsHandler)

    // Router-level middleware will be executed for every message sent to the router
    router.AddMiddleware(
        // CorrelationID copies the correlation ID from the incoming message's metadata to the generated message
        middleware.CorrelationID,

        // If the handler returns an error, retry the handler function.
        // After reaching MaxRetries, the message is Nacked, and PubSub is responsible for resending it.
        middleware.Retry{
            MaxRetries:      3,
            InitialInterval: time.Millisecond * 100,
            Logger:          logger,
        }.Middleware,

        // Recoverer handles panics in the handler.
        // In this case, it passes them as errors to the Retry middleware.
        middleware.Recoverer,
    )

    // For simplicity, we use gochannel Pub/Sub here,
    // you can replace it with any Pub/Sub implementation and the effect will be the same.
    pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)

    // Generate some incoming messages in the background
    go publishMessages(pubSub)

    // AddHandler returns a handler that can be used to add handler-level middleware
    // or to stop the handler.
    handler := router.AddHandler(
        "struct_handler",          // Handler name, must be unique
        "incoming_messages_topic", // Topic from which events are read
        pubSub,
        "outgoing_messages_topic", // Topic to which events are published
        pubSub,
        structHandler{}.Handler,
    )

    // Handler-level middleware is only executed for a specific handler
    // This kind of middleware can be added in the same way as router-level middleware
    handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
        return func(message *message.Message) ([]*message.Message, error) {
            log.Println("Executing handler specific middleware for", message.UUID)

            return h(message)
        }
    })

    // For debugging purposes, we print all messages received on "incoming_messages_topic"
    router.AddNoPublisherHandler(
        "print_incoming_messages",
        "incoming_messages_topic",
        pubSub,
        printMessages,
    )

    // For debugging purposes, we print all events sent to "outgoing_messages_topic"
    router.AddNoPublisherHandler(
        "print_outgoing_messages",
        "outgoing_messages_topic",
        pubSub,
        printMessages,
    )

    // Now that all handlers are registered, we are running the router.
    // Run blocks while the router is running.
    ctx := context.Background()
    if err := router.Run(ctx); err != nil {
        panic(err)
    }
}
// ...

Incoming Messages

The struct_handler consumes messages from the incoming_messages_topic, so we simulate incoming traffic by calling publishMessages() in the background. Note that publishMessages() sets a correlation ID on each message with middleware.SetCorrelationID. The CorrelationID middleware configured on the router then copies that ID into the metadata of every message the handlers produce.

Complete source code: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// …
func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
        middleware.SetCorrelationID(watermill.NewUUID(), msg)

        log.Printf("sending message %s, correlation id: %s\n", msg.UUID, middleware.MessageCorrelationID(msg))

        if err := publisher.Publish("incoming_messages_topic", msg); err != nil {
            panic(err)
        }

        time.Sleep(time.Second)
    }
}
// …
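The propagation of a correlation ID from an incoming message to the messages a handler produces can be sketched as a small wrapper. This is an illustration with the standard library only: toyMessage, handlerFunc, withCorrelationID, and the correlationIDKey value are hypothetical and do not claim to reproduce Watermill's middleware.

```go
package main

import "fmt"

// correlationIDKey is a hypothetical metadata key chosen for this sketch.
const correlationIDKey = "correlation_id"

// toyMessage is a simplified stand-in with a UUID and string metadata.
type toyMessage struct {
	UUID     string
	Metadata map[string]string
}

type handlerFunc func(msg *toyMessage) ([]*toyMessage, error)

// withCorrelationID copies the incoming message's correlation ID
// into the metadata of every message the wrapped handler produces.
func withCorrelationID(next handlerFunc) handlerFunc {
	return func(msg *toyMessage) ([]*toyMessage, error) {
		produced, err := next(msg)
		id := msg.Metadata[correlationIDKey]
		for _, out := range produced {
			if out.Metadata == nil {
				out.Metadata = map[string]string{}
			}
			out.Metadata[correlationIDKey] = id
		}
		return produced, err
	}
}

func main() {
	h := withCorrelationID(func(msg *toyMessage) ([]*toyMessage, error) {
		return []*toyMessage{{UUID: "out-1"}}, nil
	})

	in := &toyMessage{
		UUID:     "in-1",
		Metadata: map[string]string{correlationIDKey: "corr-42"},
	}
	produced, err := h(in)
	if err != nil {
		panic(err)
	}
	fmt.Println(produced[0].UUID, produced[0].Metadata[correlationIDKey]) // prints "out-1 corr-42"
}
```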

Handlers

You may have noticed that there are two types of handler functions:

  1. Function func(msg *message.Message) ([]*message.Message, error)
  2. Method func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error)

If your handler has no dependencies, the first option is fine. The second option is useful when your handler needs dependencies such as database handles or loggers.
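Both forms work because a Go method value has an ordinary function type. The sketch below shows this with simplified, hypothetical types (handlerFunc, upper, prefixHandler, run) and only the standard library; the string prefix stands in for a real dependency such as a database handle.

```go
package main

import (
	"fmt"
	"strings"
)

// handlerFunc is a simplified, hypothetical handler signature.
type handlerFunc func(payload string) (string, error)

// upper is a plain function handler: it needs nothing beyond its argument.
func upper(payload string) (string, error) {
	return strings.ToUpper(payload), nil
}

// prefixHandler carries a dependency (here just a string).
type prefixHandler struct {
	prefix string
}

// Handle has the same signature as handlerFunc once bound to a value.
func (p prefixHandler) Handle(payload string) (string, error) {
	return p.prefix + payload, nil
}

// run accepts either kind of handler.
func run(h handlerFunc, payload string) string {
	out, err := h(payload)
	if err != nil {
		return "error: " + err.Error()
	}
	return out
}

func main() {
	fmt.Println(run(upper, "hello"))                                   // prints "HELLO"
	fmt.Println(run(prefixHandler{prefix: "order: "}.Handle, "hello")) // prints "order: hello"
}
```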

Complete source code: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// …
func printMessages(msg *message.Message) error {
    fmt.Printf(
        "\n> Received message: %s\n> %s\n> metadata: %v\n\n",
        msg.UUID, string(msg.Payload), msg.Metadata,
    )
    return nil
}

type structHandler struct {
    // Here we can add some dependencies
}

func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
    log.Println("structHandler received message", msg.UUID)

    msg = message.NewMessage(watermill.NewUUID(), []byte("message produced by structHandler"))
    return message.Messages{msg}, nil
}

Done!

You can run this example with go run main.go.