Installation

go get -u github.com/ThreeDotsLabs/watermill

Préparation

L'idée fondamentale derrière les applications basées sur les événements est toujours la même : écouter les messages entrants et réagir à ceux-ci. Watermill prend en charge la mise en œuvre de ce comportement pour plusieurs éditeurs et abonnés.

La partie centrale de Watermill est le Message, aussi important que http.Request dans le package http. La plupart des fonctionnalités de Watermill utilisent cette structure d'une certaine manière.

Malgré les fonctionnalités complexes fournies par la bibliothèque PubSub, il est seulement nécessaire d'implémenter deux interfaces pour commencer à les utiliser avec Watermill : Publisher et Subscriber.

type Publisher interface {
	Publish(topic string, messages ...*Message) error
	Close() error
}

type Subscriber interface {
	Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
}

Abonnement aux Messages

Commençons par l'abonnement. Subscribe attend un nom de sujet et renvoie un canal pour recevoir les messages entrants. La signification spécifique du sujet dépend de l'implémentation de PubSub.

messages, err := subscriber.Subscribe(ctx, "example.topic")
if err != nil {
	panic(err)
}

for msg := range messages {
	fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))
	msg.Ack()
}

Pour des exemples détaillés des PubSub pris en charge, veuillez vous référer au contenu suivant.

Exemple de Canal Go

Code d'exemple complet : github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

func main() {
	pubSub := gochannel.NewGoChannel(
		gochannel.Config{},
		watermill.NewStdLogger(false, false),
	)

messages, err := pubSub.Subscribe(context.Background(), "example.topic")
if err != nil {
	panic(err)
}

go process(messages)
// ...

Code d'exemple complet : github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))

		// Nous devons accuser réception que nous avons reçu et traité le message,
		// sinon il sera renvoyé plusieurs fois.
		msg.Ack()
	}
}

Exemple Kafka

Code source complet : github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
package main

import (
	"context"
	"log"
	"time"

	"github.com/Shopify/sarama"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
	"github.com/ThreeDotsLabs/watermill/message"
)

func main() {
	saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
	// Équivalent à auto.offset.reset: earliest
	saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

	subscriber, err := kafka.NewSubscriber(
		kafka.SubscriberConfig{
			Brokers:               []string{"kafka:9092"},
			Unmarshaler:           kafka.DefaultMarshaler{},
			OverwriteSaramaConfig: saramaSubscriberConfig,
			ConsumerGroup:         "test_consumer_group",
		},
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example.topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...
}

Code source complet : github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("message reçu : %s, charge utile : %s", msg.UUID, string(msg.Payload))

		// Nous devons accuser réception que nous avons reçu et traité le message,
		// sinon le message sera renvoyé de manière répétée.
		msg.Ack()
	}
}

Exemple RabbitMQ (AMQP)

Code source complet : github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
package main

import (
	"context"
	"log"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
	"github.com/ThreeDotsLabs/watermill/message"
)

var amqpURI = "amqp://guest:guest@rabbitmq:5672/"

func main() {
	amqpConfig := amqp.NewDurableQueueConfig(amqpURI)

	subscriber, err := amqp.NewSubscriber(
		// Cette configuration est basée sur l'exemple suivant: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
		// Elle est utilisée comme une file d'attente simple.
		//
		// Si vous souhaitez implémenter un service de style Pub/Sub, veuillez vous référer à
		// https://watermill.io/pubsubs/amqp/#amqp-consumer-groups
		amqpConfig,
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example.topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...
}

Code source complet : github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("Message reçu : %s, charge utile : %s", msg.UUID, string(msg.Payload))

		// Nous devons reconnaître que le message a été reçu et traité,
		// sinon, le message sera redistribué de manière répétée.
		msg.Ack()
	}
}

Exemple SQL

Code source complet : github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
package main

import (
	contexte "context"
	stdSQL "database/sql"
	"log"
	"temps"

	driver "github.com/go-sql-driver/mysql"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
	"github.com/ThreeDotsLabs/watermill/message"
)

func main() {
	db := createDB()
	logger := watermill.NewStdLogger(false, false)

	subscriber, err := sql.NewSubscriber(
		db,
		sql.SubscriberConfig{
			SchemaAdapter:    sql.DefaultMySQLSchema{},
			OffsetsAdapter:   sql.DefaultMySQLOffsetsAdapter{},
			InitializeSchema: true,
		},
		logger,
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(contexte.Background(), "example_topic")
	if err != nil {
		panic(err)
	}

	go process(messages)
// ...

Code source complet : github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("Message reçu : %s, charge utile : %s", msg.UUID, string(msg.Payload))

		// Nous devons accuser réception que le message a été reçu et traité,
		// sinon, le message sera redistribué de manière répétée.
		msg.Ack()
	}
}

Créer un message

Watermill n'impose aucun format de message. NewMessage attend que la charge utile soit une tranche d'octets. Vous pouvez utiliser des chaînes, JSON, protobuf, Avro, gob, ou tout autre format pouvant être sérialisé en []byte.

L'UUID du message est facultatif, mais il est recommandé car il facilite le débogage.

msg := message.NewMessage(watermill.NewUUID(), []byte("Bonjour, le monde !"))

Publier un message

La méthode Publish requiert un sujet et un ou plusieurs messages à publier.

err := publisher.Publish("example.topic", msg)
if err != nil {
    panic(err)
}

Exemple de canal Go

Code source complet : github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go

// ...
	go process(messages)

	publishMessages(pubSub)
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("Bonjour, le monde !"))

		if err := publisher.Publish("example.topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

Exemple Kafka

Code source complet : github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go

// ...
	go process(messages)

	publisher, err := kafka.NewPublisher(
		kafka.PublisherConfig{
			Brokers:   []string{"kafka:9092"},
			Marshaler: kafka.DefaultMarshaler{},
		},
		watermill.NewStdLogger(false, false),
	)
	if err != nil {
		panic(err)
	}

	publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("Bonjour, le monde !"))

		if err := publisher.Publish("example.topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

Exemple RabbitMQ (AMQP)

Code source complet : github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go

// ...
	go process(messages)

	publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
	if err != nil {
		panic(err)
	}

	publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte("Bonjour tout le monde!"))

		if err := publisher.Publish("example_topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

Exemple SQL

Code source complet : github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go

// ...
	go process(messages)

	publisher, err := sql.NewPublisher(
		db,
		sql.PublisherConfig{
			SchemaAdapter: sql.DefaultMySQLSchema{},
		},
		logger,
	)
	if err != nil {
		panic(err)
	}

	publishMessages(publisher)
}

func createDB() *stdSQL.DB {
	conf := driver.NewConfig()
	conf.Net = "tcp"
	conf.User = "root"
	conf.Addr = "mysql"
	conf.DBName = "watermill"

	db, err := stdSQL.Open("mysql", conf.FormatDSN())
	if err != nil {
		panic(err)
	}

	err = db.Ping()
	if err != nil {
		panic(err)
	}

	return db
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "Bonjour tout le monde!"}`))

		if err := publisher.Publish("example_topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
// ...

Utilisation de Message Router

Les éditeurs et les abonnés sont des parties de bas niveau de Watermill. Dans la plupart des cas, vous voudriez généralement utiliser des interfaces et des fonctionnalités de niveau supérieur telles que les corrélations, les métriques, les files empoisonnées, les tentatives et la limitation de débit.

Vous ne voudrez peut-être reconnaître le message qu'une fois qu'il a été traité avec succès. Dans d'autres cas, vous voudrez peut-être le reconnaître immédiatement, puis envisager de le traiter. Parfois, vous voudrez effectuer certaines actions en fonction du message entrant et publier un autre message en réponse.

Pour répondre à ces exigences, il existe un composant appelé Routeur.

Exemple d'application de Message Router

Le flux de l'exemple d'application est le suivant :

  1. Générer un message sur le incoming_messages_topic toutes les secondes.
  2. Le gestionnaire struct_handler gère le incoming_messages_topic. Après avoir reçu un message, il imprime l'UUID et génère un nouveau message sur le outgoing_messages_topic.
  3. Le gestionnaire print_incoming_messages écoute le incoming_messages_topic et imprime l'UUID, la charge utile et les métadonnées du message.
  4. Le gestionnaire print_outgoing_messages écoute le outgoing_messages_topic et imprime l'UUID, la charge utile et les métadonnées du message. L'ID de corrélation devrait être le même que le message sur le incoming_messages_topic.

Configuration du routeur

Tout d'abord, configurez le routeur en ajoutant des plugins et des middleware. Ensuite, définissez les gestionnaires que le routeur utilisera. Chaque gestionnaire traitera indépendamment les messages.

Code source complet: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/message/router/middleware"
	"github.com/ThreeDotsLabs/watermill/message/router/plugin"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

var (
	// Pour cet exemple, nous utilisons une implémentation de journal simple,
	// vous voudrez peut-être fournir votre propre implémentation de `watermill.LoggerAdapter`.
	logger = watermill.NewStdLogger(false, false)
)

func main() {
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// Lors de la réception du signal SIGTERM, SignalsHandler fermera proprement le routeur.
	// Vous pouvez également fermer le routeur en appelant `r.Close()`.
	router.AddPlugin(plugin.SignalsHandler)

	// Les middleware au niveau du routeur seront exécutés pour chaque message envoyé au routeur
	router.AddMiddleware(
		// CorrelationID copie l'ID de corrélation des métadonnées du message entrant vers le message généré
		middleware.CorrelationID,

		// Si le gestionnaire renvoie une erreur, réessayez la fonction du gestionnaire.
		// Après avoir atteint MaxRetries, le message est Nacked et PubSub est responsable de le renvoyer.
		middleware.Retry{
			MaxRetries:      3,
			InitialInterval: time.Millisecond * 100,
			Logger:          logger,
		}.Middleware,

		// Recoverer gère les pannes dans le gestionnaire.
		// Dans ce cas, il les passe en tant qu'erreurs au middleware Retry.
		middleware.Recoverer,
	)

	// Pour des raisons de simplicité, nous utilisons ici gochannel Pub/Sub,
	// vous pouvez le remplacer par n'importe quelle implémentation Pub/Sub et l'effet sera le même.
	pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)

	// Générer quelques messages entrants en arrière-plan
	go publishMessages(pubSub)

	// AddHandler renvoie un gestionnaire qui peut être utilisé pour ajouter des middleware au niveau du gestionnaire
	// ou pour arrêter le gestionnaire.
	handler := router.AddHandler(
		"struct_handler",          // Nom du gestionnaire, doit être unique
		"incoming_messages_topic", // Sujet à partir duquel les événements sont lus
		pubSub,
		"outgoing_messages_topic", // Sujet vers lequel les événements sont publiés
		pubSub,
		structHandler{}.Handler,
	)

	// Les middleware au niveau du gestionnaire ne sont exécutés que pour un gestionnaire spécifique
	// Ce type de middleware peut être ajouté de la même manière que les middleware au niveau du routeur
	handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			log.Println("Exécution du middleware spécifique au gestionnaire pour", message.UUID)

			return h(message)
		}
	})

	// À des fins de débogage, nous imprimons tous les messages reçus sur "incoming_messages_topic"
	router.AddNoPublisherHandler(
		"print_incoming_messages",
		"incoming_messages_topic",
		pubSub,
		printMessages,
	)

	// À des fins de débogage, nous imprimons tous les événements envoyés à "outgoing_messages_topic"
	router.AddNoPublisherHandler(
		"print_outgoing_messages",
		"outgoing_messages_topic",
		pubSub,
		printMessages,
	)

	// Maintenant que tous les gestionnaires sont enregistrés, nous exécutons le routeur.
	// Run bloque pendant l'exécution du routeur.
	ctx := context.Background()
	if err := router.Run(ctx); err != nil {
		panic(err)
	}
}
// ...

Messages entrants

Le struct_handler consomme des messages du incoming_messages_topic, donc nous simulons le trafic entrant en appelant publishMessages() en arrière-plan. Notez que nous avons ajouté le middleware SetCorrelationID. Le routeur ajoutera un ID de corrélation à tous les messages générés (stockés dans les métadonnées).

Code source complet: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// …
func publishMessages(publisher message.Publisher) {
    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte("Bonjour tout le monde!"))
        middleware.SetCorrelationID(watermill.NewUUID(), msg)

        log.Printf("envoi du message %s, ID de corrélation : %s\n", msg.UUID, middleware.MessageCorrelationID(msg))

        if err := publisher.Publish("incoming_messages_topic", msg); err != nil {
            panic(err)
        }

        time.Sleep(time.Second)
    }
}
// …

Gestionnaires

Vous avez peut-être remarqué qu'il existe deux types de fonctions de gestionnaire :

  1. Fonction func(msg *message.Message) ([]*message.Message, error)
  2. Méthode func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error)

Si votre gestionnaire est une fonction qui ne dépend d'aucune dépendance, utiliser la première option est correcte. Lorsque votre gestionnaire requiert des dépendances (telles que des accès à la base de données, des enregistreurs, etc.), la deuxième option est utile.

Code source complet: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// …
func printMessages(msg *message.Message) error {
    fmt.Printf(
        "\n> Message reçu : %s\n> %s\n> métadonnées : %v\n\n",
        msg.UUID, string(msg.Payload), msg.Metadata,
    )
    return nil
}

type structHandler struct {
    // Ici nous pouvons ajouter des dépendances
}

func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
    log.Println("structHandler a reçu le message", msg.UUID)

    msg = message.NewMessage(watermill.NewUUID(), []byte("message produit par structHandler"))
    return message.Messages{msg}, nil
}

Terminé !

Vous pouvez exécuter cet exemple avec go run main.go.