Publisher and Subscriber are the lower-level parts of Watermill. In practical applications, you usually want to use higher-level interfaces and features, such as correlation, metrics, poison queues, retries, throttling, etc.

Sometimes, you may not want to send Ack when processing is successful. Sometimes, you may want to send a message after another message is processed.

To meet these requirements, there is a component called Router.

Watermill Router

Configuration

Complete source code: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// RouterConfig holds the configuration of a Router.
type RouterConfig struct {
	// CloseTimeout determines how long the router should wait for handlers to finish when closing.
	// A zero value is replaced with 30 seconds by setDefaults.
	CloseTimeout time.Duration
}

// setDefaults fills in default values for fields that were left unset:
// a zero CloseTimeout becomes 30 seconds.
func (c *RouterConfig) setDefaults() {
	if c.CloseTimeout == 0 {
		c.CloseTimeout = time.Second * 30
	}
}

// Validate checks if there are any errors in the router configuration.
// It currently performs no checks and always returns nil.
func (c RouterConfig) Validate() error {
	return nil
}
// ...

Handler

First, you need to implement the HandlerFunc function:

Complete source code: github.com/ThreeDotsLabs/watermill/message/router.go

// ...

// HandlerFunc is the function called when a message is received.
// It receives the consumed message and returns the messages to publish, plus an error.
//
// When HandlerFunc does not return an error, msg.Ack() will be called automatically.
//
// When HandlerFunc returns an error, msg.Nack() will be called.
//
// When msg.Ack() was already called in the handler and HandlerFunc returns an error,
// msg.Nack() will not be sent because Ack has already been sent.
//
// When multiple messages are received (because msg.Ack() was sent inside HandlerFunc
// or because the Subscriber supports multiple consumers),
// HandlerFunc will be executed concurrently.
type HandlerFunc func(msg *Message) ([]*Message, error)

// ...

Next, you need to use Router.AddHandler to add a new handler:

Complete source code: github.com/ThreeDotsLabs/watermill/message/router.go

// ...

// AddHandler adds a new handler.

// handlerName must be unique. Currently, it is only used for debugging.

// subscribeTopic is the topic the handler will receive messages from.

// publishTopic is the topic the handler's returned messages will be generated by the Router.

// When the handler needs to publish to multiple topics,

// it is recommended to only inject the Publisher to the handler or implement middleware,

// which can capture messages based on metadata and publish to specific topics.

// If a handler is added while the router is already running, RunHandlers() needs to be explicitly called.

func (r *Router) AddHandler(

	handlerName string,

	subscribeTopic string,

	subscriber Subscriber,

	publishTopic string,

	publisher Publisher,

	handlerFunc HandlerFunc,

) *Handler {

	r.logger.Info("Adding handler", watermill.LogFields{

		"handler_name": handlerName,

		"topic":        subscribeTopic,

	})

	r.handlersLock.Lock()

	defer r.handlersLock.Unlock()

	if _, ok := r.handlers[handlerName]; ok {

		panic(DuplicateHandlerNameError{handlerName})

	}

	publisherName, subscriberName := internal.StructName(publisher), internal.StructName(subscriber)

	newHandler := &handler{

		name:   handlerName,

		logger: r.logger,

		subscriber:     subscriber,

		subscribeTopic: subscribeTopic,

		subscriberName: subscriberName,

		publisher:     publisher,

		publishTopic:  publishTopic,

		publisherName: publisherName,

		handlerFunc: handlerFunc,

		runningHandlersWg:     r.runningHandlersWg,

		runningHandlersWgLock: r.runningHandlersWgLock,

		messagesCh:     nil,

		routersCloseCh: r.closingInProgressCh,

		startedCh: make(chan struct{}),

	}

	r.handlersWg.Add(1)

	r.handlers[handlerName] = newHandler

	select {

	case r.handlerAdded struct{}{}:

	default:

		// closeWhenAllHandlersStopped is not always waiting for handlerAdded

	}

	return &Handler{

		router:  r,

		handler: newHandler,

	}

}

// AddNoPublisherHandler adds a new handler.

// This handler cannot return messages.

// When it returns a message, an error occurs and a Nack is sent.

//

// handlerName must be unique. Currently, it is only used for debugging.

// subscribeTopic is the topic the handler will receive messages from.

// subscriber is a subscriber used to consume messages.

// If a handler is added while the router is already running, RunHandlers() needs to be explicitly called.

func (r *Router) AddNoPublisherHandler(

	handlerName string,

	subscribeTopic string,

	subscriber Subscriber,

handlerFunc NoPublishHandlerFunc,

) *Handler {

handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...

See the example usage in "Getting Started". Complete source code: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
	// AddHandler returns a handler that can be used to add handler-level middleware or stop handlers.
	handler := router.AddHandler(
		"struct_handler",          // handler name, must be unique
		"incoming_messages_topic", // topic from which events are read
		pubSub,
		"outgoing_messages_topic", // topic to publish events
		pubSub,
		structHandler{}.Handler,
	)

	// Handler-level middleware is only executed for specific handlers
	// This middleware can be added in the same way as router-level middleware
	handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			log.Println("Executing handler-specific middleware, message UUID: ", message.UUID)

			return h(message)
		}
	})
// ...

No Publisher Handler

Not every handler will generate a new message. You can use Router.AddNoPublisherHandler to add this type of handler:

Full source code: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// AddNoPublisherHandler adds a new handler.
// This handler can't return messages.
// When it returns a message, an error will occur and Nack will be sent.
//
// handlerName must be unique and currently only used for debugging purposes.
//
// subscribeTopic is the topic on which the handler will receive messages.
//
// subscriber is used to consume messages.
//
// If you add a handler to a router that is already running, you need to explicitly call RunHandlers().
func (r *Router) AddNoPublisherHandler(
	handlerName string,
	subscribeTopic string,
	subscriber Subscriber,
	handlerFunc NoPublishHandlerFunc,
) *Handler {
	handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...
}

Acknowledgment

By default, when HandlerFunc does not return an error, msg.Ack() will be called. If an error is returned, msg.Nack() will be called. So, after handling the message, you don't need to call msg.Ack() or msg.Nack() yourself (of course, you can if you want).

Producing Messages

When multiple messages are returned by the handler, please note that most Publisher implementations do not support atomic publishing of messages. If the broker or storage is unavailable, only some of the messages may be published, and msg.Nack() will be sent.

If this is a problem, consider having each handler publish only one message.

Running Router

To run the router, you need to call Run().

Full source code: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Run runs all plugins and handlers, and starts subscribing to the given topics.
// This call blocks while the router is running.
//
// When all handlers stop (for example because the subscription has been closed), the router will also stop.
//
// To stop Run(), you should call Close() on the router.
//
// ctx will be propagated to all subscribers.
//
// When all handlers stop (e.g.: because of closed connections), Run() will also stop.
func (r *Router) Run(ctx context.Context) (err error) {
// ...
}

Ensuring the Router is Running

It can be useful to know whether the router is running. You can check this using the Running() method.

Full source code: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Running is closed when the router is running.
// In other words, you can wait for the router to be running like this:
//
//	fmt.Println("Starting router")
//	go r.Run(ctx)
//	<-r.Running()
//	fmt.Println("Router is running")
//
// Warning: For historical reasons, this channel is not aware of the router's shutdown - it stays closed even after a router that was running has been shut down.
func (r *Router) Running() chan struct{} {
// ...
}

You can also use the IsRunning function which returns a boolean value:

Full source code: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// IsRunning returns true when the router is running.
//
// Warning: For historical reasons, this method does not know about the router's closed state.
// If you want to know if the router has been closed, use IsClosed.
func (r *Router) IsRunning() bool {
// ...
}

Shut down the router

To shut down the router, you need to call Close().

Complete source code: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Close gracefully closes the router with a timeout provided in the configuration.
func (r *Router) Close() error {
	r.closedLock.Lock()
// ...

Close() will shut down all publishers and subscribers, and wait for all handlers to complete.

Close() will wait for the timeout set in the RouterConfig.CloseTimeout in the configuration. If the timeout is reached, Close() will return an error.

Adding handlers after starting the router

You can add a new handler when the router is already running. To do this, you need to call AddNoPublisherHandler or AddHandler, and then call RunHandlers.

Complete source code: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// RunHandlers runs all handlers that were added after Run().
// RunHandlers is idempotent, so can be called multiple times safely.
func (r *Router) RunHandlers(ctx context.Context) error {
// ...

Stopping running handlers

You can stop only one running handler by calling Stop().

Please note that the router will shut down when there are no running handlers.

Complete source code: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Stop stops the handler.
// Stop is asynchronous.
// You can check if the handler was stopped with the Stopped() function.
func (h *Handler) Stop() {
// ...

Execution model

Subscribers can consume a single message sequentially or multiple messages in parallel.

  • The Single message flow is the simplest method, which means subscribers will not receive any new messages until msg.Ack() is called.
  • The Multiple message flow is supported only by certain subscribers. By subscribing to multiple topic partitions simultaneously, multiple messages can be consumed in parallel, even when earlier messages have not yet been acknowledged (this is, for example, how Kafka subscribers work). The router handles this model by running HandlerFunc in parallel.

Please refer to the selected Pub/Sub documentation to understand the supported execution models.

Middleware

Complete source code: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// HandlerMiddleware allows us to write something similar to a decorator for HandlerFunc:
// it takes the HandlerFunc h and returns a HandlerFunc that wraps it.
// It can execute some operations before the handler (e.g., modify the consumed message)
// or after the handler (modify the generated messages, ack/nack the consumed message, handle errors, log, etc.).
//
// It can be attached to the router using the `AddMiddleware` method.
//
// Example:
//
// 	func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
// 		return func(message *message.Message) ([]*message.Message, error) {
// 			fmt.Println("executed before handler")
// 			producedMessages, err := h(message)
// 			fmt.Println("executed after handler")
//
// 			return producedMessages, err
// 		}
// 	}
type HandlerMiddleware func(h HandlerFunc) HandlerFunc

// ...

The complete list of standard middleware can be found in Middlewares.

Plugins

Complete source code: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// RouterPlugin is a function executed when the router starts.
// It receives the Router and may return an error.
type RouterPlugin func(*Router) error

// ...

The complete list of standard plugins can be found in message/router/plugin.

Context

Some useful values are stored in the context for each message received by the handler:

Complete source code: github.com/ThreeDotsLabs/watermill/message/router_context.go

// ...
// HandlerNameFromCtx returns, from the message context, the name of the router handler
// that consumed the message.
func HandlerNameFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, handlerNameKey)
}

// PublisherNameFromCtx returns, from the message context, the name of the publisher type
// used by the router for the message.
// For example, for Kafka, it will be `kafka.Publisher`.
func PublisherNameFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, publisherNameKey)
}

// SubscriberNameFromCtx returns, from the message context, the name of the subscriber type
// used by the router for the message.
// For example, for Kafka, it will be `kafka.Subscriber`.
func SubscriberNameFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, subscriberNameKey)
}

// SubscribeTopicFromCtx returns, from the message context, the topic from which
// the router received the message.
func SubscribeTopicFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, subscribeTopicKey)
}

// PublishTopicFromCtx returns, from the message context, the topic to which
// the router will publish the message.
func PublishTopicFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, publishTopicKey)
}
// ...