Publisher and Subscriber são as partes de baixo nível do Watermill. Em aplicações práticas, geralmente você deseja usar interfaces e funções de alto nível, como associações, métricas, filas de mensagens inválidas, tentativas, limitação de taxa, etc.
Às vezes, pode não ser desejável enviar um Ack quando o processamento for bem-sucedido. Em outras situações, você pode querer enviar uma mensagem após o processamento de outra mensagem.
Para atender a esses requisitos, existe um componente chamado Router.
Configuração
Código fonte completo: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
type RouterConfig struct {
// CloseTimeout determina por quanto tempo o roteador deve aguardar os manipuladores ao fechar.
CloseTimeout time.Duration
}
func (c *RouterConfig) setDefaults() {
if c.CloseTimeout == 0 {
c.CloseTimeout = time.Second * 30
}
}
// Validate verifica se há erros na configuração do roteador.
func (c RouterConfig) Validate() error {
return nil
}
// ...
Handler
Primeiro, você precisa implementar a função HandlerFunc
:
Código fonte completo: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerFunc é a função chamada quando uma mensagem é recebida.
//
// Quando HandlerFunc não retorna um erro, msg.Ack() será chamado automaticamente.
//
// Quando HandlerFunc retorna um erro, msg.Nack() será chamado.
//
// Quando msg.Ack() é chamado no manipulador e HandlerFunc retorna um erro,
// msg.Nack() não será enviado porque o Ack já foi enviado.
//
// Ao receber várias mensagens (devido ao envio de msg.Ack() em HandlerFunc ou
// Subscriber suportando múltiplos consumidores),
// HandlerFunc será executado concorrentemente.
type HandlerFunc func(msg *Message) ([]*Message, error)
// ...
Em seguida, você precisa usar Router.AddHandler
para adicionar um novo manipulador:
Código fonte completo: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// AddHandler adiciona um novo manipulador.
//
// handlerName deve ser único. Atualmente, é usado apenas para depuração.
//
// subscribeTopic é o tópico do qual o manipulador receberá mensagens.
//
// publishTopic é o tópico das mensagens retornadas pelo manipulador geradas pelo Router.
//
// Quando o manipulador precisa publicar em vários tópicos,
// é recomendado apenas injetar o Publisher para o manipulador ou implementar middleware,
// que pode capturar mensagens com base em metadados e publicar em tópicos específicos.
//
// Se um manipulador for adicionado enquanto o roteador já estiver em execução, RunHandlers() precisa ser chamado explicitamente.
func (r *Router) AddHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
publishTopic string,
publisher Publisher,
handlerFunc HandlerFunc,
) *Handler {
r.logger.Info("Adicionando manipulador", watermill.LogFields{
"nome_do_manipulador": handlerName,
"tópico": subscribeTopic,
})
r.handlersLock.Lock()
defer r.handlersLock.Unlock()
if _, ok := r.handlers[handlerName]; ok {
panic(DuplicateHandlerNameError{handlerName})
}
publisherName, subscriberName := internal.StructName(publisher), internal.StructName(subscriber)
newHandler := &handler{
nome: handlerName,
logger: r.logger,
subscriber: subscriber,
subscribeTopic: subscribeTopic,
subscriberName: subscriberName,
editor: publisher,
publishTopic: publishTopic,
publisherName: publisherName,
handlerFunc: handlerFunc,
runningHandlersWg: r.runningHandlersWg,
runningHandlersWgLock: r.runningHandlersWgLock,
messagesCh: nil,
routersCloseCh: r.closingInProgressCh,
startedCh: make(chan struct{}),
}
r.handlersWg.Add(1)
r.handlers[handlerName] = newHandler
selecionar {
case r.handlerAdded struct{}{}:
padrão:
// closeWhenAllHandlersStopped nem sempre está esperando por handlerAdded
}
retornar &Handler{
rota: r,
manipulador: newHandler,
}
}
// AddNoPublisherHandler adiciona um novo manipulador.
// Este manipulador não pode retornar mensagens.
// Quando ele retorna uma mensagem, um erro ocorre e um Nack é enviado.
//
// handlerName deve ser único. Atualmente, é usado apenas para depuração.
// subscribeTopic é o tópico do qual o manipulador receberá mensagens.
// subscriber é um assinante usado para consumir mensagens.
//
// Se um manipulador for adicionado enquanto o roteador já estiver em execução, RunHandlers() precisa ser chamado explicitamente.
func (r *Router) AddNoPublisherHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
handlerFunc NoPublishHandlerFunc,
) *Handler {
handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...
Consulte o exemplo de uso em "Primeiros passos". Código fonte completo: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
// AddHandler retorna um handler que pode ser usado para adicionar middleware em nível de handler ou parar handlers.
handler := router.AddHandler(
"estrutura_handler", // nome do handler, deve ser único
"tópico_de_mensagens_entrantes", // tópico de onde os eventos são lidos
pubSub,
"tópico_de_mensagens_saíntes", // tópico para publicar eventos
pubSub,
structHandler{}.Handler,
)
// O middleware em nível de handler é executado apenas para handlers específicos
// Este middleware pode ser adicionado da mesma forma que o middleware em nível de router
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("Executando middleware específico do handler, UUID da mensagem: ", message.UUID)
return h(message)
}
})
// ...
Sem Manipulador de Publicador
Nem todo manipulador irá gerar uma nova mensagem. Você pode usar Router.AddNoPublisherHandler
para adicionar este tipo de manipulador:
Código fonte completo: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// AddNoPublisherHandler adiciona um novo manipulador.
// Este manipulador não pode retornar mensagens.
// Quando ele retorna uma mensagem, ocorrerá um erro e um Nack será enviado.
//
// handlerName deve ser único e atualmente é usado apenas para fins de depuração.
//
// subscribeTopic é o tópico no qual o manipulador irá receber mensagens.
//
// subscriber é usado para consumir mensagens.
//
// Se você adicionar um manipulador a um roteador que já está em execução, é preciso chamar explicitamente RunHandlers().
func (r *Router) AddNoPublisherHandler(
handlerName string,
subscribeTopic string,
subscriber Subscriber,
handlerFunc NoPublishHandlerFunc,
) *Handler {
handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...
}
Reconhecimento
Por padrão, quando HanderFunc
não retorna um erro, msg.Ack()
será chamado. Se um erro for retornado, msg.Nack()
será chamado. Portanto, após lidar com a mensagem, você não precisa chamar msg.Ack()
ou msg.Nack
(é claro, você pode se quiser).
Produzindo Mensagens
Quando várias mensagens são retornadas pelo manipulador, observe que a maioria das implementações de Publisher não suportam a publicação atômica de mensagens. Se o corretor ou armazenamento estiver indisponível, apenas algumas mensagens poderão ser geradas e msg.Nack()
será enviado.
Se isso for um problema, considere fazer com que cada manipulador publique apenas uma mensagem.
Executando o Roteador
Para executar o roteador, é necessário chamar Run()
.
Código fonte completo: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Run executa todos os plugins e manipuladores e começa a se inscrever nos tópicos fornecidos.
// Esta chamada bloqueia enquanto o roteador está em execução.
//
// Quando todos os manipuladores pararem (por exemplo, porque a inscrição foi fechada), o roteador também parará.
//
// Para interromper Run(), você deve chamar Close() no roteador.
//
// ctx será propagado para todos os assinantes.
//
// Quando todos os manipuladores pararem (por exemplo, devido a conexões fechadas), Run() também será interrompido.
func (r *Router) Run(ctx context.Context) (err error) {
// ...
}
Garantindo que o Roteador Esteja em Execução
Entender se o roteador está em execução pode ser útil. Você pode fazer isso usando o método Running()
.
Código fonte completo: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Running fecha quando o roteador está em execução.
// Em outras palavras, você pode esperar que o roteador esteja em execução da seguinte maneira:
// fmt.Println("Iniciando roteador")
// go r.Run(ctx)
// // fmt.Println("Roteador está em execução")
// Aviso: Por razões históricas, este canal não sabe sobre o desligamento do roteador - ele será fechado se o roteador continuar em execução e então desligar.
func (r *Router) Running() chan struct{} {
// ...
}
Você também pode usar a função IsRunning
que retorna um valor booleano:
Código fonte completo: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// IsRunning retorna true quando o roteador está em execução.
//
// Aviso: Por razões históricas, este método não sabe sobre o estado fechado do roteador.
// Se você quiser saber se o roteador foi fechado, use IsClosed.
func (r *Router) IsRunning() bool {
// ...
}
Desligar o router
Para desligar o router, é necessário chamar Close()
.
Código fonte completo: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Close fecha o router de forma graciosa com um timeout fornecido na configuração.
func (r *Router) Close() error {
r.closedLock.Lock()
// ...
Close()
desligará todos os publicadores e assinantes, e aguardará a conclusão de todos os manipuladores.
Close()
aguardará o timeout definido em RouterConfig.CloseTimeout
na configuração. Se o timeout for atingido, Close()
retornará um erro.
Adicionando manipuladores após iniciar o router
Você pode adicionar um novo manipulador quando o router já estiver em execução. Para fazer isso, é necessário chamar AddNoPublisherHandler
ou AddHandler
, e em seguida chamar RunHandlers
.
Código fonte completo: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// RunHandlers executa todos os manipuladores que foram adicionados após Run().
// RunHandlers é idempotente, podendo ser chamado várias vezes com segurança.
func (r *Router) RunHandlers(ctx context.Context) error {
// ...
Parando manipuladores em execução
Você pode parar apenas um manipulador em execução chamando Stop()
.
Por favor, note que o router será desligado quando não houver manipuladores em execução.
Código fonte completo: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// Stop interrompe o manipulador.
// Stop é assíncrono.
// É possível verificar se o manipulador foi interrompido com a função Stopped().
func (h *Handler) Stop() {
// ...
Modelo de execução
Assinantes podem consumir uma única mensagem sequencialmente ou várias mensagens em paralelo.
- O Fluxo de mensagem única é o método mais simples, o que significa que os assinantes não receberão novas mensagens até que
msg.Ack()
seja chamado. - O Fluxo de múltiplas mensagens é suportado apenas por determinados assinantes. Ao se inscrever em várias partições de tópicos simultaneamente, várias mensagens podem ser consumidas em paralelo, até mesmo mensagens que não foram previamente reconhecidas (por exemplo, como os assinantes do Kafka funcionam). O router processa esse modelo executando
HandlerFunc
em paralelo.
Por favor, consulte a documentação selecionada de Pub/Sub para entender os modelos de execução suportados.
Middleware
Código fonte completo: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// HandlerMiddleware nos permite escrever algo semelhante a um decorador para HandlerFunc.
// Ele pode executar algumas operações antes (por exemplo, modificar a mensagem consumida) ou depois do manipulador (modificar a mensagem gerada, reconhecer/negar a mensagem consumida, lidar com erros, fazer log, etc.).
//
// Pode ser anexado ao router usando o método `AddMiddleware`.
//
// Exemplo:
//
// func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
// return func(message *message.Message) ([]*message.Message, error) {
// fmt.Println("executado antes do manipulador")
// mensagensGeradas, err := h(message)
// fmt.Println("executado após o manipulador")
//
// return mensagensGeradas, err
// }
// }
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...
A lista completa de middleware padrão pode ser encontrada em Middlewares.
Plugins
Código fonte completo: github.com/ThreeDotsLabs/watermill/message/router.go
// ...
// RouterPlugin é uma função executada quando o router inicia.
type RouterPlugin func(*Router) error
// ...
A lista completa de plugins padrão pode ser encontrada em message/router/plugin.
Contexto
Alguns valores úteis são armazenados no context
para cada mensagem recebida pelo manipulador:
Código fonte completo: github.com/ThreeDotsLabs/watermill/message/router_context.go
// ...
// HandlerNameFromCtx retorna o nome do manipulador de mensagens no roteador que consumiu a mensagem do contexto.
func HandlerNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, handlerNameKey)
}
// PublisherNameFromCtx retorna o nome do tipo de publicador de mensagens no roteador a partir do contexto.
// Por exemplo, para Kafka, será `kafka.Publisher`.
func PublisherNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, publisherNameKey)
}
// SubscriberNameFromCtx retorna o nome do tipo de assinante de mensagens no roteador a partir do contexto.
// Por exemplo, para Kafka, será `kafka.Subscriber`.
func SubscriberNameFromCtx(ctx context.Context) string {
return valFromCtx(ctx, subscriberNameKey)
}
// SubscribeTopicFromCtx retorna o tópico do qual a mensagem foi recebida no roteador a partir do contexto.
func SubscribeTopicFromCtx(ctx context.Context) string {
return valFromCtx(ctx, subscribeTopicKey)
}
// PublishTopicFromCtx retorna o tópico para o qual a mensagem será publicada no roteador a partir do contexto.
func PublishTopicFromCtx(ctx context.Context) string {
return valFromCtx(ctx, publishTopicKey)
}
// ...