Пользовательский интерфейс издателя/подписчика

Для добавления поддержки пользовательского издателя/подписчика вам необходимо реализовать интерфейсы message.Publisher и message.Subscriber.

Полный исходный код: github.com/ThreeDotsLabs/watermill/message/pubsub.go

// ...
type Publisher interface {
    // Publish публикует предоставленные сообщения в указанную тему.
    //
    // Публикация может выполняться синхронно или асинхронно в зависимости от реализации.
    //
    // Большинство реализаций издателей не поддерживает атомарную публикацию сообщений.
    // Это означает, что если одно сообщение не удается опубликовать, следующее сообщение не будет опубликовано.
    //
    // Publish должен быть потокобезопасным.
    Publish(topic string, messages ...*Message) error
    // Если издатель асинхронный, Close должен сбросить неотправленные сообщения.
    Close() error
}

// Подписчик - потребляющая часть издателя/подписчика.
type Subscriber interface {
    // Subscribe возвращает выходной канал для полученных сообщений из указанной темы.
    // Канал закроется при вызове Close() на подписчике.
    //
    // Чтобы получить следующее сообщение, на полученном сообщении должен быть вызван метод Ack().
    // Если обработка сообщения завершается неудачно и сообщение должно быть возвращено, должен быть вызван метод Nack().
    //
    // Когда предоставленный контекст отменяется, подписчик закроет подписку и выходной канал.
    // Предоставленный контекст устанавливается на всех сгенерированных сообщениях.
    // При вызове методов Nack или Ack на сообщении, контекст сообщения отменяется.
    Subscribe(ctx context.Context, topic string) (

Список дел

Вот несколько моментов, которые вам следует не забывать:

  1. Логирование (хорошие сообщения и соответствующие уровни).
  2. Заменяемые и настраиваемые кодировщики сообщений.
  3. Реализация Close() для издателей и подписчиков должна быть:
    • Идемпотентной
    • Способной работать правильно, когда издатель или подписчик заблокирован (например, ожидание подтверждения)
    • Способной работать правильно, когда выходной канал подписчика заблокирован (потому что на него никто не подписан)
  4. Поддержка Ack() и Nack() для потребляемых сообщений.
  5. Поддержка возврата потребленных сообщений с использованием Nack().
  6. Использование универсального тестирования издателя/подписчика. Вы можете обратиться к руководству по устранению неполадок в тестировании для получения советов по отладке.
  7. Оптимизация производительности.
  8. GoDocs, документация в формате Markdown и вводные примеры.