Instalação
go get -u github.com/ThreeDotsLabs/watermill
Preparação
A ideia fundamental por trás de aplicações orientadas a eventos é sempre a mesma: ouvir mensagens recebidas e reagir a elas. O Watermill suporta a implementação desse comportamento para múltiplos editores e assinantes.
A parte central do Watermill é a Message, que é tão importante quanto http.Request
no pacote http
. A maioria dos recursos do Watermill utiliza essa estrutura de alguma forma.
Apesar dos recursos complexos fornecidos pela biblioteca PubSub, para o Watermill, apenas é necessário implementar duas interfaces para começar a usá-las: Publisher
e Subscriber
.
type Publisher interface {
Publish(topic string, messages ...*Message) error
Close() error
}
type Subscriber interface {
Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
}
Assinatura de Mensagens
Vamos começar com a assinatura. O Subscribe
espera um nome de tópico e retorna um canal para receber mensagens recebidas. O significado específico de tópico depende da implementação do PubSub.
messages, err := subscriber.Subscribe(ctx, "example.topic")
if err != nil {
panic(err)
}
for msg := range messages {
fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))
msg.Ack()
}
Para exemplos detalhados de PubSub suportados, consulte o seguinte conteúdo.
Exemplo de Canal Go
Código de exemplo completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
package main
import (
"context"
"fmt"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
func main() {
pubSub := gochannel.NewGoChannel(
gochannel.Config{},
watermill.NewStdLogger(false, false),
)
messages, err := pubSub.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
Código de exemplo completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))
// Precisamos confirmar que recebemos e processamos a mensagem,
// caso contrário, ela será redistribuída várias vezes.
msg.Ack()
}
}
Exemplo Kafka
Código-fonte completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
package main
import (
"context"
"log"
"time"
"github.com/Shopify/sarama"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
"github.com/ThreeDotsLabs/watermill/message"
)
func main() {
saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
// Equivalente a auto.offset.reset: earliest
saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
subscriber, err := kafka.NewSubscriber(
kafka.SubscriberConfig{
Brokers: []string{"kafka:9092"},
Unmarshaler: kafka.DefaultMarshaler{},
OverwriteSaramaConfig: saramaSubscriberConfig,
ConsumerGroup: "test_consumer_group",
},
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
}
Código-fonte completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("mensagem recebida: %s, carga: %s", msg.UUID, string(msg.Payload))
// Precisamos reconhecer que recebemos e processamos a mensagem,
// caso contrário, a mensagem será reenviada repetidamente.
msg.Ack()
}
}
Exemplo RabbitMQ (AMQP)
Código-fonte completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
package main
import (
"context"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
"github.com/ThreeDotsLabs/watermill/message"
)
var amqpURI = "amqp://guest:guest@rabbitmq:5672/"
func main() {
amqpConfig := amqp.NewDurableQueueConfig(amqpURI)
subscriber, err := amqp.NewSubscriber(
// Esta configuração é baseada no seguinte exemplo: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
// É utilizado como uma fila simples.
//
// Se deseja implementar um serviço no estilo de Pub/Sub, consulte
// https://watermill.io/pubsubs/amqp/#amqp-consumer-groups
amqpConfig,
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
}
Código-fonte completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("Mensagem recebida: %s, carga: %s", msg.UUID, string(msg.Payload))
// Precisamos reconhecer que a mensagem foi recebida e processada,
// caso contrário, a mensagem será reenviada repetidamente.
msg.Ack()
}
}
Exemplo de SQL
Código-fonte completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
package main
import (
"context"
stdSQL "database/sql"
"log"
"time"
driver "github.com/go-sql-driver/mysql"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
"github.com/ThreeDotsLabs/watermill/message"
)
func main() {
db := createDB()
logger := watermill.NewStdLogger(false, false)
subscriber, err := sql.NewSubscriber(
db,
sql.SubscriberConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
OffsetsAdapter: sql.DefaultMySQLOffsetsAdapter{},
InitializeSchema: true,
},
logger,
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example_topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
Código-fonte completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("Mensagem recebida: %s, carga: %s", msg.UUID, string(msg.Payload))
// Precisamos confirmar que a mensagem foi recebida e processada,
// caso contrário, a mensagem será redelivered repetidamente.
msg.Ack()
}
}
Criar Mensagem
Watermill não impõe nenhum formato de mensagem. O NewMessage
espera que a carga seja uma matriz de bytes. Você pode usar strings, JSON, protobuf, Avro, gob ou qualquer outro formato que possa ser serializado em []byte
.
UUID da mensagem é opcional, mas é recomendado, pois ajuda na depuração.
msg := message.NewMessage(watermill.NewUUID(), []byte("Olá, mundo!"))
Publicar Mensagem
O método Publish
requer um tópico e uma ou mais mensagens para publicar.
err := publisher.Publish("example.topic", msg)
if err != nil {
panic(err)
}
Exemplo de Canal Go
Código-fonte completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
go process(messages)
publishMessages(pubSub)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Olá, mundo!"))
if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
Exemplo de Kafka
Código-fonte completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
go process(messages)
publisher, err := kafka.NewPublisher(
kafka.PublisherConfig{
Brokers: []string{"kafka:9092"},
Marshaler: kafka.DefaultMarshaler{},
},
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Olá, mundo!"))
if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
Exemplo RabbitMQ (AMQP)
Código Fonte Completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
go process(messages)
publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
if err != nil {
panic(err)
}
publicarMensagens(publisher)
}
func publicarMensagens(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Olá, mundo!"))
if err := publisher.Publish("exemplo.tópico", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
Exemplo SQL
Código Fonte Completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
go process(messages)
publisher, err := sql.NewPublisher(
db,
sql.PublisherConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
},
logger,
)
if err != nil {
panic(err)
}
publicarMensagens(publisher)
}
func criarBD() *stdSQL.DB {
conf := driver.NewConfig()
conf.Net = "tcp"
conf.User = "root"
conf.Addr = "mysql"
conf.DBName = "watermill"
db, err := stdSQL.Open("mysql", conf.FormatDSN())
if err != nil {
panic(err)
}
err = db.Ping()
if err != nil {
panic(err)
}
return db
}
func publicarMensagens(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "Olá, mundo!"}`))
if err := publisher.Publish("exemplo_tópico", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
Utilizando o Message Router
Publishers and Subscribers são partes de baixo nível do Watermill. Na maioria dos casos, você normalmente deseja utilizar interfaces e recursos de nível mais alto, como correlações, métricas, filas de erro, tentativas e limitação de taxa.
Você pode querer reconhecer a mensagem somente após o processamento ser concluído com sucesso. Em outros casos, pode desejar reconhecê-la imediatamente e então considerar o processamento. Às vezes, pode desejar realizar certas ações com base na mensagem recebida e publicar outra mensagem em resposta.
Para atender a esses requisitos, existe um componente chamado Router.
Exemplo de Aplicação do Message Router
O fluxo do exemplo do aplicativo é o seguinte:
- Gerar uma mensagem no tópico
incoming_messages_topic
a cada segundo. - O ouvinte
struct_handler
lida com oincoming_messages_topic
. Ao receber uma mensagem, imprime o UUID e gera uma nova mensagem nooutgoing_messages_topic
. - O ouvinte
print_incoming_messages
escuta oincoming_messages_topic
e imprime o UUID, carga útil e metadados da mensagem. - O ouvinte
print_outgoing_messages
escuta ooutgoing_messages_topic
e imprime o UUID, carga útil e metadados da mensagem. O ID de correlação deve ser o mesmo que o da mensagem noincoming_messages_topic
.
Configuração do Roteador
Primeiro, configure o roteador adicionando plugins e middleware. Em seguida, defina os manipuladores que o roteador usará. Cada manipulador processará as mensagens de forma independente.
Código fonte completo: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
"github.com/ThreeDotsLabs/watermill/message/router/plugin"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
var (
// Para este exemplo, usamos uma implementação simples de logger,
// você pode querer fornecer sua própria implementação de `watermill.LoggerAdapter`.
logger = watermill.NewStdLogger(false, false)
)
func main() {
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// Ao receber o sinal SIGTERM, o SignalsHandler fechará o Roteador de forma apropriada.
// Você também pode fechar o roteador chamando `r.Close()`.
router.AddPlugin(plugin.SignalsHandler)
// O middleware de nível de roteador será executado para cada mensagem enviada para o roteador
router.AddMiddleware(
// CorrelationID copia o ID de correlação dos metadados da mensagem de entrada para a mensagem gerada
middleware.CorrelationID,
// Se o manipulador retornar um erro, tente a função do manipulador novamente.
// Após atingir MaxRetries, a mensagem é Nacked, e o PubSub é responsável por reenviá-la.
middleware.Retry{
MaxRetries: 3,
InitialInterval: time.Millisecond * 100,
Logger: logger,
}.Middleware,
// Recoverer lida com panics no manipulador.
// Neste caso, os passa como erros para o middleware de Retry.
middleware.Recoverer,
)
// Por simplicidade, usamos o Pub/Sub gochannel aqui,
// você pode substituí-lo por qualquer implementação de Pub/Sub que o efeito será o mesmo.
pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
// Gere algumas mensagens de entrada em segundo plano
go publishMessages(pubSub)
// AddHandler retorna um manipulador que pode ser usado para adicionar middleware de nível de manipulador
// ou para parar o manipulador.
handler := router.AddHandler(
"struct_handler", // Nome do manipulador, deve ser único
"incoming_messages_topic", // Tópico de onde os eventos são lidos
pubSub,
"outgoing_messages_topic", // Tópico para onde os eventos são publicados
pubSub,
structHandler{}.Handler,
)
// O middleware de nível do manipulador é executado apenas para um manipulador específico
// Este tipo de middleware pode ser adicionado da mesma forma que o middleware de nível de roteador
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("Executando middleware específico do manipulador para", message.UUID)
return h(message)
}
})
// Para fins de depuração, imprimimos todas as mensagens recebidas no "incoming_messages_topic"
router.AddNoPublisherHandler(
"print_incoming_messages",
"incoming_messages_topic",
pubSub,
printMessages,
)
// Para fins de depuração, imprimimos todos os eventos enviados para "outgoing_messages_topic"
router.AddNoPublisherHandler(
"print_outgoing_messages",
"outgoing_messages_topic",
pubSub,
printMessages,
)
// Agora que todos os manipuladores estão registrados, estamos executando o roteador.
// Run bloqueia enquanto o roteador está em execução.
ctx := context.Background()
if err := router.Run(ctx); err != nil {
panic(err)
}
}
// ...
Mensagens Recebidas
O struct_handler
consome mensagens do tópico incoming_messages_topic
, então simulamos tráfego de entrada chamando publishMessages()
em segundo plano. Note que adicionamos o middleware SetCorrelationID
. O roteador adicionará um ID de correlação a todas as mensagens geradas (armazenadas nos metadados).
Código fonte completo: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// …
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Olá, mundo!"))
middleware.SetCorrelationID(watermill.NewUUID(), msg)
log.Printf("enviando mensagem %s, ID de correlação: %s\n", msg.UUID, middleware.MessageCorrelationID(msg))
if err := publisher.Publish("incoming_messages_topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
}
}
// …
Manipuladores
Pode ter notado que existem dois tipos de funções manipuladoras:
- Função
func(msg *message.Message) ([]*message.Message, error)
- Método
func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error)
Se seu manipulador for uma função que não depende de nenhuma dependência, usar a primeira opção é aceitável. Quando seu manipulador requer algumas dependências (como manipuladores de banco de dados, loggers, etc.), a segunda opção é útil.
Código fonte completo: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// …
func printMessages(msg *message.Message) error {
fmt.Printf(
"\n> Mensagem recebida: %s\n> %s\n> metadados: %v\n\n",
msg.UUID, string(msg.Payload), msg.Metadata,
)
return nil
}
type structHandler struct {
// Aqui podemos adicionar algumas dependências
}
func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
log.Println("structHandler recebeu mensagem", msg.UUID)
msg = message.NewMessage(watermill.NewUUID(), []byte("mensagem produzida por structHandler"))
return message.Messages{msg}, nil
}
Feito!
Você pode executar este exemplo com go run main.go
.