Publisher and Subscriber son las partes de nivel inferior de Watermill. En aplicaciones prácticas, generalmente se desea utilizar interfaces y funciones de alto nivel, como asociaciones, métricas, colas de mensajes envenenados, reintentos, limitación de velocidad, etc.

A veces, es posible que no desee enviar un Ack cuando el procesamiento sea exitoso. O bien, es posible que desee enviar un mensaje después de que se procese otro mensaje.

Para cumplir con estos requisitos, existe un componente llamado Router.

Enrutador de Watermill

Configuración

Código fuente completo: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
type RouterConfig struct {
	// CloseTimeout determina cuánto tiempo debe trabajar el enrutador para los handlers al cerrarse.
	CloseTimeout time.Duration
}

func (c *RouterConfig) setDefaults() {
	if c.CloseTimeout == 0 {
		c.CloseTimeout = time.Second * 30
	}
}

// Validate comprueba si hay algún error en la configuración del enrutador.
func (c RouterConfig) Validate() error {
	return nil
}
// ...

Controlador

En primer lugar, es necesario implementar la función HandlerFunc:

Código fuente completo: github.com/ThreeDotsLabs/watermill/message/router.go

// ...

// HandlerFunc es la función que se llama cuando se recibe un mensaje.
// 
// Cuando HandlerFunc no devuelve un error, se llamará automáticamente a msg.Ack().
// 
// Cuando HandlerFunc devuelve un error, se llamará a msg.Nack().
// 
// Cuando se llama a msg.Ack() en el controlador y HandlerFunc devuelve un error,
// no se enviará msg.Nack() porque ya se ha enviado el Ack.
// 
// Al recibir varios mensajes (debido a que se envía msg.Ack() en HandlerFunc o el Suscriptor admite múltiples consumidores),
// HandlerFunc se ejecutará de forma concurrente.
type HandlerFunc func(msg *Message) ([]*Message, error)

// ...

A continuación, es necesario utilizar Router.AddHandler para añadir un nuevo controlador:

Código fuente completo: github.com/ThreeDotsLabs/watermill/message/router.go

// ...

// AddHandler añade un nuevo controlador.

// handlerName debe ser único. Actualmente, solo se utiliza para la depuración.

// subscribeTopic es el tema desde el que el controlador recibirá mensajes.

// publishTopic es el tema desde el que los mensajes devueltos por el controlador serán generados por el Router.

// Cuando el controlador necesita publicar en varios temas,

// se recomienda inyectar solo el Publicador en el controlador o implementar un middleware,

// que pueda capturar mensajes basados en metadatos y publicar en temas específicos.

// Si se añade un controlador mientras el enrutador ya está en ejecución, es necesario llamar explícitamente a RunHandlers().

func (r *Router) AddHandler(

	handlerName string,

	subscribeTopic string,

	subscriber Subscriber,

	publishTopic string,

	publisher Publisher,

	handlerFunc HandlerFunc,

) *Handler {

	r.logger.Info("Añadiendo controlador", watermill.LogFields{

		"nombre_controlador": handlerName,

		"tema":        subscribeTopic,

	})

	r.handlersLock.Lock()

	defer r.handlersLock.Unlock()

	if _, ok := r.handlers[handlerName]; ok {

		panic(DuplicateHandlerNameError{handlerName})

	}

	publisherName, subscriberName := internal.StructName(publisher), internal.StructName(subscriber)

	newHandler := &handler{

		nombre:   handlerName,

		registrador: r.registrador,

		suscriptor:     subscriber,

		temaSuscripción: subscribeTopic,

		nombreSuscriptor: subscriberName,

		publicador:     publisher,

		temaPublicación:  publishTopic,

		nombrePublicador: publisherName,

		handlerFunc: handlerFunc,

		runningHandlersWg:     r.runningHandlersWg,

		runningHandlersWgLock: r.runningHandlersWgLock,

		messagesCh:     nil,

		routersCloseCh: r.closingInProgressCh,

		startedCh: make(chan struct{}),

	}

	r.handlersWg.Add(1)

	r.handlers[handlerName] = newHandler

	select {

	case r.handlerAdded struct{}{}:

	default:

		// closeWhenAllHandlersStopped no siempre está esperando handlerAdded

	}

	return &Handler{

		router:  r,

		controlador: newHandler,

	}

}

// AddNoPublisherHandler añade un nuevo controlador.

// Este controlador no puede devolver mensajes.

// Cuando devuelve un mensaje, ocurre un error y se envía un Nack.

//

// handlerName debe ser único. Actualmente, solo se utiliza para la depuración.

// subscribeTopic es el tema desde el que el controlador recibirá mensajes.

// subscriber es un suscriptor utilizado para consumir mensajes.

// Si se añade un controlador mientras el enrutador ya está en ejecución, es necesario llamar explícitamente a RunHandlers().

func (r *Router) AddNoPublisherHandler(

	handlerName string,

	subscribeTopic string,

	subscriber Subscriber,

handlerFunc NoPublishHandlerFunc,

) *Handler {

handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...

Haga referencia al ejemplo de uso en "Introducción". Código fuente completo: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
	// AddHandler devuelve un controlador que se puede utilizar para agregar middleware a nivel de controlador o detener controladores.
	controlador := router.AddHandler(
		"struct_handler",          // nombre del controlador, debe ser único
		"incoming_messages_topic", // tema desde el cual se leen los eventos
		pubSub,
		"outgoing_messages_topic", // tema para publicar eventos
		pubSub,
		structHandler{}.Handler,
	)

	// El middleware a nivel de controlador solo se ejecuta para controladores específicos
	// Este middleware se puede agregar de la misma manera que el middleware a nivel de enrutador
	controlador.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(mensaje *message.Message) ([]*message.Message, error) {
			log.Println("Ejecutando middleware específico del controlador, UUID del mensaje: ", mensaje.UUID)

			return h(mensaje)
		}
	})
// ...

Sin gestor de publicaciones

No todos los gestores generarán un nuevo mensaje. Puede usar Router.AddNoPublisherHandler para agregar este tipo de gestor:

Código fuente completo: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// AddNoPublisherHandler agrega un nuevo gestor.
// Este gestor no puede devolver mensajes.
// Cuando devuelve un mensaje, se producirá un error y se enviará un Nack.
//
// handlerName debe ser único y actualmente solo se usa con fines de depuración.
//
// subscribeTopic es el tema en el que el gestor recibirá mensajes.
//
// subscriber se usa para consumir mensajes.
//
// Si agrega un gestor a un enrutador que ya está en funcionamiento, debe llamar explícitamente a RunHandlers().
func (r *Router) AddNoPublisherHandler(
	handlerName string,
	subscribeTopic string,
	subscriber Subscriber,
	handlerFunc NoPublishHandlerFunc,
) *Handler {
	handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...
}

Reconocimiento

Por defecto, cuando HanderFunc no devuelve un error, se llamará a msg.Ack(). Si se devuelve un error, se llamará a msg.Nack(). Por lo tanto, después de manejar el mensaje, no es necesario llamar a msg.Ack() o msg.Nack (por supuesto, puedes hacerlo si quieres).

Producción de mensajes

Cuando el gestor devuelve varios mensajes, tenga en cuenta que la mayoría de las implementaciones de Publisher no admiten la publicación atómica de mensajes. Si el broker o el almacenamiento no está disponible, es posible que solo se generen algunos mensajes y se enviará msg.Nack().

Si esto es un problema, considere hacer que cada gestor publique solo un mensaje.

Ejecución del enrutador

Para ejecutar el enrutador, debe llamar a Run().

Código fuente completo: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Run ejecuta todos los complementos y gestores, y comienza a suscribirse a los temas dados.
// Esta llamada bloquea mientras el enrutador está en funcionamiento.
//
// Cuando todos los gestores se detienen (por ejemplo, porque la suscripción se ha cerrado), el enrutador también se detendrá.
//
// Para detener Run(), debe llamar a Close() en el enrutador.
//
// ctx se propagará a todos los suscriptores.
//
// Cuando todos los gestores se detienen (por ejemplo, por conexiones cerradas), Run() también se detendrá.
func (r *Router) Run(ctx context.Context) (err error) {
// ...
}

Asegurando que el enrutador esté en funcionamiento

Puede ser útil comprender si el enrutador está en funcionamiento. Puede lograr esto utilizando el método Running().

Código fuente completo: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Running se cierra cuando el enrutador está en funcionamiento.
// En otras palabras, puede esperar a que el enrutador esté en funcionamiento de la siguiente manera:

// 	fmt.Println("Iniciando enrutador")
//	go r.Run(ctx)
//	//	fmt.Println("El enrutador está en funcionamiento")

// Advertencia: Por razones históricas, este canal no conoce el apagado del enrutador; se cerrará si el enrutador sigue en funcionamiento y luego se apaga.
func (r *Router) Running() chan struct{} {
// ...
}

También puede utilizar la función IsRunning que devuelve un valor booleano:

Código fuente completo: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// IsRunning devuelve true cuando el enrutador está en funcionamiento.
//
// Advertencia: Por razones históricas, este método no conoce el estado cerrado del enrutador.
// Si desea saber si el enrutador ha sido cerrado, use IsClosed.
func (r *Router) IsRunning() bool {
// ...
}

Apagar el enrutador

Para apagar el enrutador, es necesario llamar a Close().

Código fuente completo: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Close cierra el enrutador de manera apropiada con un tiempo de espera proporcionado en la configuración.
func (r *Router) Close() error {
	r.closedLock.Lock()
// ...

Close() apagará todos los publicadores y suscriptores, y esperará a que todos los manejadores finalicen.

Close() esperará al tiempo de espera establecido en RouterConfig.CloseTimeout en la configuración. Si se alcanza el tiempo de espera, Close() devolverá un error.

Agregar manejadores después de iniciar el enrutador

Se puede agregar un nuevo manejador cuando el enrutador ya está en funcionamiento. Para hacerlo, es necesario llamar a AddNoPublisherHandler o AddHandler, y luego llamar a RunHandlers.

Código fuente completo: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// RunHandlers ejecuta todos los manejadores que se agregaron después de Run().
// RunHandlers es idempotente, por lo que se puede llamar de manera segura varias veces.
func (r *Router) RunHandlers(ctx context.Context) error {
// ...

Detener los manejadores en ejecución

Se puede detener solo un manejador en ejecución llamando a Stop().

Por favor, tenga en cuenta que el enrutador se apagará cuando no haya manejadores en ejecución.

Código fuente completo: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Stop detiene el manejador.
// Stop es asíncrono.
// Se puede comprobar si el manejador se detuvo con la función Stopped().
func (h *Handler) Stop() {
// ...

Modelo de ejecución

Los suscriptores pueden consumir un solo mensaje de forma secuencial o múltiples mensajes en paralelo.

  • El flujo de mensaje único es el método más simple, lo que significa que los suscriptores no recibirán nuevos mensajes hasta que se llame a msg.Ack().
  • El flujo de múltiples mensajes es compatible solo para ciertos suscriptores. Al suscribirse a múltiples particiones de tema simultáneamente, se pueden consumir múltiples mensajes en paralelo, incluso mensajes que no fueron previamente reconocidos (por ejemplo, cómo trabajan los suscriptores de Kafka). El enrutador procesa este modelo ejecutando HandlerFunc en paralelo.

Consulte la documentación seleccionada de Pub/Sub para entender los modelos de ejecución admitidos.

Middleware

Código fuente completo: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// HandlerMiddleware nos permite escribir algo similar a un decorador para HandlerFunc.
// Puede ejecutar algunas operaciones antes (por ejemplo, modificar el mensaje consumido) o después del manejador (modificar el mensaje generado, reconocer/no reconocer el mensaje consumido, manejar errores, registrar, etc.).
//
// Se puede adjuntar al enrutador utilizando el método `AddMiddleware`.
//
// Ejemplo:
//
// 	func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
// 		return func(message *message.Message) ([]*message.Message, error) {
// 			fmt.Println("ejecutado antes del manejador")
// 			mensajesProducidos, err := h(message)
// 			fmt.Println("ejecutado después del manejador")
//
// 			return mensajesProducidos, err
// 		}
// 	}
type HandlerMiddleware func(h HandlerFunc) HandlerFunc

// ...

La lista completa de middleware estándar se puede encontrar en Middlewares.

Plugins

Código fuente completo: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// RouterPlugin es una función ejecutada cuando se inicia el enrutador.
type RouterPlugin func(*Router) error

// ...

La lista completa de plugins estándar se puede encontrar en message/router/plugin.

Contexto

Algunos valores útiles se almacenan en el context para cada mensaje recibido por el manejador:

Código fuente completo: github.com/ThreeDotsLabs/watermill/message/router_context.go

// ...
// HandlerNameFromCtx devuelve el nombre del manejador de mensajes en el enrutador que consumió el mensaje del contexto.
func HandlerNameFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, handlerNameKey)
}

// PublisherNameFromCtx devuelve el nombre del tipo de publicador de mensajes en el enrutador desde el contexto.
// Por ejemplo, para Kafka, será `kafka.Publisher`.
func PublisherNameFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, publisherNameKey)
}

// SubscriberNameFromCtx devuelve el nombre del tipo de suscriptor de mensajes en el enrutador desde el contexto.
// Por ejemplo, para Kafka, será `kafka.Subscriber`.
func SubscriberNameFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, subscriberNameKey)
}

// SubscribeTopicFromCtx devuelve el tema desde el cual se recibió el mensaje en el enrutador desde el contexto.
func SubscribeTopicFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, subscribeTopicKey)
}

// PublishTopicFromCtx devuelve el tema al cual se publicará el mensaje en el enrutador desde el contexto.
func PublishTopicFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, publishTopicKey)
}
// ...