Installation
go get -u github.com/ThreeDotsLabs/watermill
Préparation
L'idée fondamentale derrière les applications basées sur les événements est toujours la même : écouter les messages entrants et réagir à ceux-ci. Watermill prend en charge la mise en œuvre de ce comportement pour plusieurs éditeurs et abonnés.
La partie centrale de Watermill est le Message, aussi important que http.Request
dans le package http
. La plupart des fonctionnalités de Watermill utilisent cette structure d'une certaine manière.
Malgré les fonctionnalités complexes fournies par la bibliothèque PubSub, il est seulement nécessaire d'implémenter deux interfaces pour commencer à les utiliser avec Watermill : Publisher
et Subscriber
.
type Publisher interface {
Publish(topic string, messages ...*Message) error
Close() error
}
type Subscriber interface {
Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
}
Abonnement aux Messages
Commençons par l'abonnement. Subscribe
attend un nom de sujet et renvoie un canal pour recevoir les messages entrants. La signification spécifique du sujet dépend de l'implémentation de PubSub.
messages, err := subscriber.Subscribe(ctx, "example.topic")
if err != nil {
panic(err)
}
for msg := range messages {
fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))
msg.Ack()
}
Pour des exemples détaillés des PubSub pris en charge, veuillez vous référer au contenu suivant.
Exemple de Canal Go
Code d'exemple complet : github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
package main
import (
"context"
"fmt"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
func main() {
pubSub := gochannel.NewGoChannel(
gochannel.Config{},
watermill.NewStdLogger(false, false),
)
messages, err := pubSub.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
Code d'exemple complet : github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))
// Nous devons accuser réception que nous avons reçu et traité le message,
// sinon il sera renvoyé plusieurs fois.
msg.Ack()
}
}
Exemple Kafka
Code source complet : github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
package main
import (
"context"
"log"
"time"
"github.com/Shopify/sarama"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
"github.com/ThreeDotsLabs/watermill/message"
)
func main() {
saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
// Équivalent à auto.offset.reset: earliest
saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
subscriber, err := kafka.NewSubscriber(
kafka.SubscriberConfig{
Brokers: []string{"kafka:9092"},
Unmarshaler: kafka.DefaultMarshaler{},
OverwriteSaramaConfig: saramaSubscriberConfig,
ConsumerGroup: "test_consumer_group",
},
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
}
Code source complet : github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("message reçu : %s, charge utile : %s", msg.UUID, string(msg.Payload))
// Nous devons accuser réception que nous avons reçu et traité le message,
// sinon le message sera renvoyé de manière répétée.
msg.Ack()
}
}
Exemple RabbitMQ (AMQP)
Code source complet : github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
package main
import (
"context"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
"github.com/ThreeDotsLabs/watermill/message"
)
var amqpURI = "amqp://guest:guest@rabbitmq:5672/"
func main() {
amqpConfig := amqp.NewDurableQueueConfig(amqpURI)
subscriber, err := amqp.NewSubscriber(
// Cette configuration est basée sur l'exemple suivant: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
// Elle est utilisée comme une file d'attente simple.
//
// Si vous souhaitez implémenter un service de style Pub/Sub, veuillez vous référer à
// https://watermill.io/pubsubs/amqp/#amqp-consumer-groups
amqpConfig,
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
}
Code source complet : github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("Message reçu : %s, charge utile : %s", msg.UUID, string(msg.Payload))
// Nous devons reconnaître que le message a été reçu et traité,
// sinon, le message sera redistribué de manière répétée.
msg.Ack()
}
}
Exemple SQL
Code source complet : github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
package main
import (
contexte "context"
stdSQL "database/sql"
"log"
"temps"
driver "github.com/go-sql-driver/mysql"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-sql/v2/pkg/sql"
"github.com/ThreeDotsLabs/watermill/message"
)
func main() {
db := createDB()
logger := watermill.NewStdLogger(false, false)
subscriber, err := sql.NewSubscriber(
db,
sql.SubscriberConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
OffsetsAdapter: sql.DefaultMySQLOffsetsAdapter{},
InitializeSchema: true,
},
logger,
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(contexte.Background(), "example_topic")
if err != nil {
panic(err)
}
go process(messages)
// ...
Code source complet : github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("Message reçu : %s, charge utile : %s", msg.UUID, string(msg.Payload))
// Nous devons accuser réception que le message a été reçu et traité,
// sinon, le message sera redistribué de manière répétée.
msg.Ack()
}
}
Créer un message
Watermill n'impose aucun format de message. NewMessage
attend que la charge utile soit une tranche d'octets. Vous pouvez utiliser des chaînes, JSON, protobuf, Avro, gob, ou tout autre format pouvant être sérialisé en []byte
.
L'UUID du message est facultatif, mais il est recommandé car il facilite le débogage.
msg := message.NewMessage(watermill.NewUUID(), []byte("Bonjour, le monde !"))
Publier un message
La méthode Publish
requiert un sujet et un ou plusieurs messages à publier.
err := publisher.Publish("example.topic", msg)
if err != nil {
panic(err)
}
Exemple de canal Go
Code source complet : github.com/ThreeDotsLabs/watermill/_examples/pubsubs/go-channel/main.go
// ...
go process(messages)
publishMessages(pubSub)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Bonjour, le monde !"))
if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
Exemple Kafka
Code source complet : github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
go process(messages)
publisher, err := kafka.NewPublisher(
kafka.PublisherConfig{
Brokers: []string{"kafka:9092"},
Marshaler: kafka.DefaultMarshaler{},
},
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Bonjour, le monde !"))
if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
Exemple RabbitMQ (AMQP)
Code source complet : github.com/ThreeDotsLabs/watermill/_examples/pubsubs/amqp/main.go
// ...
go process(messages)
publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Bonjour tout le monde!"))
if err := publisher.Publish("example_topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
Exemple SQL
Code source complet : github.com/ThreeDotsLabs/watermill/_examples/pubsubs/sql/main.go
// ...
go process(messages)
publisher, err := sql.NewPublisher(
db,
sql.PublisherConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
},
logger,
)
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func createDB() *stdSQL.DB {
conf := driver.NewConfig()
conf.Net = "tcp"
conf.User = "root"
conf.Addr = "mysql"
conf.DBName = "watermill"
db, err := stdSQL.Open("mysql", conf.FormatDSN())
if err != nil {
panic(err)
}
err = db.Ping()
if err != nil {
panic(err)
}
return db
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "Bonjour tout le monde!"}`))
if err := publisher.Publish("example_topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
// ...
Utilisation de Message Router
Les éditeurs et les abonnés sont des parties de bas niveau de Watermill. Dans la plupart des cas, vous voudriez généralement utiliser des interfaces et des fonctionnalités de niveau supérieur telles que les corrélations, les métriques, les files empoisonnées, les tentatives et la limitation de débit.
Vous ne voudrez peut-être reconnaître le message qu'une fois qu'il a été traité avec succès. Dans d'autres cas, vous voudrez peut-être le reconnaître immédiatement, puis envisager de le traiter. Parfois, vous voudrez effectuer certaines actions en fonction du message entrant et publier un autre message en réponse.
Pour répondre à ces exigences, il existe un composant appelé Routeur.
Exemple d'application de Message Router
Le flux de l'exemple d'application est le suivant :
- Générer un message sur le
incoming_messages_topic
toutes les secondes. - Le gestionnaire
struct_handler
gère leincoming_messages_topic
. Après avoir reçu un message, il imprime l'UUID et génère un nouveau message sur leoutgoing_messages_topic
. - Le gestionnaire
print_incoming_messages
écoute leincoming_messages_topic
et imprime l'UUID, la charge utile et les métadonnées du message. - Le gestionnaire
print_outgoing_messages
écoute leoutgoing_messages_topic
et imprime l'UUID, la charge utile et les métadonnées du message. L'ID de corrélation devrait être le même que le message sur leincoming_messages_topic
.
Configuration du routeur
Tout d'abord, configurez le routeur en ajoutant des plugins et des middleware. Ensuite, définissez les gestionnaires que le routeur utilisera. Chaque gestionnaire traitera indépendamment les messages.
Code source complet: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// ...
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
"github.com/ThreeDotsLabs/watermill/message/router/plugin"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
var (
// Pour cet exemple, nous utilisons une implémentation de journal simple,
// vous voudrez peut-être fournir votre propre implémentation de `watermill.LoggerAdapter`.
logger = watermill.NewStdLogger(false, false)
)
func main() {
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// Lors de la réception du signal SIGTERM, SignalsHandler fermera proprement le routeur.
// Vous pouvez également fermer le routeur en appelant `r.Close()`.
router.AddPlugin(plugin.SignalsHandler)
// Les middleware au niveau du routeur seront exécutés pour chaque message envoyé au routeur
router.AddMiddleware(
// CorrelationID copie l'ID de corrélation des métadonnées du message entrant vers le message généré
middleware.CorrelationID,
// Si le gestionnaire renvoie une erreur, réessayez la fonction du gestionnaire.
// Après avoir atteint MaxRetries, le message est Nacked et PubSub est responsable de le renvoyer.
middleware.Retry{
MaxRetries: 3,
InitialInterval: time.Millisecond * 100,
Logger: logger,
}.Middleware,
// Recoverer gère les pannes dans le gestionnaire.
// Dans ce cas, il les passe en tant qu'erreurs au middleware Retry.
middleware.Recoverer,
)
// Pour des raisons de simplicité, nous utilisons ici gochannel Pub/Sub,
// vous pouvez le remplacer par n'importe quelle implémentation Pub/Sub et l'effet sera le même.
pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
// Générer quelques messages entrants en arrière-plan
go publishMessages(pubSub)
// AddHandler renvoie un gestionnaire qui peut être utilisé pour ajouter des middleware au niveau du gestionnaire
// ou pour arrêter le gestionnaire.
handler := router.AddHandler(
"struct_handler", // Nom du gestionnaire, doit être unique
"incoming_messages_topic", // Sujet à partir duquel les événements sont lus
pubSub,
"outgoing_messages_topic", // Sujet vers lequel les événements sont publiés
pubSub,
structHandler{}.Handler,
)
// Les middleware au niveau du gestionnaire ne sont exécutés que pour un gestionnaire spécifique
// Ce type de middleware peut être ajouté de la même manière que les middleware au niveau du routeur
handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
log.Println("Exécution du middleware spécifique au gestionnaire pour", message.UUID)
return h(message)
}
})
// À des fins de débogage, nous imprimons tous les messages reçus sur "incoming_messages_topic"
router.AddNoPublisherHandler(
"print_incoming_messages",
"incoming_messages_topic",
pubSub,
printMessages,
)
// À des fins de débogage, nous imprimons tous les événements envoyés à "outgoing_messages_topic"
router.AddNoPublisherHandler(
"print_outgoing_messages",
"outgoing_messages_topic",
pubSub,
printMessages,
)
// Maintenant que tous les gestionnaires sont enregistrés, nous exécutons le routeur.
// Run bloque pendant l'exécution du routeur.
ctx := context.Background()
if err := router.Run(ctx); err != nil {
panic(err)
}
}
// ...
Messages entrants
Le struct_handler
consomme des messages du incoming_messages_topic
, donc nous simulons le trafic entrant en appelant publishMessages()
en arrière-plan. Notez que nous avons ajouté le middleware SetCorrelationID
. Le routeur ajoutera un ID de corrélation à tous les messages générés (stockés dans les métadonnées).
Code source complet: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// …
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Bonjour tout le monde!"))
middleware.SetCorrelationID(watermill.NewUUID(), msg)
log.Printf("envoi du message %s, ID de corrélation : %s\n", msg.UUID, middleware.MessageCorrelationID(msg))
if err := publisher.Publish("incoming_messages_topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
}
}
// …
Gestionnaires
Vous avez peut-être remarqué qu'il existe deux types de fonctions de gestionnaire :
- Fonction
func(msg *message.Message) ([]*message.Message, error)
- Méthode
func (c structHandler) Handler(msg *message.Message) ([]*message.Message, error)
Si votre gestionnaire est une fonction qui ne dépend d'aucune dépendance, utiliser la première option est correcte. Lorsque votre gestionnaire requiert des dépendances (telles que des accès à la base de données, des enregistreurs, etc.), la deuxième option est utile.
Code source complet: github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go
// …
func printMessages(msg *message.Message) error {
fmt.Printf(
"\n> Message reçu : %s\n> %s\n> métadonnées : %v\n\n",
msg.UUID, string(msg.Payload), msg.Metadata,
)
return nil
}
type structHandler struct {
// Ici nous pouvons ajouter des dépendances
}
func (s structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
log.Println("structHandler a reçu le message", msg.UUID)
msg = message.NewMessage(watermill.NewUUID(), []byte("message produit par structHandler"))
return message.Messages{msg}, nil
}
Terminé !
Vous pouvez exécuter cet exemple avec go run main.go
.