Издатель и подписчик являются более низкоуровневыми частями Watermill. В практических приложениях вы обычно хотите использовать высокоуровневые интерфейсы и функции, такие как ассоциации, метрики, очереди ядовитых сообщений, повторы, ограничение скорости и т. д.
Иногда вам может не потребоваться отправлять подтверждение (Ack), когда обработка прошла успешно. Иногда вы можете захотеть отправить сообщение после обработки другого сообщения.
Для удовлетворения этих требований существует компонент, называемый Маршрутизатор (Router).
Конфигурация
Полный исходный код: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
type RouterConfig struct {
// CloseTimeout определяет, как долго маршрутизатор должен работать для обработчиков при закрытии.
CloseTimeout time.Duration
}
func (c *RouterConfig) setDefaults() {
if c.CloseTimeout == 0 {
c.CloseTimeout = time.Second * 30
}
}
// Validate проверяет, есть ли ошибки в конфигурации маршрутизатора.
func (c RouterConfig) Validate() error {
return nil
}
// ...
Обработчик
Сначала вам нужно реализовать функцию HandlerFunc
:
Полный исходный код: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerFunc - это функция, вызываемая при получении сообщения.
//
// Когда HandlerFunc не возвращает ошибку, автоматически вызывается msg.Ack().
//
// Когда HandlerFunc возвращает ошибку, вызывается msg.Nack().
//
// Когда msg.Ack() вызывается в обработчике и HandlerFunc возвращает ошибку,
// msg.Nack() не будет отправлен, потому что Ack уже был отправлен.
//
// При получении нескольких сообщений (из-за отправки msg.Ack() в HandlerFunc или поддержки нескольких потребителей подписчиком),
// HandlerFunc будет выполняться параллельно.
type HandlerFunc func(msg *Message) ([]*Message, error)
// ...
Затем вам нужно использовать Router.AddHandler
, чтобы добавить новый обработчик:
Полный исходный код: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// AddHandler добавляет новый обработчик.
// handlerName должен быть уникальным. В настоящее время он используется только для отладки.
// subscribeTopic - это тема, из которой обработчик будет получать сообщения.
// publishTopic - это тема, из которой возвращаемые сообщения обработчика будут создаваться Router'ом.
// Когда обработчику нужно публиковать в несколько тем,
// рекомендуется либо внедрить Publisher в обработчик, либо реализовать промежуточное ПО,
// которое может захватывать сообщения на основе метаданных и публиковать их в конкретные темы.
// Если обработчик добавляется в то время, когда маршрутизатор уже запущен, необходимо явно вызвать RunHandlers().
func (r *Router) AddHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
publishTopic string,
publisher Publisher,
handlerFunc HandlerFunc,
) *Handler {
r.logger.Info("Добавление обработчика", watermill.LogFields{
"handler_name": handlerName,
"topic": subscribeTopic,
})
r.handlersLock.Lock()
defer r.handlersLock.Unlock()
if _, ok := r.handlers[handlerName]; ok {
panic(DuplicateHandlerNameError{handlerName})
}
publisherName, subscriberName := internal.StructName(publisher), internal.StructName(subscriber)
newHandler := &handler{
name: handlerName,
logger: r.logger,
subscriber: subscriber,
subscribeTopic: subscribeTopic,
subscriberName: subscriberName,
publisher: publisher,
publishTopic: publishTopic,
publisherName: publisherName,
handlerFunc: handlerFunc,
runningHandlersWg: r.runningHandlersWg,
runningHandlersWgLock: r.runningHandlersWgLock,
messagesCh: nil,
routersCloseCh: r.closingInProgressCh,
startedCh: make(chan struct{}),
}
r.handlersWg.Add(1)
r.handlers[handlerName] = newHandler
select {
case r.handlerAdded struct{}{}:
default:
// closeWhenAllHandlersStopped не всегда ждет handlerAdded
}
return &Handler{
router: r,
handler: newHandler,
}
}
// AddNoPublisherHandler добавляет новый обработчик.
// Этот обработчик не может возвращать сообщения.
// Когда он возвращает сообщение, возникает ошибка, и отправляется Nack.
//
// handlerName должен быть уникальным. В настоящее время он используется только для отладки.
// subscribeTopic - это тема, из которой обработчик будет получать сообщения.
// subscriber - это подписчик, используемый для потребления сообщений.
// Если обработчик добавляется в то время, когда маршрутизатор уже запущен, необходимо явно вызвать RunHandlers().
func (r *Router) AddNoPublisherHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
handlerFunc NoPublishHandlerFunc,
) *Handler {
handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...
Обратитесь к примеру использования в "Начало работы". Полный исходный код: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
// AddHandler возвращает обработчик, который можно использовать для добавления промежуточного ПО на уровне обработчика или остановки обработчиков.
handler := router.AddHandler(
"struct_handler", // имя обработчика, должно быть уникальным
"incoming_messages_topic", // тема, из которой считываются события
pubSub,
"outgoing_messages_topic", // тема для публикации событий
pubSub,
structHandler{}.Handler,
)
// Промежуточное ПО на уровне обработчика выполняется только для конкретных обработчиков
// Это промежуточное ПО можно добавить таким же способом, как и на уровне маршрутизатора
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("Выполнение промежуточного ПО на уровне обработчика, UUID сообщения: ", message.UUID)
return h(message)
}
})
// ...
Обработчик без издателя
Не каждый обработчик будет генерировать новое сообщение. Вы можете использовать Router.AddNoPublisherHandler
, чтобы добавить этот тип обработчика:
Полный исходный код: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// AddNoPublisherHandler добавляет новый обработчик.
// Этот обработчик не может возвращать сообщения.
// Когда он возвращает сообщение, происходит ошибка и будет отправлено Nack.
//
// handlerName должен быть уникальным и в настоящее время используется только для целей отладки.
//
// subscribeTopic - тема, по которой обработчик будет получать сообщения.
//
// подписчик используется для потребления сообщений.
//
// Если вы добавляете обработчик в роутер, который уже запущен, вам нужно явно вызвать RunHandlers ().
func (r *Router) AddNoPublisherHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
handlerFunc NoPublishHandlerFunc,
) *Handler {
handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...
}
Подтверждение
По умолчанию, когда HanderFunc
не возвращает ошибку, будет вызван msg.Ack()
. Если возникает ошибка, будет вызван msg.Nack()
. Так что после обработки сообщения вам не нужно вызывать msg.Ack()
или msg.Nack
(конечно, вы можете, если хотите).
Генерация сообщений
Когда обработчик возвращает несколько сообщений, обратите внимание, что большинство реализаций издателей не поддерживают атомарную публикацию сообщений. Если брокер или хранилище недоступны, могут быть сгенерированы только некоторые сообщения, и будет отправлено msg.Nack()
.
Если это проблема, рассмотрите возможность публикации только одного сообщения каждым обработчиком.
Запуск роутера
Чтобы запустить роутер, вам нужно вызвать Run()
.
Полный исходный код: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Run запускает все плагины и обработчики и начинает подписываться на указанные темы.
// Этот вызов блокируется, пока роутер работает.
//
// Когда все обработчики останавливаются (например, потому что подписка была закрыта), роутер также остановится.
//
// Чтобы остановить Run(), вы должны вызвать Close() на роутере.
//
// ctx будет распространяться на все подписчики.
//
// Когда все обработчики останавливаются (например, из-за закрытых соединений), Run() также остановится.
func (r *Router) Run(ctx context.Context) (err error) {
// ...
}
Подтверждение работы роутера
Понимание того, работает ли роутер, может быть полезно. Для этого вы можете использовать метод Running()
.
Полный исходный код: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Running закрывается, когда роутер работает.
// Другими словами, вы можете ждать, пока роутер начнет работать, таким образом:
// fmt.Println("Starting router")
// go r.Run(ctx)
// // fmt.Println("Router is running")
// Предупреждение: По историческим причинам этот канал не знает о закрытии роутера - он закроется, если роутер продолжает работать и затем завершается.
func (r *Router) Running() chan struct{} {
// ...
}
Вы также можете использовать функцию IsRunning
, которая возвращает булево значение:
Полный исходный код: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// IsRunning возвращает true, когда роутер работает.
//
// Предупреждение: По историческим причинам этот метод не знает о закрытом состоянии роутера.
// Если вы хотите знать, был ли роутер закрыт, используйте IsClosed.
func (r *Router) IsRunning() bool {
// ...
}
Выключение маршрутизатора
Чтобы выключить маршрутизатор, вам нужно вызвать Close()
.
Полный исходный код: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Close грациозно закрывает маршрутизатор с заданным временем ожидания в конфигурации.
func (r *Router) Close() error {
r.closedLock.Lock()
// ...
Close()
выключит все издатели и подписчиков, и дождется завершения всех обработчиков.
Close()
будет ждать истечения времени, установленного в RouterConfig.CloseTimeout
в конфигурации. Если время ожидания истекло, Close()
вернет ошибку.
Добавление обработчиков после запуска маршрутизатора
Вы можете добавить новый обработчик, когда маршрутизатор уже запущен. Для этого вам нужно вызвать AddNoPublisherHandler
или AddHandler
, а затем вызвать RunHandlers
.
Полный исходный код: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// RunHandlers запускает все обработчики, которые были добавлены после Run().
// RunHandlers идемпотентен, поэтому его можно вызывать несколько раз безопасно.
func (r *Router) RunHandlers(ctx context.Context) error {
// ...
Остановка запущенных обработчиков
Можно остановить только один запущенный обработчик, вызвав Stop()
.
Обратите внимание, что маршрутизатор выключится, когда не будет запущенных обработчиков.
Полный исходный код: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Stop останавливает обработчик.
// Stop выполняется асинхронно.
// Вы можете проверить, был ли остановлен обработчик, используя функцию Stopped().
func (h *Handler) Stop() {
// ...
Модель выполнения
Подписчики могут потреблять одно сообщение последовательно или несколько сообщений параллельно.
-
Поток одного сообщения - самый простой метод, который означает, что подписчики не будут получать новые сообщения, пока не будет вызвано
msg.Ack()
. -
Поток нескольких сообщений поддерживается только определенными подписчиками. Подписавшись на несколько тематических разделов одновременно, можно потреблять несколько сообщений параллельно, даже сообщения, которые ранее не были подтверждены (например, как работают подписчики Kafka). Маршрутизатор обрабатывает эту модель, запуская
HandlerFunc
параллельно.
Для понимания поддерживаемых моделей выполнения обратитесь к выбранной документации Pub/Sub.
Промежуточное программное обеспечение
Полный исходный код: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerMiddleware позволяет нам написать что-то подобное декоратору для HandlerFunc.
// Он может выполнить некоторые операции до (например, изменить полученное сообщение) или после обработчика (изменить сгенерированное сообщение, подтвердить/отклонить полученное сообщение, обработать ошибки, залогировать и т. д.).
//
// Его можно присоединить к маршрутизатору, используя метод `AddMiddleware`.
//
// Пример:
//
// func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
// return func(message *message.Message) ([]*message.Message, error) {
// fmt.Println("выполнено перед обработчиком")
// сгенерированныеСообщения, ошибка := h(message)
// fmt.Println("выполнено после обработчика")
//
// return сгенерированныеСообщения, ошибка
// }
// }
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...
Полный список стандартного промежуточного программного обеспечения можно найти в Middlewares.
Плагины
Полный исходный код: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// RouterPlugin - это функция, выполняемая при запуске маршрутизатора.
type RouterPlugin func(*Router) error
// ...
Полный список стандартных плагинов можно найти в message/router/plugin.
Контекст
Некоторые полезные значения хранятся в контексте
для каждого сообщения, полученного обработчиком:
Полный исходный код: github.com/ThreeDotsLabs/watermill/message/router_context.go
// ...
// HandlerNameFromCtx возвращает имя обработчика сообщений в маршрутизаторе, который обработал сообщение из контекста.
func HandlerNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, handlerNameKey)
}
// PublisherNameFromCtx возвращает имя типа издателя сообщений в маршрутизаторе из контекста.
// Например, для Kafka это будет `kafka.Publisher`.
func PublisherNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, publisherNameKey)
}
// SubscriberNameFromCtx возвращает имя типа подписчика сообщений в маршрутизаторе из контекста.
// Например, для Kafka это будет `kafka.Subscriber`.
func SubscriberNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, subscriberNameKey)
}
// SubscribeTopicFromCtx возвращает тему, из которой было получено сообщение в маршрутизаторе из контекста.
func SubscribeTopicFromCtx(ctx context.Context) string {
return valFromCtx(ctx, subscribeTopicKey)
}
// PublishTopicFromCtx возвращает тему, в которую будет опубликовано сообщение в маршрутизаторе из контекста.
func PublishTopicFromCtx(ctx context.Context) string {
return valFromCtx(ctx, publishTopicKey)
}
// ...