Instalación
go get -u github.com/ThreeDotsLabs/watermill
Preparación
La idea fundamental detrás de las aplicaciones basadas en eventos siempre es la misma: escuchar mensajes entrantes y reaccionar a ellos. Watermill admite la implementación de este comportamiento para múltiples publicadores y suscriptores.
La parte central de Watermill es el Message, que es tan importante como http.Request
en el paquete http
. La mayoría de las características de Watermill utilizan esta estructura de alguna manera.
A pesar de las características complejas proporcionadas por la biblioteca PubSub, para Watermill es solo necesario implementar dos interfaces para comenzar a usarlas: Publisher
y Subscriber
.
type Publisher interface {
Publish(topic string, messages ...*Message) error
Close() error
}
type Subscriber interface {
Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
}
Suscripción a Mensajes
Comencemos con la suscripción. Subscribe
espera un nombre de tema y devuelve un canal para recibir mensajes entrantes. El significado específico de tema depende de la implementación de PubSub.
messages, err := subscriber.Subscribe(ctx, "example.topic")
if err != nil {
panic(err)
}
for msg := range messages {
fmt.Printf("mensaje recibido: %s, carga útil: %s\n", msg.UUID, string(msg.Payload))
msg.Ack()
}
Para ejemplos detallados de PubSub compatibles, consulte el siguiente contenido.
Ejemplo de Canal Go
Código de ejemplo completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
package main
import (
"context"
"fmt"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
func main() {
pubSub := gochannel.NewGoChannel(
gochannel.Config{},
watermill.NewStdLogger(false, false),
)
messages, err := pubSub.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
Código de ejemplo completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
fmt.Printf("mensaje recibido: %s, carga útil: %s\n", msg.UUID, string(msg.Payload))
// Necesitamos confirmar que hemos recibido y procesado el mensaje,
// de lo contrario será reenviado varias veces.
msg.Ack()
}
}
Ejemplo de Kafka
Código fuente completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
package main
import (
"context
"log"
"time"
"github.com/Shopify/sarama"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
"github.com/ThreeDotsLabs/watermill/message"
)
func main() {
saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
// Equivalente a auto.offset.reset: earliest
saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
subscriber, err := kafka.NewSubscriber(
kafka.SubscriberConfig{
Brokers: []string{"kafka:9092"},
Unmarshaler: kafka.DefaultMarshaler{},
OverwriteSaramaConfig: saramaSubscriberConfig,
ConsumerGroup: "test_consumer_group",
},
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
}
Código fuente completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("mensaje recibido: %s, carga útil: %s", msg.UUID, string(msg.Payload))
// Necesitamos confirmar que hemos recibido y procesado el mensaje,
// de lo contrario el mensaje será reenviado repetidamente.
msg.Ack()
}
}
Ejemplo de RabbitMQ (AMQP)
Código fuente completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
package main
import (
"context
"log
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
"github.com/ThreeDotsLabs/watermill/message"
)
var amqpURI = "amqp://guest:guest@rabbitmq:5672/"
func main() {
amqpConfig := amqp.NewDurableQueueConfig(amqpURI)
subscriber, err := amqp.NewSubscriber(
// Esta configuración se basa en el siguiente ejemplo: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
// Se utiliza como una cola simple.
//
// Si desea implementar un servicio de estilo Pub/Sub, consulte
// https://watermill.io/pubsubs/amqp/#amqp-consumer-groups
amqpConfig,
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
}
Código fuente completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("Mensaje recibido: %s, carga útil: %s", msg.UUID, string(msg.Payload))
// Necesitamos confirmar que el mensaje ha sido recibido y procesado,
// de lo contrario, el mensaje será redirigido repetidamente.
msg.Ack()
}
}
Ejemplo de SQL
Código fuente completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
package main
import (
"context"
stdSQL "database/sql"
"log"
"time"
driver "github.com/go-sql-driver/mysql"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
"github.com/ThreeDotsLabs/watermill/message"
)
func main() {
db := createDB()
logger := watermill.NewStdLogger(false, false)
subscriber, err := sql.NewSubscriber(
db,
sql.SubscriberConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
OffsetsAdapter: sql.DefaultMySQLOffsetsAdapter{},
InitializeSchema: true,
},
logger,
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example_topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
Código fuente completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("Mensaje recibido: %s, carga: %s", msg.UUID, string(msg.Payload))
// Necesitamos confirmar que el mensaje ha sido recibido y procesado,
// de lo contrario, el mensaje se entregará repetidamente.
msg.Ack()
}
}
Crear mensaje
Watermill no impone ningún formato de mensaje. NewMessage
espera que la carga útil sea una serie de bytes. Puede utilizar cadenas, JSON, protobuf, Avro, gob u cualquier otro formato que pueda ser serializado en []byte
.
El UUID del mensaje es opcional, pero se recomienda, ya que ayuda en la depuración.
msg := message.NewMessage(watermill.NewUUID(), []byte("¡Hola, mundo!"))
Publicar mensaje
El método Publish
requiere un tema y uno o más mensajes para publicar.
err := publisher.Publish("ejemplo.tema", msg)
if err != nil {
panic(err)
}
Ejemplo de canal de Go
Código fuente completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
go process(messages)
publicarMensajes(pubSub)
}
func publicarMensajes(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("¡Hola, mundo!"))
if err := publisher.Publish("ejemplo.tema", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
Ejemplo de Kafka
Código fuente completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
go process(messages)
publisher, err := kafka.NewPublisher(
kafka.PublisherConfig{
Brokers: []string{"kafka:9092"},
Marshaler: kafka.DefaultMarshaler{},
},
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
publicarMensajes(publisher)
}
func publicarMensajes(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("¡Hola, mundo!"))
if err := publisher.Publish("ejemplo.tema", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
### Ejemplo de RabbitMQ (AMQP)
Código fuente completo: [github.com/ThreeDotsLabs/watermill/\_examples/pubsubs/amqp/main.go](https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/pubsubs/amqp/main.go#L37)
```go
// ...
go process(messages)
publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
if err != nil {
panic(err)
}
publicarMensajes(publisher)
}
func publicarMensajes(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("¡Hola, mundo!"))
if err := publisher.Publish("ejemplo.tema", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
Ejemplo SQL
Código fuente completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
go process(messages)
publisher, err := sql.NewPublisher(
db,
sql.PublisherConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
},
logger,
)
if err != nil {
panic(err)
}
publicarMensajes(publisher)
}
func crearDB() *stdSQL.DB {
conf := driver.NewConfig()
conf.Net = "tcp"
conf.User = "root"
conf.Addr = "mysql"
conf.DBName = "watermill"
db, err := stdSQL.Open("mysql", conf.FormatDSN())
if err != nil {
panic(err)
}
err = db.Ping()
if err != nil {
panic(err)
}
return db
}
func publicarMensajes(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte(`{"mensaje": "¡Hola, mundo!"}`))
if err := publisher.Publish("ejemplo_tema", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
Usando Message Router
Publishers and Subscribers son partes de nivel inferior de Watermill. En la mayoría de los casos, es probable que desee utilizar interfaces y funciones de nivel superior, como correlaciones, métricas, colas de errores, reintentos y limitación de velocidad.
Es posible que solo desee confirmar el mensaje una vez que se haya procesado correctamente. En otros casos, es posible que desee confirmarlo inmediatamente y luego considerar su procesamiento. A veces, puede que desee realizar ciertas acciones en función del mensaje entrante y publicar otro mensaje en respuesta.
Para cumplir con estos requisitos, existe un componente llamado Router.
Ejemplo de Aplicación de Message Router
El flujo del ejemplo de la aplicación es el siguiente:
- Generar un mensaje en el
incoming_messages_topic
cada segundo. - El listener
struct_handler
maneja elincoming_messages_topic
. Al recibir un mensaje, imprime el UUID y genera un nuevo mensaje en eloutgoing_messages_topic
. - El handler
print_incoming_messages
escucha en elincoming_messages_topic
e imprime el UUID, carga útil y metadatos del mensaje. - El handler
print_outgoing_messages
escucha en eloutgoing_messages_topic
e imprime el UUID, carga útil y metadatos del mensaje. El ID de correlación debería ser el mismo que el mensaje en elincoming_messages_topic
.
## Configuración del enrutador
Primero, configure el enrutador agregando complementos y middleware. Luego, establezca los controladores que usará el enrutador. Cada controlador procesará de forma independiente los mensajes.
Código fuente completo: [github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go](https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/basic/3-router/main.go#L2)
```go
// ...
package main
import (
"context"
"fmt
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
"github.com/ThreeDotsLabs/watermill/message/router/plugin"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
var (
// Para este ejemplo, usamos una implementación simple de registro,
// es posible que desee proporcionar su propia implementación de `watermill.LoggerAdapter`.
logger = watermill.NewStdLogger(false, false)
)
func main() {
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// Al recibir la señal SIGTERM, SignalsHandler cerrará el enrutador de forma segura.
// También puede cerrar el enrutador llamando a `r.Close()`.
router.AddPlugin(plugin.SignalsHandler)
// El middleware a nivel de enrutador se ejecutará para cada mensaje enviado al enrutador.
router.AddMiddleware(
// CorrelationID copia el ID de correlación de los metadatos del mensaje entrante al mensaje generado.
middleware.CorrelationID,
// Si el controlador devuelve un error, reintente la función del controlador.
// Después de alcanzar MaxRetries, el mensaje se rechaza y PubSub es responsable de reenviarlo.
middleware.Retry{
MaxRetries: 3,
InitialInterval: time.Millisecond * 100,
Logger: logger,
}.Middleware,
// Recoverer maneja los fallos en el controlador.
// En este caso, los pasa como errores al middleware Retry.
middleware.Recoverer,
)
// Por simplicidad, usamos gochannel Pub/Sub aquí,
// puedes reemplazarlo con cualquier implementación de Pub/Sub y el efecto será el mismo.
pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
// Generar algunos mensajes entrantes en segundo plano
go publishMessages(pubSub)
// AddHandler devuelve un controlador que se puede utilizar para agregar middleware a nivel de controlador
// o para detener el controlador.
handler := router.AddHandler(
"struct_handler", // Nombre del controlador, debe ser único
"incoming_messages_topic", // Tema desde el cual se leen los eventos
pubSub,
"outgoing_messages_topic", // Tema al cual se publican los eventos
pubSub,
structHandler{}.Handler,
)
// El middleware a nivel de controlador se ejecuta solo para un controlador específico
// Este tipo de middleware se puede agregar de la misma manera que el middleware a nivel de enrutador
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("Ejecutando middleware específico del controlador para", message.UUID)
return h(message)
}
})
// Por propósitos de depuración, imprimimos todos los mensajes recibidos en "incoming_messages_topic"
router.AddNoPublisherHandler(
"print_incoming_messages",
"incoming_messages_topic",
pubSub,
printMessages,
)
// Por propósitos de depuración, imprimimos todos los eventos enviados a "outgoing_messages_topic"
router.AddNoPublisherHandler(
"print_outgoing_messages",
"outgoing_messages_topic",
pubSub,
printMessages,
)
// Ahora que todos los controladores están registrados, ejecutamos el enrutador.
// Run bloquea mientras el enrutador está en ejecución.
ctx := context.Background()
if err := router.Run(ctx); err != nil {
panic(err)
}
}
// ...
Mensajes entrantes
El struct_handler
consume mensajes del incoming_messages_topic
, por lo que simulamos tráfico entrante llamando a publishMessages()
en segundo plano. Tenga en cuenta que hemos agregado el middleware SetCorrelationID
. El enrutador agregará un ID de correlación a todos los mensajes generados (almacenados en los metadatos).
Código fuente completo: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// …
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("¡Hola, mundo!"))
middleware.SetCorrelationID(watermill.NewUUID(), msg)
log.Printf("enviando mensaje %s, ID de correlación: %s\n", msg.UUID, middleware.MessageCorrelationID(msg))
if err := publisher.Publish("incoming_messages_topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
}
}
// …
Controladores
Quizás haya notado que hay dos tipos de funciones controladoras:
- Función
func(msg *message.Message) ([]*message.Message, error)
- Método
func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error)
Si su controlador es una función que no depende de ninguna dependencia, usar la primera opción está bien. Cuando su controlador requiere algunas dependencias (como manejadores de base de datos, registradores, etc.), la segunda opción es útil.
Código fuente completo: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// …
func printMessages(msg *message.Message) error {
fmt.Printf(
"\n> Mensaje recibido: %s\n> %s\n> metadatos: %v\n\n",
msg.UUID, string(msg.Payload), msg.Metadata,
)
return nil
}
type structHandler struct {
// Aquí podemos agregar algunas dependencias
}
func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
log.Println("structHandler ha recibido el mensaje", msg.UUID)
msg = message.NewMessage(watermill.NewUUID(), []byte("mensaje producido por structHandler"))
return message.Messages{msg}, nil
}
¡Hecho!
Puede ejecutar este ejemplo con go run main.go
.