Пользовательский интерфейс издателя/подписчика
Для добавления поддержки пользовательского издателя/подписчика вам необходимо реализовать интерфейсы message.Publisher
и message.Subscriber
.
Полный исходный код: github.com/ThreeDotsLabs/watermill/message/pubsub.go
// ...
type Publisher interface {
// Publish публикует предоставленные сообщения в указанную тему.
//
// Публикация может выполняться синхронно или асинхронно в зависимости от реализации.
//
// Большинство реализаций издателей не поддерживает атомарную публикацию сообщений.
// Это означает, что если одно сообщение не удается опубликовать, следующее сообщение не будет опубликовано.
//
// Publish должен быть потокобезопасным.
Publish(topic string, messages ...*Message) error
// Если издатель асинхронный, Close должен сбросить неотправленные сообщения.
Close() error
}
// Подписчик - потребляющая часть издателя/подписчика.
type Subscriber interface {
// Subscribe возвращает выходной канал для полученных сообщений из указанной темы.
// Канал закроется при вызове Close() на подписчике.
//
// Чтобы получить следующее сообщение, на полученном сообщении должен быть вызван метод Ack().
// Если обработка сообщения завершается неудачно и сообщение должно быть возвращено, должен быть вызван метод Nack().
//
// Когда предоставленный контекст отменяется, подписчик закроет подписку и выходной канал.
// Предоставленный контекст устанавливается на всех сгенерированных сообщениях.
// При вызове методов Nack или Ack на сообщении, контекст сообщения отменяется.
Subscribe(ctx context.Context, topic string) (
Список дел
Вот несколько моментов, которые вам следует не забывать:
- Логирование (хорошие сообщения и соответствующие уровни).
- Заменяемые и настраиваемые кодировщики сообщений.
- Реализация
Close()
для издателей и подписчиков должна быть:- Идемпотентной
- Способной работать правильно, когда издатель или подписчик заблокирован (например, ожидание подтверждения)
- Способной работать правильно, когда выходной канал подписчика заблокирован (потому что на него никто не подписан)
- Поддержка
Ack()
иNack()
для потребляемых сообщений. - Поддержка возврата потребленных сообщений с использованием
Nack()
. - Использование универсального тестирования издателя/подписчика. Вы можете обратиться к руководству по устранению неполадок в тестировании для получения советов по отладке.
- Оптимизация производительности.
- GoDocs, документация в формате Markdown и вводные примеры.