Custom Publisher/Subscriber Interface

To add support for custom publisher/subscriber, you need to implement the message.Publisher and message.Subscriber interfaces.

Complete source code: github.com/ThreeDotsLabs/watermill/message/pubsub.go

// ...
type Publisher interface {
    // Publish publishes the provided messages to the specified topic.
    //
    // Publish can be executed synchronously or asynchronously, depending on the implementation.
    //
    // Most publisher implementations do not support atomic message publishing.
    // This means that if one message fails to publish, the next message will not be published.
    //
    // Publish must be thread-safe.
    Publish(topic string, messages ...*Message) error
    // If the publisher is asynchronous, Close should flush unsent messages.
    Close() error
}

// Subscriber is the consuming part of publisher/subscriber.
type Subscriber interface {
    // Subscribe returns an output channel for messages received from the provided topic.
    // The channel will close when Close() is called on the subscriber.
    //
    // To receive the next message, Ack() must be called on the received message.
    // If processing the message fails and the message should be redelivered, Nack() should be called.
    //
    // When the provided ctx is canceled, the subscriber will close the subscription and the output channel.
    // The provided ctx is set on all generated messages.
    // When Nack or Ack is called on the message, the message's context is canceled.
    Subscribe(ctx context.Context, topic string) (

To-Do List

Here are a few points you shouldn't forget:

  1. Logging (good messages and appropriate levels).
  2. Replaceable and configurable message encoders.
  3. Implementation of Close() for publishers and subscribers should be:
    • Idempotent
    • Able to work properly when the publisher or subscriber is blocked (e.g., waiting for Ack)
    • Able to work properly when the subscriber's output channel is blocked (because there's no one listening to it)
  4. Support for Ack() and Nack() for consumed messages.
  5. Support for redelivery of consumed messages using Nack().
  6. Use generic publisher/subscriber testing. You should refer to the testing troubleshooting guide for debugging tips.
  7. Performance optimization.
  8. GoDocs, Markdown documentation, and introductory examples.