Installazione

go get -u github.com/ThreeDotsLabs/watermill

Preparazione

L'idea fondamentale dietro alle applicazioni basate su eventi è sempre la stessa: ascoltare i messaggi in arrivo e reagire ad essi. Watermill supporta l'implementazione di questo comportamento per più publisher e subscriber.

La parte centrale di Watermill è il Message, che è tanto importante quanto http.Request nel pacchetto http. La maggior parte delle funzionalità di Watermill utilizza questa struttura in qualche modo.

Nonostante le funzionalità complesse fornite dalla libreria PubSub, per Watermill è necessario implementare solo due interfacce per iniziare a usarle: Publisher e Subscriber.

type Publisher interface {
	Publish(topic string, messages ...*Message) error
	Close() error
}

type Subscriber interface {
	Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
}

Sottoscrizione ai messaggi

Iniziamo con la sottoscrizione. Subscribe si aspetta un nome di topic e restituisce un canale per ricevere i messaggi in arrivo. Il significato specifico di topic dipende dall'implementazione di PubSub.

messages, err := subscriber.Subscribe(ctx, "esempio.topic")
if err != nil {
	panic(err)
}

for msg := range messages {
	fmt.Printf("messaggio ricevuto: %s, payload: %s\n", msg.UUID, string(msg.Payload))
	msg.Ack()
}

Per esempi dettagliati sui PubSub supportati, si prega di fare riferimento al contenuto seguente.

Esempio di Go Channel

Codice completo dell'esempio: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

func main() {
	pubSub := gochannel.NewGoChannel(
		gochannel.Config{},
		watermill.NewStdLogger(false, false),
	)

messages, err := pubSub.Subscribe(context.Background(), "esempio.topic")
if err != nil {
	panic(err)
}

go process(messages)
// ...

Codice completo dell'esempio: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		fmt.Printf("messaggio ricevuto: %s, payload: %s\n", msg.UUID, string(msg.Payload))

	// Dobbiamo confermare di aver ricevuto e elaborato il messaggio,
	// altrimenti verrà rinviato più volte.
		msg.Ack()
	}
}

Esempio Kafka

Codice sorgente completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
package main

import (
	"context"
	"log"
	"time"

	"github.com/Shopify/sarama"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
	"github.com/ThreeDotsLabs/watermill/message"
)

func main() {
	saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
	// Equivalente a auto.offset.reset: earliest
	saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

	subscriber, err := kafka.NewSubscriber(
		kafka.SubscriberConfig{
			Brokers:               []string{"kafka:9092"},
			Unmarshaler:           kafka.DefaultMarshaler{},
			OverwriteSaramaConfig: saramaSubscriberConfig,
			ConsumerGroup:         "test_consumer_group",
		},
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example.topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...
}

Codice sorgente completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("messaggio ricevuto: %s, payload: %s", msg.UUID, string(msg.Payload))

		// È necessario confermare la ricezione e l'elaborazione del messaggio,
		// altrimenti il messaggio verrà inviato nuovamente ripetutamente.
		msg.Ack()
	}
}

Esempio RabbitMQ (AMQP)

Codice sorgente completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
package main

import (
	"context"
	"log"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
	"github.com/ThreeDotsLabs/watermill/message"
)

var amqpURI = "amqp://guest:guest@rabbitmq:5672/"

func main() {
	amqpConfig := amqp.NewDurableQueueConfig(amqpURI)

	subscriber, err := amqp.NewSubscriber(
		// Questa configurazione si basa sull'esempio seguente: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
		// Viene utilizzato come coda semplice.
		//
		// Se si desidera implementare un servizio stile Pub/Sub, fare riferimento a
		// https://watermill.io/pubsubs/amqp/#amqp-consumer-groups
		amqpConfig,
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example.topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...
}

Codice sorgente completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("Messaggio ricevuto: %s, payload: %s", msg.UUID, string(msg.Payload))

		// È necessario confermare che il messaggio sia stato ricevuto ed elaborato,
		// altrimenti il messaggio verrà nuovamente recapitato ripetutamente.
		msg.Ack()
	}
}

Esempio SQL

Codice sorgente completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
package main

import (
	"context"
	stdSQL "database/sql"
	"log"
	"time"

	driver "github.com/go-sql-driver/mysql"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
	"github.com/ThreeDotsLabs/watermill/message"
)

func main() {
	db := createDB()
	logger := watermill.NewStdLogger(false, false)

	subscriber, err := sql.NewSubscriber(
		db,
		sql.SubscriberConfig{
			SchemaAdapter:    sql.DefaultMySQLSchema{},
			OffsetsAdapter:   sql.DefaultMySQLOffsetsAdapter{},
			InitializeSchema: true,
		},
		logger,
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example_topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...

Codice sorgente completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("Messaggio ricevuto: %s, payload: %s", msg.UUID, string(msg.Payload))

		// È necessario confermare che il messaggio è stato ricevuto e elaborato,
		// altrimenti il messaggio verrà nuovamente consegnato ripetutamente.
		msg.Ack()
	}
}

Creare un Messaggio

Watermill non impone alcun formato di messaggio. NewMessage si aspetta che il payload sia una slice di byte. È possibile utilizzare stringhe, JSON, protobuf, Avro, gob o qualsiasi altro formato che possa essere serializzato in []byte.

L'UUID del messaggio è facoltativo, ma è consigliato in quanto aiuta nel debug.

msg := message.NewMessage(watermill.NewUUID(), []byte("Ciao, mondo!"))

Pubblicare un Messaggio

Il metodo Publish richiede un argomento e uno o più messaggi da pubblicare.

err := publisher.Publish("example.topic", msg)
if err != nil {
    panic(err)
}

Esempio di Go Channel

Codice sorgente completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
	go process(messages)

	publishMessages(pubSub)
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("Ciao, mondo!"))

		if err := publisher.Publish("example.topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

Esempio Kafka

Codice sorgente completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
	go process(messages)

	publisher, err := kafka.NewPublisher(
		kafka.PublisherConfig{
			Brokers:   []string{"kafka:9092"},
			Marshaler: kafka.DefaultMarshaler{},
		},
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("Ciao, mondo!"))

		if err := publisher.Publish("example.topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

Esempio di RabbitMQ (AMQP)

Codice Sorgente Completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
	go process(messages)

	publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
	if err != nil {
		panic(err)
	}

	publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("Ciao, mondo!"))

		if err := publisher.Publish("example.topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

Esempio di SQL

Codice Sorgente Completo: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
	go process(messages)

	publisher, err := sql.NewPublisher(
		db,
		sql.PublisherConfig{
			SchemaAdapter: sql.DefaultMySQLSchema{},
		},
		logger,
	)
	if err != nil {
		panic(err)
	}

	publishMessages(publisher)
}

func createDB() *stdSQL.DB {
	conf := driver.NewConfig()
	conf.Net = "tcp"
	conf.User = "root"
	conf.Addr = "mysql"
	conf.DBName = "watermill"

	db, err := stdSQL.Open("mysql", conf.FormatDSN())
	if err != nil {
		panic(err)
	}

	err = db.Ping()
	if err != nil {
		panic(err)
	}

	return db
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "Ciao, mondo!"}`))

		if err := publisher.Publish("example_topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

Utilizzo di Message Router

Publisher e Subscriber sono componenti di livello inferiore di Watermill. Nella maggior parte dei casi, probabilmente desidereresti utilizzare interfacce e funzionalità di livello superiore come correlazioni, metriche, code di sicurezza, ritentativi e limitazioni di velocità.

Potresti voler riconoscere il messaggio solo una volta che è stato elaborato con successo. In altri casi, potresti volerlo riconoscere immediatamente e poi considerare l'elaborazione. A volte, potresti voler eseguire determinate azioni in base al messaggio in arrivo e pubblicare un altro messaggio in risposta.

Per soddisfare tali requisiti, esiste un componente chiamato Router.

Applicazione di Esempio di Message Router

Il flusso dell'applicazione di esempio è il seguente:

  1. Genera un messaggio sul topic incoming_messages_topic ogni secondo.
  2. Il listener struct_handler gestisce il incoming_messages_topic. Al ricevere un messaggio, stampa l'UUID e genera un nuovo messaggio sul topic outgoing_messages_topic.
  3. Il gestore print_incoming_messages ascolta il incoming_messages_topic e stampa l'UUID, il payload e i metadati del messaggio.
  4. Il gestore print_outgoing_messages ascolta il outgoing_messages_topic e stampa l'UUID, il payload e i metadati del messaggio. L'ID di correlazione dovrebbe essere lo stesso del messaggio sul incoming_messages_topic.

Configurazione del router

Per prima cosa, configura il router aggiungendo plugin e middleware. Successivamente, imposta gli handler che il router utilizzerà. Ogni handler elaborerà in modo indipendente i messaggi.

Codice sorgente completo: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
pacchetto principale

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/message/router/middleware"
	"github.com/ThreeDotsLabs/watermill/message/router/plugin"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

var (
	// Per questo esempio, utiliziamo un'implementazione semplice del logger,
	// potresti voler fornire la tua implementazione di `watermill.LoggerAdapter`.
	logger = watermill.NewStdLogger(false, false)
)

func main() {
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// Quando riceve il segnale SIGTERM, SignalsHandler chiude correttamente il Router.
	// Puoi anche chiudere il router chiamando `r.Close()`.
	router.AddPlugin(plugin.SignalsHandler)

	// Il middleware a livello di Router verrà eseguito per ogni messaggio inviato al router
	router.AddMiddleware(
		// CorrelationID copia l'ID di correlazione dei metadati del messaggio in arrivo al messaggio generato
		middleware.CorrelationID,

		// Se l'handler restituisce un errore, ritenta la funzione dell'handler.
		// Dopo aver raggiunto MaxRetries, il messaggio viene Nacked, e PubSub è responsabile di rinviarlo.
		middleware.Retry{
			MaxRetries:      3,
			IntervalloIniziale: time.Millisecond * 100,
			Logger:          logger,
		}.Middleware,

		// Recoverer gestisce i panici nell'handler.
		// In questo caso, li passa come errori al middleware di retry.
		middleware.Recoverer,
	)

	// Per semplicità, utilizziamo gochannel Pub/Sub qui,
	// puoi sostituirlo con qualsiasi implementazione di Pub/Sub e l'effetto sarà lo stesso.
	pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)

	// Genera alcuni messaggi in arrivo in background
	go publishMessages(pubSub)

	// AddHandler restituisce un handler che può essere utilizzato per aggiungere middleware a livello di handler
	// o per fermare l'handler.
	handler := router.AddHandler(
		"struct_handler",          // Nome dell'handler, deve essere univoco
		"incoming_messages_topic", // Argomento da cui vengono letti gli eventi
		pubSub,
		"outgoing_messages_topic", // Argomento a cui vengono pubblicati gli eventi
		pubSub,
		structHandler{}.Handler,
	)

	// Il middleware a livello di handler viene eseguito solo per un handler specifico
	// Questo tipo di middleware può essere aggiunto allo stesso modo del middleware a livello di router
	handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			log.Println("Esecuzione del middleware specifico dell'handler per", message.UUID)

			return h(message)
		}
	})

	// Per scopi di debug, stampiamo tutti i messaggi ricevuti su "incoming_messages_topic"
	router.AddNoPublisherHandler(
		"stampare_messaggi_entrant",
		"incoming_messages_topic",
		pubSub,
		stampaMessaggi,
	)

	// Per scopi di debug, stampiamo tutti gli eventi inviati su "outgoing_messages_topic"
	router.AddNoPublisherHandler(
		"stampare_messaggi_uscenti",
		"outgoing_messages_topic",
		pubSub,
		stampaMessaggi,
	)

	// Ora che tutti gli handler sono registrati, stiamo eseguendo il router.
	// Run si blocca mentre il router è in esecuzione.
	ctx := context.Background()
	if err := router.Run(ctx); err != nil {
		panic(err)
	}
}
// ...

Messaggi in arrivo

Il gestore struct_handler consuma messaggi dal incoming_messages_topic, quindi simuliamo il traffico in arrivo chiamando publishMessages() in background. Si noti che abbiamo aggiunto il middleware SetCorrelationID. Il router aggiornerà un ID di correlazione a tutti i messaggi generati (memorizzati nei metadati).

Codice sorgente completo: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// …
func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte("Ciao, mondo!"))
        middleware.SetCorrelationID(watermill.NewUUID(), msg)

        log.Printf("sending message %s, correlation id: %s\n", msg.UUID, middleware.MessageCorrelationID(msg))

        if err := publisher.Publish("incoming_messages_topic", msg); err != nil {
            panic(err)
        }

        time.Sleep(time.Second)
    }
}
// …

Gestori

È possibile che tu abbia notato che ci sono due tipi di funzioni di gestione:

  1. Funzione func(msg *message.Message) ([]*message.Message, error)
  2. Metodo func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error)

Se il tuo gestore è una funzione che non dipende da alcuna dipendenza, è bene utilizzare la prima opzione. Quando il gestore richiede alcune dipendenze (come ad esempio gestori di database, registri, ecc.), la seconda opzione è utile.

Codice sorgente completo: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// …
func printMessages(msg *message.Message) error {
    fmt.Printf(
        "\n> Messaggio ricevuto: %s\n> %s\n> metadati: %v\n\n",
        msg.UUID, string(msg.Payload), msg.Metadata,
    )
    return nil
}

type structHandler struct {
    // Qui possiamo aggiungere alcune dipendenze
}

func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
    log.Println("structHandler ha ricevuto il messaggio", msg.UUID)

    msg = message.NewMessage(watermill.NewUUID(), []byte("messaggio prodotto da structHandler"))
    return message.Messages{msg}, nil
}

Fatto!

Puoi eseguire questo esempio con go run main.go.