Instalación

go get -u github.com/ThreeDotsLabs/watermill

Preparación

La idea fundamental detrás de las aplicaciones basadas en eventos siempre es la misma: escuchar mensajes entrantes y reaccionar a ellos. Watermill admite la implementación de este comportamiento para múltiples publicadores y suscriptores.

La parte central de Watermill es el Message, que es tan importante como http.Request en el paquete http. La mayoría de las características de Watermill utilizan esta estructura de alguna manera.

A pesar de las características complejas proporcionadas por la biblioteca PubSub, para Watermill es solo necesario implementar dos interfaces para comenzar a usarlas: Publisher y Subscriber.

type Publisher interface {
	Publish(topic string, messages ...*Message) error
	Close() error
}

type Subscriber interface {
	Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
}

Suscripción a Mensajes

Comencemos con la suscripción. Subscribe espera un nombre de tema y devuelve un canal para recibir mensajes entrantes. El significado específico de tema depende de la implementación de PubSub.

messages, err := subscriber.Subscribe(ctx, "example.topic")
if err != nil {
	panic(err)
}

for msg := range messages {
	fmt.Printf("mensaje recibido: %s, carga útil: %s\n", msg.UUID, string(msg.Payload))
	msg.Ack()
}

Para ejemplos detallados de PubSub compatibles, consulte el siguiente contenido.

Ejemplo de Canal Go

Código de ejemplo completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

func main() {
	pubSub := gochannel.NewGoChannel(
		gochannel.Config{},
		watermill.NewStdLogger(false, false),
	)

messages, err := pubSub.Subscribe(context.Background(), "example.topic")
if err != nil {
	panic(err)
}

go process(messages)
// ...

Código de ejemplo completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		fmt.Printf("mensaje recibido: %s, carga útil: %s\n", msg.UUID, string(msg.Payload))

	// Necesitamos confirmar que hemos recibido y procesado el mensaje,
	// de lo contrario será reenviado varias veces.
		msg.Ack()
	}
}

Ejemplo de Kafka

Código fuente completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
package main

import (
	"context
	"log"
	"time"

	"github.com/Shopify/sarama"
	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
	"github.com/ThreeDotsLabs/watermill/message"
)

func main() {
	saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
	// Equivalente a auto.offset.reset: earliest
	saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

	subscriber, err := kafka.NewSubscriber(
		kafka.SubscriberConfig{
			Brokers:               []string{"kafka:9092"},
			Unmarshaler:           kafka.DefaultMarshaler{},
			OverwriteSaramaConfig: saramaSubscriberConfig,
			ConsumerGroup:         "test_consumer_group",
		},
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example.topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...
}

Código fuente completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("mensaje recibido: %s, carga útil: %s", msg.UUID, string(msg.Payload))

		// Necesitamos confirmar que hemos recibido y procesado el mensaje,
		// de lo contrario el mensaje será reenviado repetidamente.
		msg.Ack()
	}
}

Ejemplo de RabbitMQ (AMQP)

Código fuente completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
package main

import (
	"context
	"log
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
	"github.com/ThreeDotsLabs/watermill/message"
)

var amqpURI = "amqp://guest:guest@rabbitmq:5672/"

func main() {
	amqpConfig := amqp.NewDurableQueueConfig(amqpURI)

	subscriber, err := amqp.NewSubscriber(
		// Esta configuración se basa en el siguiente ejemplo: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
		// Se utiliza como una cola simple.
		//
		// Si desea implementar un servicio de estilo Pub/Sub, consulte
		// https://watermill.io/pubsubs/amqp/#amqp-consumer-groups
		amqpConfig,
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example.topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...
}

Código fuente completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("Mensaje recibido: %s, carga útil: %s", msg.UUID, string(msg.Payload))

		// Necesitamos confirmar que el mensaje ha sido recibido y procesado,
		// de lo contrario, el mensaje será redirigido repetidamente.
		msg.Ack()
	}
}

Ejemplo de SQL

Código fuente completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
package main

import (
	"context"
	stdSQL "database/sql"
	"log"
	"time"

	driver "github.com/go-sql-driver/mysql"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
	"github.com/ThreeDotsLabs/watermill/message"
)

func main() {
	db := createDB()
	logger := watermill.NewStdLogger(false, false)

	subscriber, err := sql.NewSubscriber(
		db,
		sql.SubscriberConfig{
			SchemaAdapter:    sql.DefaultMySQLSchema{},
			OffsetsAdapter:   sql.DefaultMySQLOffsetsAdapter{},
			InitializeSchema: true,
		},
		logger,
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example_topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...

Código fuente completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("Mensaje recibido: %s, carga: %s", msg.UUID, string(msg.Payload))

		// Necesitamos confirmar que el mensaje ha sido recibido y procesado,
		// de lo contrario, el mensaje se entregará repetidamente.
		msg.Ack()
	}
}

Crear mensaje

Watermill no impone ningún formato de mensaje. NewMessage espera que la carga útil sea una serie de bytes. Puede utilizar cadenas, JSON, protobuf, Avro, gob u cualquier otro formato que pueda ser serializado en []byte.

El UUID del mensaje es opcional, pero se recomienda, ya que ayuda en la depuración.

msg := message.NewMessage(watermill.NewUUID(), []byte("¡Hola, mundo!"))

Publicar mensaje

El método Publish requiere un tema y uno o más mensajes para publicar.

err := publisher.Publish("ejemplo.tema", msg)
if err != nil {
    panic(err)
}

Ejemplo de canal de Go

Código fuente completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
	go process(messages)

	publicarMensajes(pubSub)
}

func publicarMensajes(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("¡Hola, mundo!"))

		if err := publisher.Publish("ejemplo.tema", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

Ejemplo de Kafka

Código fuente completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
	go process(messages)

	publisher, err := kafka.NewPublisher(
		kafka.PublisherConfig{
			Brokers:   []string{"kafka:9092"},
			Marshaler: kafka.DefaultMarshaler{},
		},
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	publicarMensajes(publisher)
}

func publicarMensajes(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("¡Hola, mundo!"))

		if err := publisher.Publish("ejemplo.tema", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...
### Ejemplo de RabbitMQ (AMQP)

Código fuente completo: [github.com/ThreeDotsLabs/watermill/\_examples/pubsubs/amqp/main.go](https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/pubsubs/amqp/main.go#L37)

```go
// ...
	go process(messages)

	publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
	if err != nil {
		panic(err)
	}

	publicarMensajes(publisher)
}

func publicarMensajes(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("¡Hola, mundo!"))

		if err := publisher.Publish("ejemplo.tema", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

Ejemplo SQL

Código fuente completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
	go process(messages)

	publisher, err := sql.NewPublisher(
		db,
		sql.PublisherConfig{
			SchemaAdapter: sql.DefaultMySQLSchema{},
		},
		logger,
	)
	if err != nil {
		panic(err)
	}

	publicarMensajes(publisher)
}

func crearDB() *stdSQL.DB {
	conf := driver.NewConfig()
	conf.Net = "tcp"
	conf.User = "root"
	conf.Addr = "mysql"
	conf.DBName = "watermill"

	db, err := stdSQL.Open("mysql", conf.FormatDSN())
	if err != nil {
		panic(err)
	}

	err = db.Ping()
	if err != nil {
		panic(err)
	}

	return db
}

func publicarMensajes(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte(`{"mensaje": "¡Hola, mundo!"}`))

		if err := publisher.Publish("ejemplo_tema", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

Usando Message Router

Publishers and Subscribers son partes de nivel inferior de Watermill. En la mayoría de los casos, es probable que desee utilizar interfaces y funciones de nivel superior, como correlaciones, métricas, colas de errores, reintentos y limitación de velocidad.

Es posible que solo desee confirmar el mensaje una vez que se haya procesado correctamente. En otros casos, es posible que desee confirmarlo inmediatamente y luego considerar su procesamiento. A veces, puede que desee realizar ciertas acciones en función del mensaje entrante y publicar otro mensaje en respuesta.

Para cumplir con estos requisitos, existe un componente llamado Router.

Ejemplo de Aplicación de Message Router

El flujo del ejemplo de la aplicación es el siguiente:

  1. Generar un mensaje en el incoming_messages_topic cada segundo.
  2. El listener struct_handler maneja el incoming_messages_topic. Al recibir un mensaje, imprime el UUID y genera un nuevo mensaje en el outgoing_messages_topic.
  3. El handler print_incoming_messages escucha en el incoming_messages_topic e imprime el UUID, carga útil y metadatos del mensaje.
  4. El handler print_outgoing_messages escucha en el outgoing_messages_topic e imprime el UUID, carga útil y metadatos del mensaje. El ID de correlación debería ser el mismo que el mensaje en el incoming_messages_topic.
## Configuración del enrutador

Primero, configure el enrutador agregando complementos y middleware. Luego, establezca los controladores que usará el enrutador. Cada controlador procesará de forma independiente los mensajes.

Código fuente completo: [github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go](https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/basic/3-router/main.go#L2)
```go
// ...
package main

import (
	"context"
	"fmt
	"log"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/message/router/middleware"
	"github.com/ThreeDotsLabs/watermill/message/router/plugin"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

var (
	// Para este ejemplo, usamos una implementación simple de registro,
	// es posible que desee proporcionar su propia implementación de `watermill.LoggerAdapter`.
	logger = watermill.NewStdLogger(false, false)
)

func main() {
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// Al recibir la señal SIGTERM, SignalsHandler cerrará el enrutador de forma segura.
	// También puede cerrar el enrutador llamando a `r.Close()`.
	router.AddPlugin(plugin.SignalsHandler)

	// El middleware a nivel de enrutador se ejecutará para cada mensaje enviado al enrutador.
	router.AddMiddleware(
		// CorrelationID copia el ID de correlación de los metadatos del mensaje entrante al mensaje generado.
		middleware.CorrelationID,

		// Si el controlador devuelve un error, reintente la función del controlador.
		// Después de alcanzar MaxRetries, el mensaje se rechaza y PubSub es responsable de reenviarlo.
		middleware.Retry{
			MaxRetries:      3,
			InitialInterval: time.Millisecond * 100,
			Logger:          logger,
		}.Middleware,

		// Recoverer maneja los fallos en el controlador.
		// En este caso, los pasa como errores al middleware Retry.
		middleware.Recoverer,
	)

	// Por simplicidad, usamos gochannel Pub/Sub aquí,
	// puedes reemplazarlo con cualquier implementación de Pub/Sub y el efecto será el mismo.
	pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)

	// Generar algunos mensajes entrantes en segundo plano
	go publishMessages(pubSub)

	// AddHandler devuelve un controlador que se puede utilizar para agregar middleware a nivel de controlador
	// o para detener el controlador.
	handler := router.AddHandler(
		"struct_handler",          // Nombre del controlador, debe ser único
		"incoming_messages_topic", // Tema desde el cual se leen los eventos
		pubSub,
		"outgoing_messages_topic", // Tema al cual se publican los eventos
		pubSub,
		structHandler{}.Handler,
	)

	// El middleware a nivel de controlador se ejecuta solo para un controlador específico
	// Este tipo de middleware se puede agregar de la misma manera que el middleware a nivel de enrutador
	handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			log.Println("Ejecutando middleware específico del controlador para", message.UUID)

			return h(message)
		}
	})

	// Por propósitos de depuración, imprimimos todos los mensajes recibidos en "incoming_messages_topic"
	router.AddNoPublisherHandler(
		"print_incoming_messages",
		"incoming_messages_topic",
		pubSub,
		printMessages,
	)

	// Por propósitos de depuración, imprimimos todos los eventos enviados a "outgoing_messages_topic"
	router.AddNoPublisherHandler(
		"print_outgoing_messages",
		"outgoing_messages_topic",
		pubSub,
		printMessages,
	)

	// Ahora que todos los controladores están registrados, ejecutamos el enrutador.
	// Run bloquea mientras el enrutador está en ejecución.
	ctx := context.Background()
	if err := router.Run(ctx); err != nil {
		panic(err)
	}
}
// ...

Mensajes entrantes

El struct_handler consume mensajes del incoming_messages_topic, por lo que simulamos tráfico entrante llamando a publishMessages() en segundo plano. Tenga en cuenta que hemos agregado el middleware SetCorrelationID. El enrutador agregará un ID de correlación a todos los mensajes generados (almacenados en los metadatos).

Código fuente completo: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// …
func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte("¡Hola, mundo!"))
        middleware.SetCorrelationID(watermill.NewUUID(), msg)

        log.Printf("enviando mensaje %s, ID de correlación: %s\n", msg.UUID, middleware.MessageCorrelationID(msg))

        if err := publisher.Publish("incoming_messages_topic", msg); err != nil {
            panic(err)
        }

        time.Sleep(time.Second)
    }
}
// …

Controladores

Quizás haya notado que hay dos tipos de funciones controladoras:

  1. Función func(msg *message.Message) ([]*message.Message, error)
  2. Método func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error)

Si su controlador es una función que no depende de ninguna dependencia, usar la primera opción está bien. Cuando su controlador requiere algunas dependencias (como manejadores de base de datos, registradores, etc.), la segunda opción es útil.

Código fuente completo: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// …
func printMessages(msg *message.Message) error {
    fmt.Printf(
        "\n> Mensaje recibido: %s\n> %s\n> metadatos: %v\n\n",
        msg.UUID, string(msg.Payload), msg.Metadata,
    )
    return nil
}

type structHandler struct {
    // Aquí podemos agregar algunas dependencias
}

func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
    log.Println("structHandler ha recibido el mensaje", msg.UUID)

    msg = message.NewMessage(watermill.NewUUID(), []byte("mensaje producido por structHandler"))
    return message.Messages{msg}, nil
}

¡Hecho!

Puede ejecutar este ejemplo con go run main.go.