Installazione
go get -u github.com/ThreeDotsLabs/watermill
Preparazione
L'idea fondamentale dietro alle applicazioni basate su eventi è sempre la stessa: ascoltare i messaggi in arrivo e reagire ad essi. Watermill supporta l'implementazione di questo comportamento per più publisher e subscriber.
La parte centrale di Watermill è il Message, che è tanto importante quanto http.Request
nel pacchetto http
. La maggior parte delle funzionalità di Watermill utilizza questa struttura in qualche modo.
Nonostante le funzionalità complesse fornite dalla libreria PubSub, per Watermill è necessario implementare solo due interfacce per iniziare a usarle: Publisher
e Subscriber
.
type Publisher interface {
Publish(topic string, messages ...*Message) error
Close() error
}
type Subscriber interface {
Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
}
Sottoscrizione ai messaggi
Iniziamo con la sottoscrizione. Subscribe
si aspetta un nome di topic e restituisce un canale per ricevere i messaggi in arrivo. Il significato specifico di topic dipende dall'implementazione di PubSub.
messages, err := subscriber.Subscribe(ctx, "esempio.topic")
if err != nil {
panic(err)
}
for msg := range messages {
fmt.Printf("messaggio ricevuto: %s, payload: %s\n", msg.UUID, string(msg.Payload))
msg.Ack()
}
Per esempi dettagliati sui PubSub supportati, si prega di fare riferimento al contenuto seguente.
Esempio di Go Channel
Codice completo dell'esempio: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
package main
import (
"context"
"fmt"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
func main() {
pubSub := gochannel.NewGoChannel(
gochannel.Config{},
watermill.NewStdLogger(false, false),
)
messages, err := pubSub.Subscribe(context.Background(), "esempio.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
Codice completo dell'esempio: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
fmt.Printf("messaggio ricevuto: %s, payload: %s\n", msg.UUID, string(msg.Payload))
// Dobbiamo confermare di aver ricevuto e elaborato il messaggio,
// altrimenti verrà rinviato più volte.
msg.Ack()
}
}
Esempio Kafka
Codice sorgente completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
package main
import (
"context"
"log"
"time"
"github.com/Shopify/sarama"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
"github.com/ThreeDotsLabs/watermill/message"
)
func main() {
saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
// Equivalente a auto.offset.reset: earliest
saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
subscriber, err := kafka.NewSubscriber(
kafka.SubscriberConfig{
Brokers: []string{"kafka:9092"},
Unmarshaler: kafka.DefaultMarshaler{},
OverwriteSaramaConfig: saramaSubscriberConfig,
ConsumerGroup: "test_consumer_group",
},
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
}
Codice sorgente completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("messaggio ricevuto: %s, payload: %s", msg.UUID, string(msg.Payload))
// È necessario confermare la ricezione e l'elaborazione del messaggio,
// altrimenti il messaggio verrà inviato nuovamente ripetutamente.
msg.Ack()
}
}
Esempio RabbitMQ (AMQP)
Codice sorgente completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
package main
import (
"context"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
"github.com/ThreeDotsLabs/watermill/message"
)
var amqpURI = "amqp://guest:guest@rabbitmq:5672/"
func main() {
amqpConfig := amqp.NewDurableQueueConfig(amqpURI)
subscriber, err := amqp.NewSubscriber(
// Questa configurazione si basa sull'esempio seguente: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
// Viene utilizzato come coda semplice.
//
// Se si desidera implementare un servizio stile Pub/Sub, fare riferimento a
// https://watermill.io/pubsubs/amqp/#amqp-consumer-groups
amqpConfig,
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
}
Codice sorgente completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("Messaggio ricevuto: %s, payload: %s", msg.UUID, string(msg.Payload))
// È necessario confermare che il messaggio sia stato ricevuto ed elaborato,
// altrimenti il messaggio verrà nuovamente recapitato ripetutamente.
msg.Ack()
}
}
Esempio SQL
Codice sorgente completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
package main
import (
"context"
stdSQL "database/sql"
"log"
"time"
driver "github.com/go-sql-driver/mysql"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
"github.com/ThreeDotsLabs/watermill/message"
)
func main() {
db := createDB()
logger := watermill.NewStdLogger(false, false)
subscriber, err := sql.NewSubscriber(
db,
sql.SubscriberConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
OffsetsAdapter: sql.DefaultMySQLOffsetsAdapter{},
InitializeSchema: true,
},
logger,
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example_topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
Codice sorgente completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("Messaggio ricevuto: %s, payload: %s", msg.UUID, string(msg.Payload))
// È necessario confermare che il messaggio è stato ricevuto e elaborato,
// altrimenti il messaggio verrà nuovamente consegnato ripetutamente.
msg.Ack()
}
}
Creare un Messaggio
Watermill non impone alcun formato di messaggio. NewMessage
si aspetta che il payload sia una slice di byte. È possibile utilizzare stringhe, JSON, protobuf, Avro, gob o qualsiasi altro formato che possa essere serializzato in []byte
.
L'UUID del messaggio è facoltativo, ma è consigliato in quanto aiuta nel debug.
msg := message.NewMessage(watermill.NewUUID(), []byte("Ciao, mondo!"))
Pubblicare un Messaggio
Il metodo Publish
richiede un argomento e uno o più messaggi da pubblicare.
err := publisher.Publish("example.topic", msg)
if err != nil {
panic(err)
}
Esempio di Go Channel
Codice sorgente completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
go process(messages)
publishMessages(pubSub)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Ciao, mondo!"))
if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
Esempio Kafka
Codice sorgente completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
go process(messages)
publisher, err := kafka.NewPublisher(
kafka.PublisherConfig{
Brokers: []string{"kafka:9092"},
Marshaler: kafka.DefaultMarshaler{},
},
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Ciao, mondo!"))
if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
Esempio di RabbitMQ (AMQP)
Codice Sorgente Completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
go process(messages)
publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Ciao, mondo!"))
if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
Esempio di SQL
Codice Sorgente Completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
go process(messages)
publisher, err := sql.NewPublisher(
db,
sql.PublisherConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
},
logger,
)
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func createDB() *stdSQL.DB {
conf := driver.NewConfig()
conf.Net = "tcp"
conf.User = "root"
conf.Addr = "mysql"
conf.DBName = "watermill"
db, err := stdSQL.Open("mysql", conf.FormatDSN())
if err != nil {
panic(err)
}
err = db.Ping()
if err != nil {
panic(err)
}
return db
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "Ciao, mondo!"}`))
if err := publisher.Publish("example_topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
Utilizzo di Message Router
Publisher e Subscriber sono componenti di livello inferiore di Watermill. Nella maggior parte dei casi, probabilmente desidereresti utilizzare interfacce e funzionalità di livello superiore come correlazioni, metriche, code di sicurezza, ritentativi e limitazioni di velocità.
Potresti voler riconoscere il messaggio solo una volta che è stato elaborato con successo. In altri casi, potresti volerlo riconoscere immediatamente e poi considerare l'elaborazione. A volte, potresti voler eseguire determinate azioni in base al messaggio in arrivo e pubblicare un altro messaggio in risposta.
Per soddisfare tali requisiti, esiste un componente chiamato Router.
Applicazione di Esempio di Message Router
Il flusso dell'applicazione di esempio è il seguente:
- Genera un messaggio sul topic
incoming_messages_topic
ogni secondo. - Il listener
struct_handler
gestisce ilincoming_messages_topic
. Al ricevere un messaggio, stampa l'UUID e genera un nuovo messaggio sul topicoutgoing_messages_topic
. - Il gestore
print_incoming_messages
ascolta ilincoming_messages_topic
e stampa l'UUID, il payload e i metadati del messaggio. - Il gestore
print_outgoing_messages
ascolta iloutgoing_messages_topic
e stampa l'UUID, il payload e i metadati del messaggio. L'ID di correlazione dovrebbe essere lo stesso del messaggio sulincoming_messages_topic
.
Configurazione del router
Per prima cosa, configura il router aggiungendo plugin e middleware. Successivamente, imposta gli handler che il router utilizzerà. Ogni handler elaborerà in modo indipendente i messaggi.
Codice sorgente completo: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
pacchetto principale
import (
"context"
"fmt"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
"github.com/ThreeDotsLabs/watermill/message/router/plugin"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
var (
// Per questo esempio, utiliziamo un'implementazione semplice del logger,
// potresti voler fornire la tua implementazione di `watermill.LoggerAdapter`.
logger = watermill.NewStdLogger(false, false)
)
func main() {
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// Quando riceve il segnale SIGTERM, SignalsHandler chiude correttamente il Router.
// Puoi anche chiudere il router chiamando `r.Close()`.
router.AddPlugin(plugin.SignalsHandler)
// Il middleware a livello di Router verrà eseguito per ogni messaggio inviato al router
router.AddMiddleware(
// CorrelationID copia l'ID di correlazione dei metadati del messaggio in arrivo al messaggio generato
middleware.CorrelationID,
// Se l'handler restituisce un errore, ritenta la funzione dell'handler.
// Dopo aver raggiunto MaxRetries, il messaggio viene Nacked, e PubSub è responsabile di rinviarlo.
middleware.Retry{
MaxRetries: 3,
IntervalloIniziale: time.Millisecond * 100,
Logger: logger,
}.Middleware,
// Recoverer gestisce i panici nell'handler.
// In questo caso, li passa come errori al middleware di retry.
middleware.Recoverer,
)
// Per semplicità, utilizziamo gochannel Pub/Sub qui,
// puoi sostituirlo con qualsiasi implementazione di Pub/Sub e l'effetto sarà lo stesso.
pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
// Genera alcuni messaggi in arrivo in background
go publishMessages(pubSub)
// AddHandler restituisce un handler che può essere utilizzato per aggiungere middleware a livello di handler
// o per fermare l'handler.
handler := router.AddHandler(
"struct_handler", // Nome dell'handler, deve essere univoco
"incoming_messages_topic", // Argomento da cui vengono letti gli eventi
pubSub,
"outgoing_messages_topic", // Argomento a cui vengono pubblicati gli eventi
pubSub,
structHandler{}.Handler,
)
// Il middleware a livello di handler viene eseguito solo per un handler specifico
// Questo tipo di middleware può essere aggiunto allo stesso modo del middleware a livello di router
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("Esecuzione del middleware specifico dell'handler per", message.UUID)
return h(message)
}
})
// Per scopi di debug, stampiamo tutti i messaggi ricevuti su "incoming_messages_topic"
router.AddNoPublisherHandler(
"stampare_messaggi_entrant",
"incoming_messages_topic",
pubSub,
stampaMessaggi,
)
// Per scopi di debug, stampiamo tutti gli eventi inviati su "outgoing_messages_topic"
router.AddNoPublisherHandler(
"stampare_messaggi_uscenti",
"outgoing_messages_topic",
pubSub,
stampaMessaggi,
)
// Ora che tutti gli handler sono registrati, stiamo eseguendo il router.
// Run si blocca mentre il router è in esecuzione.
ctx := context.Background()
if err := router.Run(ctx); err != nil {
panic(err)
}
}
// ...
Messaggi in arrivo
Il gestore struct_handler
consuma messaggi dal incoming_messages_topic
, quindi simuliamo il traffico in arrivo chiamando publishMessages()
in background. Si noti che abbiamo aggiunto il middleware SetCorrelationID
. Il router aggiornerà un ID di correlazione a tutti i messaggi generati (memorizzati nei metadati).
Codice sorgente completo: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// …
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Ciao, mondo!"))
middleware.SetCorrelationID(watermill.NewUUID(), msg)
log.Printf("sending message %s, correlation id: %s\n", msg.UUID, middleware.MessageCorrelationID(msg))
if err := publisher.Publish("incoming_messages_topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
}
}
// …
Gestori
È possibile che tu abbia notato che ci sono due tipi di funzioni di gestione:
- Funzione
func(msg *message.Message) ([]*message.Message, error)
- Metodo
func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error)
Se il tuo gestore è una funzione che non dipende da alcuna dipendenza, è bene utilizzare la prima opzione. Quando il gestore richiede alcune dipendenze (come ad esempio gestori di database, registri, ecc.), la seconda opzione è utile.
Codice sorgente completo: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// …
func printMessages(msg *message.Message) error {
fmt.Printf(
"\n> Messaggio ricevuto: %s\n> %s\n> metadati: %v\n\n",
msg.UUID, string(msg.Payload), msg.Metadata,
)
return nil
}
type structHandler struct {
// Qui possiamo aggiungere alcune dipendenze
}
func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
log.Println("structHandler ha ricevuto il messaggio", msg.UUID)
msg = message.NewMessage(watermill.NewUUID(), []byte("messaggio prodotto da structHandler"))
return message.Messages{msg}, nil
}
Fatto!
Puoi eseguire questo esempio con go run main.go
.