Instalação

go get -u github.com/ThreeDotsLabs/watermill

Preparação

A ideia fundamental por trás de aplicações orientadas a eventos é sempre a mesma: ouvir mensagens recebidas e reagir a elas. O Watermill suporta a implementação desse comportamento para múltiplos editores e assinantes.

A parte central do Watermill é a Message, que é tão importante quanto http.Request no pacote http. A maioria dos recursos do Watermill utiliza essa estrutura de alguma forma.

Apesar dos recursos complexos fornecidos pela biblioteca PubSub, para o Watermill, apenas é necessário implementar duas interfaces para começar a usá-las: Publisher e Subscriber.

type Publisher interface {
	Publish(topic string, messages ...*Message) error
	Close() error
}

type Subscriber interface {
	Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
}

Assinatura de Mensagens

Vamos começar com a assinatura. O Subscribe espera um nome de tópico e retorna um canal para receber mensagens recebidas. O significado específico de tópico depende da implementação do PubSub.

messages, err := subscriber.Subscribe(ctx, "example.topic")
if err != nil {
	panic(err)
}

for msg := range messages {
	fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))
	msg.Ack()
}

Para exemplos detalhados de PubSub suportados, consulte o seguinte conteúdo.

Exemplo de Canal Go

Código de exemplo completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

func main() {
	pubSub := gochannel.NewGoChannel(
		gochannel.Config{},
		watermill.NewStdLogger(false, false),
	)

messages, err := pubSub.Subscribe(context.Background(), "example.topic")
if err != nil {
	panic(err)
}

go process(messages)
// ...

Código de exemplo completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))

	// Precisamos confirmar que recebemos e processamos a mensagem,
	// caso contrário, ela será redistribuída várias vezes.
		msg.Ack()
	}
}

Exemplo Kafka

Código-fonte completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
package main

import (
	"context"
	"log"
	"time"

	"github.com/Shopify/sarama"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
	"github.com/ThreeDotsLabs/watermill/message"
)

func main() {
	saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
	// Equivalente a auto.offset.reset: earliest
	saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

	subscriber, err := kafka.NewSubscriber(
		kafka.SubscriberConfig{
			Brokers:               []string{"kafka:9092"},
			Unmarshaler:           kafka.DefaultMarshaler{},
			OverwriteSaramaConfig: saramaSubscriberConfig,
			ConsumerGroup:         "test_consumer_group",
		},
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example.topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...
}

Código-fonte completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("mensagem recebida: %s, carga: %s", msg.UUID, string(msg.Payload))

		// Precisamos reconhecer que recebemos e processamos a mensagem,
		// caso contrário, a mensagem será reenviada repetidamente.
		msg.Ack()
	}
}

Exemplo RabbitMQ (AMQP)

Código-fonte completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
package main

import (
	"context"
	"log"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
	"github.com/ThreeDotsLabs/watermill/message"
)

var amqpURI = "amqp://guest:guest@rabbitmq:5672/"

func main() {
	amqpConfig := amqp.NewDurableQueueConfig(amqpURI)

	subscriber, err := amqp.NewSubscriber(
		// Esta configuração é baseada no seguinte exemplo: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
		// É utilizado como uma fila simples.
		//
		// Se deseja implementar um serviço no estilo de Pub/Sub, consulte
		// https://watermill.io/pubsubs/amqp/#amqp-consumer-groups
		amqpConfig,
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example.topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...
}

Código-fonte completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("Mensagem recebida: %s, carga: %s", msg.UUID, string(msg.Payload))

		// Precisamos reconhecer que a mensagem foi recebida e processada,
		// caso contrário, a mensagem será reenviada repetidamente.
		msg.Ack()
	}
}

Exemplo de SQL

Código-fonte completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
package main

import (
	"context"
	stdSQL "database/sql"
	"log"
	"time"

	driver "github.com/go-sql-driver/mysql"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
	"github.com/ThreeDotsLabs/watermill/message"
)

func main() {
	db := createDB()
	logger := watermill.NewStdLogger(false, false)

	subscriber, err := sql.NewSubscriber(
		db,
		sql.SubscriberConfig{
			SchemaAdapter:    sql.DefaultMySQLSchema{},
			OffsetsAdapter:   sql.DefaultMySQLOffsetsAdapter{},
			InitializeSchema: true,
		},
		logger,
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example_topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...

Código-fonte completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("Mensagem recebida: %s, carga: %s", msg.UUID, string(msg.Payload))

		// Precisamos confirmar que a mensagem foi recebida e processada,
		// caso contrário, a mensagem será redelivered repetidamente.
		msg.Ack()
	}
}

Criar Mensagem

Watermill não impõe nenhum formato de mensagem. O NewMessage espera que a carga seja uma matriz de bytes. Você pode usar strings, JSON, protobuf, Avro, gob ou qualquer outro formato que possa ser serializado em []byte.

UUID da mensagem é opcional, mas é recomendado, pois ajuda na depuração.

msg := message.NewMessage(watermill.NewUUID(), []byte("Olá, mundo!"))

Publicar Mensagem

O método Publish requer um tópico e uma ou mais mensagens para publicar.

err := publisher.Publish("example.topic", msg)
if err != nil {
    panic(err)
}

Exemplo de Canal Go

Código-fonte completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
	go process(messages)

	publishMessages(pubSub)
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("Olá, mundo!"))

		if err := publisher.Publish("example.topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

Exemplo de Kafka

Código-fonte completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
	go process(messages)

	publisher, err := kafka.NewPublisher(
		kafka.PublisherConfig{
			Brokers:   []string{"kafka:9092"},
			Marshaler: kafka.DefaultMarshaler{},
		},
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("Olá, mundo!"))

		if err := publisher.Publish("example.topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

Exemplo RabbitMQ (AMQP)

Código Fonte Completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
	go process(messages)

	publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
	if err != nil {
		panic(err)
	}

	publicarMensagens(publisher)
}

func publicarMensagens(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("Olá, mundo!"))

		if err := publisher.Publish("exemplo.tópico", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

Exemplo SQL

Código Fonte Completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
	go process(messages)

	publisher, err := sql.NewPublisher(
		db,
		sql.PublisherConfig{
			SchemaAdapter: sql.DefaultMySQLSchema{},
		},
		logger,
	)
	if err != nil {
		panic(err)
	}

	publicarMensagens(publisher)
}

func criarBD() *stdSQL.DB {
	conf := driver.NewConfig()
	conf.Net = "tcp"
	conf.User = "root"
	conf.Addr = "mysql"
	conf.DBName = "watermill"

	db, err := stdSQL.Open("mysql", conf.FormatDSN())
	if err != nil {
		panic(err)
	}

	err = db.Ping()
	if err != nil {
		panic(err)
	}

	return db
}

func publicarMensagens(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "Olá, mundo!"}`))

		if err := publisher.Publish("exemplo_tópico", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

Utilizando o Message Router

Publishers and Subscribers são partes de baixo nível do Watermill. Na maioria dos casos, você normalmente deseja utilizar interfaces e recursos de nível mais alto, como correlações, métricas, filas de erro, tentativas e limitação de taxa.

Você pode querer reconhecer a mensagem somente após o processamento ser concluído com sucesso. Em outros casos, pode desejar reconhecê-la imediatamente e então considerar o processamento. Às vezes, pode desejar realizar certas ações com base na mensagem recebida e publicar outra mensagem em resposta.

Para atender a esses requisitos, existe um componente chamado Router.

Exemplo de Aplicação do Message Router

O fluxo do exemplo do aplicativo é o seguinte:

  1. Gerar uma mensagem no tópico incoming_messages_topic a cada segundo.
  2. O ouvinte struct_handler lida com o incoming_messages_topic. Ao receber uma mensagem, imprime o UUID e gera uma nova mensagem no outgoing_messages_topic.
  3. O ouvinte print_incoming_messages escuta o incoming_messages_topic e imprime o UUID, carga útil e metadados da mensagem.
  4. O ouvinte print_outgoing_messages escuta o outgoing_messages_topic e imprime o UUID, carga útil e metadados da mensagem. O ID de correlação deve ser o mesmo que o da mensagem no incoming_messages_topic.

Configuração do Roteador

Primeiro, configure o roteador adicionando plugins e middleware. Em seguida, defina os manipuladores que o roteador usará. Cada manipulador processará as mensagens de forma independente.

Código fonte completo: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/message/router/middleware"
	"github.com/ThreeDotsLabs/watermill/message/router/plugin"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

var (
	// Para este exemplo, usamos uma implementação simples de logger,
	// você pode querer fornecer sua própria implementação de `watermill.LoggerAdapter`.
	logger = watermill.NewStdLogger(false, false)
)

func main() {
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// Ao receber o sinal SIGTERM, o SignalsHandler fechará o Roteador de forma apropriada.
	// Você também pode fechar o roteador chamando `r.Close()`.
	router.AddPlugin(plugin.SignalsHandler)

	// O middleware de nível de roteador será executado para cada mensagem enviada para o roteador
	router.AddMiddleware(
		// CorrelationID copia o ID de correlação dos metadados da mensagem de entrada para a mensagem gerada
		middleware.CorrelationID,

		// Se o manipulador retornar um erro, tente a função do manipulador novamente.
		// Após atingir MaxRetries, a mensagem é Nacked, e o PubSub é responsável por reenviá-la.
		middleware.Retry{
			MaxRetries:      3,
			InitialInterval: time.Millisecond * 100,
			Logger:          logger,
		}.Middleware,

		// Recoverer lida com panics no manipulador.
		// Neste caso, os passa como erros para o middleware de Retry.
		middleware.Recoverer,
	)

	// Por simplicidade, usamos o Pub/Sub gochannel aqui,
	// você pode substituí-lo por qualquer implementação de Pub/Sub que o efeito será o mesmo.
	pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)

	// Gere algumas mensagens de entrada em segundo plano
	go publishMessages(pubSub)

	// AddHandler retorna um manipulador que pode ser usado para adicionar middleware de nível de manipulador
	// ou para parar o manipulador.
	handler := router.AddHandler(
		"struct_handler",          // Nome do manipulador, deve ser único
		"incoming_messages_topic", // Tópico de onde os eventos são lidos
		pubSub,
		"outgoing_messages_topic", // Tópico para onde os eventos são publicados
		pubSub,
		structHandler{}.Handler,
	)

	// O middleware de nível do manipulador é executado apenas para um manipulador específico
	// Este tipo de middleware pode ser adicionado da mesma forma que o middleware de nível de roteador
	handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			log.Println("Executando middleware específico do manipulador para", message.UUID)

			return h(message)
		}
	})

	// Para fins de depuração, imprimimos todas as mensagens recebidas no "incoming_messages_topic"
	router.AddNoPublisherHandler(
		"print_incoming_messages",
		"incoming_messages_topic",
		pubSub,
		printMessages,
	)

	// Para fins de depuração, imprimimos todos os eventos enviados para "outgoing_messages_topic"
	router.AddNoPublisherHandler(
		"print_outgoing_messages",
		"outgoing_messages_topic",
		pubSub,
		printMessages,
	)

	// Agora que todos os manipuladores estão registrados, estamos executando o roteador.
	// Run bloqueia enquanto o roteador está em execução.
	ctx := context.Background()
	if err := router.Run(ctx); err != nil {
		panic(err)
	}
}
// ...

Mensagens Recebidas

O struct_handler consome mensagens do tópico incoming_messages_topic, então simulamos tráfego de entrada chamando publishMessages() em segundo plano. Note que adicionamos o middleware SetCorrelationID. O roteador adicionará um ID de correlação a todas as mensagens geradas (armazenadas nos metadados).

Código fonte completo: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// …
func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte("Olá, mundo!"))
        middleware.SetCorrelationID(watermill.NewUUID(), msg)

        log.Printf("enviando mensagem %s, ID de correlação: %s\n", msg.UUID, middleware.MessageCorrelationID(msg))

        if err := publisher.Publish("incoming_messages_topic", msg); err != nil {
            panic(err)
        }

        time.Sleep(time.Second)
    }
}
// …

Manipuladores

Pode ter notado que existem dois tipos de funções manipuladoras:

  1. Função func(msg *message.Message) ([]*message.Message, error)
  2. Método func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error)

Se seu manipulador for uma função que não depende de nenhuma dependência, usar a primeira opção é aceitável. Quando seu manipulador requer algumas dependências (como manipuladores de banco de dados, loggers, etc.), a segunda opção é útil.

Código fonte completo: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// …
func printMessages(msg *message.Message) error {
    fmt.Printf(
        "\n> Mensagem recebida: %s\n> %s\n> metadados: %v\n\n",
        msg.UUID, string(msg.Payload), msg.Metadata,
    )
    return nil
}

type structHandler struct {
    // Aqui podemos adicionar algumas dependências
}

func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
    log.Println("structHandler recebeu mensagem", msg.UUID)

    msg = message.NewMessage(watermill.NewUUID(), []byte("mensagem produzida por structHandler"))
    return message.Messages{msg}, nil
}

Feito!

Você pode executar este exemplo com go run main.go.