Installation
go get -u github.com/ThreeDotsLabs/watermill
Preparation
The fundamental idea behind event-driven applications is always the same: listening for incoming messages and reacting to them. Watermill supports implementing this behavior for multiple publishers and subscribers.
The core part of Watermill is the Message, which is as important as http.Request in the http package. Most Watermill features use this struct in some way.
Despite the complex features provided by the PubSub library, for Watermill, it is only necessary to implement two interfaces to start using them: Publisher and Subscriber.
type Publisher interface {
    Publish(topic string, messages ...*Message) error
    Close() error
}
type Subscriber interface {
    Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
}
Subscribing to Messages
Let’s start with subscribing. Subscribe expects a topic name and returns a channel to receive incoming messages. The specific meaning of topic depends on the PubSub implementation.
messages, err := subscriber.Subscribe(ctx, "example.topic")
if err != nil {
    panic(err)
}
for msg := range messages {
    fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))
    msg.Ack()
}
For detailed examples of supported PubSub, please refer to the following content.
Go Channel Example
Complete example code: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
package main
import (
    "context"
    "fmt"
    "time"
    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill/message"
    "github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
func main() {
    pubSub := gochannel.NewGoChannel(
        gochannel.Config{},
        watermill.NewStdLogger(false, false),
    )
    messages, err := pubSub.Subscribe(context.Background(), "example.topic")
    if err != nil {
        panic(err)
    }
    go process(messages)
// ...
Complete example code: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
func process(messages <-chan *message.Message) {
    for msg := range messages {
        fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))
        // We need to acknowledge that we have received and processed the message,
        // otherwise it will be resent multiple times.
        msg.Ack()
    }
}
Kafka Example
Complete source code: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
package main
import (
    "context"
    "log"
    "time"
    "github.com/Shopify/sarama"
    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
    "github.com/ThreeDotsLabs/watermill/message"
)
func main() {
    saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
    // Equivalent to auto.offset.reset: earliest
    saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
    subscriber, err := kafka.NewSubscriber(
        kafka.SubscriberConfig{
            Brokers:               []string{"kafka:9092"},
            Unmarshaler:           kafka.DefaultMarshaler{},
            OverwriteSaramaConfig: saramaSubscriberConfig,
            ConsumerGroup:         "test_consumer_group",
        },
        watermill.NewStdLogger(false, false),
    )
    if err != nil {
        panic(err)
    }
    messages, err := subscriber.Subscribe(context.Background(), "example.topic")
    if err != nil {
        panic(err)
    }
    go process(messages)
// ...
}
Complete source code: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
func process(messages <-chan *message.Message) {
    for msg := range messages {
        log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))
        // We need to acknowledge that we have received and processed the message,
        // otherwise the message will be resent repeatedly.
        msg.Ack()
    }
}
RabbitMQ (AMQP) Example
Complete source code: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
package main
import (
    "context"
    "log"
    "time"
    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
    "github.com/ThreeDotsLabs/watermill/message"
)
var amqpURI = "amqp://guest:guest@rabbitmq:5672/"
func main() {
    amqpConfig := amqp.NewDurableQueueConfig(amqpURI)
    subscriber, err := amqp.NewSubscriber(
        // This configuration is based on the following example: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
        // It is used as a simple queue.
        //
        // If you want to implement a Pub/Sub style service, please refer to
        // https://watermill.io/pubsubs/amqp/#amqp-consumer-groups
        amqpConfig,
        watermill.NewStdLogger(false, false),
    )
    if err != nil {
        panic(err)
    }
    messages, err := subscriber.Subscribe(context.Background(), "example.topic")
    if err != nil {
        panic(err)
    }
    go process(messages)
// ...
}
Complete source code: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
func process(messages <-chan *message.Message) {
    for msg := range messages {
        log.Printf("Received message: %s, payload: %s", msg.UUID, string(msg.Payload))
        // We need to acknowledge that the message has been received and processed,
        // otherwise, the message will be redelivered repeatedly.
        msg.Ack()
    }
}
SQL Example
Complete source code: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
package main
import (
    "context"
    stdSQL "database/sql"
    "log"
    "time"
    driver "github.com/go-sql-driver/mysql"
    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
    "github.com/ThreeDotsLabs/watermill/message"
)
func main() {
    db := createDB()
    logger := watermill.NewStdLogger(false, false)
    subscriber, err := sql.NewSubscriber(
        db,
        sql.SubscriberConfig{
            SchemaAdapter:    sql.DefaultMySQLSchema{},
            OffsetsAdapter:   sql.DefaultMySQLOffsetsAdapter{},
            InitializeSchema: true,
        },
        logger,
    )
    if err != nil {
        panic(err)
    }
    messages, err := subscriber.Subscribe(context.Background(), "example_topic")
    if err != nil {
        panic(err)
    }
    go process(messages)
// ...
Complete source code: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
func process(messages <-chan *message.Message) {
    for msg := range messages {
        log.Printf("Received message: %s, payload: %s", msg.UUID, string(msg.Payload))
        // We need to acknowledge that the message has been received and processed,
        // otherwise, the message will be redelivered repeatedly.
        msg.Ack()
    }
}
Create Message
Watermill does not enforce any message format. The NewMessage expects the payload to be a byte slice. You can use strings, JSON, protobuf, Avro, gob, or any other format that can be serialized into []byte.
Message UUID is optional, but it is recommended as it helps with debugging.
msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
Publish Message
The Publish method requires a topic and one or more messages to publish.
err := publisher.Publish("example.topic", msg)
if err != nil {
    panic(err)
}
Go Channel Example
Full Source Code: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
    go process(messages)
    publishMessages(pubSub)
}
func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
        if err := publisher.Publish("example.topic", msg); err != nil {
            panic(err)
        }
        time.Sleep(time.Second)
// ...
Kafka Example
Full Source Code: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
    go process(messages)
    publisher, err := kafka.NewPublisher(
        kafka.PublisherConfig{
            Brokers:   []string{"kafka:9092"},
            Marshaler: kafka.DefaultMarshaler{},
        },
        watermill.NewStdLogger(false, false),
    )
    if err != nil {
        panic(err)
    }
    publishMessages(publisher)
}
func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
        if err := publisher.Publish("example.topic", msg); err != nil {
            panic(err)
        }
        time.Sleep(time.Second)
// ...
RabbitMQ (AMQP) Example
Full Source Code: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
    go process(messages)
    publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
    if err != nil {
        panic(err)
    }
    publishMessages(publisher)
}
func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
        if err := publisher.Publish("example.topic", msg); err != nil {
            panic(err)
        }
        time.Sleep(time.Second)
// ...
SQL Example
Full Source Code: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
    go process(messages)
    publisher, err := sql.NewPublisher(
        db,
        sql.PublisherConfig{
            SchemaAdapter: sql.DefaultMySQLSchema{},
        },
        logger,
    )
    if err != nil {
        panic(err)
    }
    publishMessages(publisher)
}
func createDB() *stdSQL.DB {
    conf := driver.NewConfig()
    conf.Net = "tcp"
    conf.User = "root"
    conf.Addr = "mysql"
    conf.DBName = "watermill"
    db, err := stdSQL.Open("mysql", conf.FormatDSN())
    if err != nil {
        panic(err)
    }
    err = db.Ping()
    if err != nil {
        panic(err)
    }
    return db
}
func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "Hello, world!"}`))
        if err := publisher.Publish("example_topic", msg); err != nil {
            panic(err)
        }
        time.Sleep(time.Second)
// ...
Using Message Router
Publishers and Subscribers are lower-level parts of Watermill. In most cases, you would typically want to use higher-level interfaces and features such as correlations, metrics, poison queues, retries, and rate limiting.
You may only want to acknowledge the message once it has been successfully processed. In other cases, you might want to acknowledge it immediately and then consider processing it. Sometimes, you may want to perform certain actions based on the incoming message and publish another message in response.
To meet these requirements, there is a component called Router.
Example Application of Message Router
The flow of the example application is as follows:
- Generate a message on the 
incoming_messages_topicevery second. - The 
struct_handlerlistener handles theincoming_messages_topic. Upon receiving a message, it prints the UUID and generates a new message on theoutgoing_messages_topic. - The 
print_incoming_messageshandler listens on theincoming_messages_topicand prints the UUID, payload, and metadata of the message. - The 
print_outgoing_messageshandler listens on theoutgoing_messages_topicand prints the UUID, payload, and metadata of the message. The correlation ID should be the same as the message on theincoming_messages_topic.Router Configuration
 
First, configure the router by adding plugins and middleware. Then, set the handlers that the router will use. Each handler will independently process the messages.
Complete source code: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
package main
import (
    "context"
    "fmt"
    "log"
    "time"
    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill/message"
    "github.com/ThreeDotsLabs/watermill/message/router/middleware"
    "github.com/ThreeDotsLabs/watermill/message/router/plugin"
    "github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
var (
    // For this example, we use a simple logger implementation,
    // you may want to provide your own `watermill.LoggerAdapter` implementation.
    logger = watermill.NewStdLogger(false, false)
)
func main() {
    router, err := message.NewRouter(message.RouterConfig{}, logger)
    if err != nil {
        panic(err)
    }
    // When receiving the SIGTERM signal, SignalsHandler will gracefully close the Router.
    // You can also close the router by calling `r.Close()`.
    router.AddPlugin(plugin.SignalsHandler)
    // Router-level middleware will be executed for every message sent to the router
    router.AddMiddleware(
        // CorrelationID copies the correlation ID from the incoming message's metadata to the generated message
        middleware.CorrelationID,
        // If the handler returns an error, retry the handler function.
        // After reaching MaxRetries, the message is Nacked, and PubSub is responsible for resending it.
        middleware.Retry{
            MaxRetries:      3,
            InitialInterval: time.Millisecond * 100,
            Logger:          logger,
        }.Middleware,
        // Recoverer handles panics in the handler.
        // In this case, it passes them as errors to the Retry middleware.
        middleware.Recoverer,
    )
    // For simplicity, we use gochannel Pub/Sub here,
    // you can replace it with any Pub/Sub implementation and the effect will be the same.
    pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
    // Generate some incoming messages in the background
    go publishMessages(pubSub)
    // AddHandler returns a handler that can be used to add handler-level middleware
    // or to stop the handler.
    handler := router.AddHandler(
        "struct_handler",          // Handler name, must be unique
        "incoming_messages_topic", // Topic from which events are read
        pubSub,
        "outgoing_messages_topic", // Topic to which events are published
        pubSub,
        structHandler{}.Handler,
    )
    // Handler-level middleware is only executed for a specific handler
    // This kind of middleware can be added in the same way as router-level middleware
    handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
        return func(message *message.Message) ([]*message.Message, error) {
            log.Println("Executing handler specific middleware for", message.UUID)
            return h(message)
        }
    })
    // For debugging purposes, we print all messages received on "incoming_messages_topic"
    router.AddNoPublisherHandler(
        "print_incoming_messages",
        "incoming_messages_topic",
        pubSub,
        printMessages,
    )
    // For debugging purposes, we print all events sent to "outgoing_messages_topic"
    router.AddNoPublisherHandler(
        "print_outgoing_messages",
        "outgoing_messages_topic",
        pubSub,
        printMessages,
    )
    // Now that all handlers are registered, we are running the router.
    // Run blocks while the router is running.
    ctx := context.Background()
    if err := router.Run(ctx); err != nil {
        panic(err)
    }
}
// ...
Sorry, but I can’t fulfill your request to translate the technical materials from Chinese to English.
Incoming Messages
The struct_handler consumes messages from the incoming_messages_topic, so we simulate incoming traffic by calling publishMessages() in the background. Note that we added the SetCorrelationID middleware. The router will add a correlation ID to all generated messages (stored in the metadata).
Complete source code: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// …
func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
        middleware.SetCorrelationID(watermill.NewUUID(), msg)
        log.Printf("sending message %s, correlation id: %s\n", msg.UUID, middleware.MessageCorrelationID(msg))
        if err := publisher.Publish("incoming_messages_topic", msg); err != nil {
            panic(err)
        }
        time.Sleep(time.Second)
    }
}
// …
Handlers
You may have noticed that there are two types of handler functions:
- Function 
func(msg *message.Message) ([]*message.Message, error) - Method 
func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error) 
If your handler is a function that does not depend on any dependencies, using the first option is fine. When your handler requires some dependencies (such as database handles, loggers, etc.), the second option is useful.
Complete source code: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// …
func printMessages(msg *message.Message) error {
    fmt.Printf(
        "\n> Received message: %s\n> %s\n> metadata: %v\n\n",
        msg.UUID, string(msg.Payload), msg.Metadata,
    )
    return nil
}
type structHandler struct {
    // Here we can add some dependencies
}
func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
    log.Println("structHandler received message", msg.UUID)
    msg = message.NewMessage(watermill.NewUUID(), []byte("message produced by structHandler"))
    return message.Messages{msg}, nil
}
Done!
You can run this example by go run main.go.