Publisher and Subscriber sont les parties de bas niveau de Watermill. Dans les applications pratiques, vous voulez généralement utiliser des interfaces et des fonctions de haut niveau, telles que les associations, les métriques, les files de messages non valides, les réessais, la limitation de débit, etc.

Parfois, vous ne voulez peut-être pas envoyer d'Accusé de réception lorsque le traitement est réussi. Parfois, vous souhaitez envoyer un message après qu'un autre message a été traité.

Pour répondre à ces exigences, il existe un composant appelé Router.

Routeur de Watermill

Configuration

Code source complet : github.com/ThreeDotsLabs/watermill/message/router.go

// ...
type RouterConfig struct {
	// CloseTimeout détermine la durée pendant laquelle le routeur doit fonctionner pour les gestionnaires lors de la fermeture.
	CloseTimeout time.Duration
}

func (c *RouterConfig) setDefaults() {
	if c.CloseTimeout == 0 {
		c.CloseTimeout = time.Second * 30
	}
}

// Validate vérifie s'il y a des erreurs dans la configuration du routeur.
func (c RouterConfig) Validate() error {
	return nil
}
// ...

Gestionnaire

Tout d'abord, vous devez implémenter la fonction HandlerFunc :

Code source complet : github.com/ThreeDotsLabs/watermill/message/router.go

// ...

// HandlerFunc est la fonction appelée lorsqu'un message est reçu.
// 
// Lorsque HandlerFunc ne retourne pas d'erreur, msg.Ack() sera appelé automatiquement.
// 
// Lorsque HandlerFunc retourne une erreur, msg.Nack() sera appelé.
// 
// Lorsque msg.Ack() est appelé dans le gestionnaire et que HandlerFunc retourne une erreur,
// msg.Nack() ne sera pas envoyé car Ack a déjà été envoyé.
// 
// Lors de la réception de plusieurs messages (en raison de l'envoi de msg.Ack() dans HandlerFunc ou du support de plusieurs consommateurs par l'abonné),
// HandlerFunc sera exécuté de manière concurrente.
type HandlerFunc func(msg *Message) ([]*Message, error)

// ...

Ensuite, vous devez utiliser Router.AddHandler pour ajouter un nouveau gestionnaire :

Code source complet : github.com/ThreeDotsLabs/watermill/message/router.go

// ...

// AddHandler ajoute un nouveau gestionnaire.

// handlerName doit être unique. Actuellement, il n'est utilisé que pour le débogage.

// subscribeTopic est le sujet à partir duquel le gestionnaire recevra des messages.

// publishTopic est le sujet à partir duquel les messages retournés par le gestionnaire seront générés par le routeur.

// Lorsque le gestionnaire doit publier sur plusieurs sujets,

// il est recommandé d'injecter uniquement le Publisher au gestionnaire ou de mettre en œuvre un middleware,

// qui peut capturer des messages en fonction des métadonnées et publier sur des sujets spécifiques.

// Si un gestionnaire est ajouté alors que le routeur est déjà en cours d'exécution, RunHandlers() doit être appelé explicitement.

func (r *Router) AddHandler(

	handlerName string,

	subscribeTopic string,

	subscriber Subscriber,

	publishTopic string,

	publisher Publisher,

	handlerFunc HandlerFunc,

) *Handler {

	r.logger.Info("Ajout du gestionnaire", watermill.LogFields{

		"nom_du_gestionnaire": handlerName,

		"sujet":        subscribeTopic,

	})

	r.handlersLock.Lock()

	defer r.handlersLock.Unlock()

	if _, ok := r.handlers[handlerName]; ok {

		panic(DuplicateHandlerNameError{handlerName})

	}

	publisherName, subscriberName := internal.StructName(publisher), internal.StructName(subscriber)

	newHandler := &handler{

		nom:   handlerName,

		logger: r.logger,

		subscriber:     subscriber,

		subscribeTopic: subscribeTopic,

		nom_souscripteur: subscriberName,

		publisher:     publisher,

		publishTopic:  publishTopic,

		nom_publieur: publisherName,

		handlerFunc: handlerFunc,

		runningHandlersWg:     r.runningHandlersWg,

		runningHandlersWgLock: r.runningHandlersWgLock,

		messagesCh:     nil,

		routersCloseCh: r.closingInProgressCh,

		startedCh: make(chan struct{}),

	}

	r.handlersWg.Add(1)

	r.handlers[handlerName] = newHandler

	select {

	case r.handlerAdded struct{}{}:

	default:

		// closeWhenAllHandlersStopped n'attend pas toujours handlerAdded

	}

	return &Handler{

		router:  r,

		handler: newHandler,

	}

}

// AddNoPublisherHandler ajoute un nouveau gestionnaire.

// Ce gestionnaire ne peut pas retourner de messages.

// Lorsqu'il retourne un message, une erreur se produit et un Nack est envoyé.

//

// handlerName doit être unique. Actuellement, il n'est utilisé que pour le débogage.

// subscribeTopic est le sujet à partir duquel le gestionnaire recevra des messages.

// subscriber est un abonné utilisé pour consommer des messages.

// Si un gestionnaire est ajouté alors que le routeur est déjà en cours d'exécution, RunHandlers() doit être appelé explicitement.

func (r *Router) AddNoPublisherHandler(

	handlerName string,

	subscribeTopic string,

	subscriber Subscriber,

handlerFunc NoPublishHandlerFunc,

) *Handler {

handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...

Référez-vous à l'exemple d'utilisation dans "Commencer". Code source complet : github.com/ThreeDotsLabs/watermill/_examples/basic/3-router/main.go

// ...
	// AddHandler renvoie un gestionnaire qui peut être utilisé pour ajouter un middleware au niveau du gestionnaire ou arrêter les gestionnaires.
	handler := router.AddHandler(
		"struct_handler",          // nom du gestionnaire, doit être unique
		"incoming_messages_topic", // sujet à partir duquel les événements sont lus
		pubSub,
		"outgoing_messages_topic", // sujet pour publier les événements
		pubSub,
		structHandler{}.Handler,
	)

	// Le middleware au niveau du gestionnaire n'est exécuté que pour des gestionnaires spécifiques
	// Ce middleware peut être ajouté de la même manière que le middleware au niveau du routeur
	handler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
		return func(message *message.Message) ([]*message.Message, error) {
			log.Println("Exécution du middleware spécifique au gestionnaire, UUID du message : ", message.UUID)

			return h(message)
		}
	})
// ...

Aucun gestionnaire d'éditeur

Tous les gestionnaires ne généreront pas un nouveau message. Vous pouvez utiliser Router.AddNoPublisherHandler pour ajouter ce type de gestionnaire:

Code source complet: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// AddNoPublisherHandler ajoute un nouveau gestionnaire.
// Ce gestionnaire ne peut pas renvoyer de messages.
// Lorsqu'il renvoie un message, une erreur se produira et Nack sera envoyé.
//
// handlerName doit être unique et actuellement utilisé uniquement à des fins de débogage.
//
// subscribeTopic est le sujet sur lequel le gestionnaire recevra des messages.
//
// subscriber est utilisé pour consommer des messages.
//
// Si vous ajoutez un gestionnaire à un routeur qui est déjà en cours d'exécution, vous devez appeler explicitement RunHandlers().
func (r *Router) AddNoPublisherHandler(
	handlerName string,
	subscribeTopic string,
	subscriber Subscriber,
	handlerFunc NoPublishHandlerFunc,
) *Handler {
	handlerFuncAdapter := func(msg *Message) ([]*Message, error) {
// ...
}

Accusé de réception

Par défaut, lorsque HanderFunc ne renvoie pas d'erreur, msg.Ack() sera appelé. Si une erreur est renvoyée, msg.Nack() sera appelé. Ainsi, après avoir traité le message, vous n'avez pas besoin d'appeler msg.Ack() ou msg.Nack (bien sûr, vous pouvez le faire si vous le souhaitez).

Production de messages

Lorsque plusieurs messages sont renvoyés par le gestionnaire, veuillez noter que la plupart des implémentations de Publisher ne supportent pas la publication atomique de messages. Si le courtier ou le stockage n'est pas disponible, seuls certains messages peuvent être générés et msg.Nack() sera envoyé.

Si c'est un problème, envisagez de faire en sorte que chaque gestionnaire ne publie qu'un seul message.

Exécution du routeur

Pour exécuter le routeur, vous devez appeler Run().

Code source complet: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Run exécute tous les plugins et gestionnaires, et commence à s'abonner aux sujets donnés.
// Cet appel bloque pendant l'exécution du routeur.
//
// Lorsque tous les gestionnaires s'arrêtent (par exemple parce que l'abonnement a été fermé), le routeur s'arrêtera également.
//
// Pour arrêter Run(), vous devez appeler Close() sur le routeur.
//
// ctx sera propagé à tous les abonnés.
//
// Lorsque tous les gestionnaires s'arrêtent (par exemple pour cause de connexions fermées), Run() s'arrêtera également.
func (r *Router) Run(ctx context.Context) (err error) {
// ...
}

S'assurer que le routeur est en cours d'exécution

Il peut être utile de comprendre si le routeur est en cours d'exécution. Vous pouvez y parvenir en utilisant la méthode Running().

Code source complet: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Running se ferme lorsque le routeur est en cours d'exécution.
// Autrement dit, vous pouvez attendre que le routeur soit en cours d'exécution de la manière suivante:

// 	fmt.Println("Démarrage du routeur")
//	go r.Run(ctx)
//	//	fmt.Println("Le routeur est en cours d'exécution")

// Attention : Pour des raisons historiques, ce canal ne sait pas quand le routeur s'arrête - il se fermera si le routeur continue de fonctionner et puis s'arrête.
func (r *Router) Running() chan struct{} {
// ...
}

Vous pouvez également utiliser la fonction IsRunning qui renvoie une valeur booléenne :

Code source complet: github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// IsRunning renvoie true lorsque le routeur est en cours d'exécution.
//
// Attention : Pour des raisons historiques, cette méthode ne sait pas si le routeur est fermé.
// Si vous voulez savoir si le routeur a été fermé, utilisez IsClosed.
func (r *Router) IsRunning() bool {
// ...
}

Éteindre le routeur

Pour éteindre le routeur, vous devez appeler Close().

Code source complet : github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Close ferme proprement le routeur avec un délai fourni dans la configuration.
func (r *Router) Close() error {
	r.closedLock.Lock()
// ...

Close() éteindra tous les éditeurs et abonnés, et attendra que tous les gestionnaires se terminent.

Close() attendra le délai défini dans RouterConfig.CloseTimeout dans la configuration. Si le délai est atteint, Close() renverra une erreur.

Ajouter des gestionnaires après le démarrage du routeur

Vous pouvez ajouter un nouveau gestionnaire lorsque le routeur est déjà en cours d'exécution. Pour ce faire, vous devez appeler AddNoPublisherHandler ou AddHandler, puis appeler RunHandlers.

Code source complet : github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// RunHandlers exécute tous les gestionnaires qui ont été ajoutés après Run().
// RunHandlers est idempotent, donc peut être appelé plusieurs fois en toute sécurité.
func (r *Router) RunHandlers(ctx context.Context) error {
// ...

Arrêter les gestionnaires en cours d'exécution

Vous pouvez arrêter uniquement un gestionnaire en cours d'exécution en appelant Stop().

Veuillez noter que le routeur s'arrêtera lorsqu'il n'y aura plus de gestionnaires en cours d'exécution.

Code source complet : github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// Stop arrête le gestionnaire.
// Stop est asynchrone.
// Vous pouvez vérifier si le gestionnaire a été arrêté avec la fonction Stopped().
func (h *Handler) Stop() {
// ...

Modèle d'exécution

Les abonnés peuvent consommer un seul message séquentiellement ou plusieurs messages en parallèle.

  • Le flux de message unique est la méthode la plus simple, ce qui signifie que les abonnés ne recevront pas de nouveaux messages jusqu'à ce que msg.Ack() soit appelé.
  • Le flux de messages multiples est pris en charge par certains abonnés seulement. En s'abonnant simultanément à plusieurs partitions de sujet, plusieurs messages peuvent être consommés en parallèle, même des messages qui n'ont pas été précédemment confirmés (par exemple, le fonctionnement des abonnés Kafka). Le routeur traite ce modèle en exécutant HandlerFunc en parallèle.

Veuillez vous référer à la documentation Pub/Sub sélectionnée pour comprendre les modèles d'exécution pris en charge.

Middleware

Code source complet : github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// HandlerMiddleware nous permet d'écrire quelque chose de similaire à un décorateur pour HandlerFunc.
// Il peut exécuter certaines opérations avant (par exemple, modifier le message consommé) ou après le gestionnaire (modifier le message généré, confirmer/nier le message consommé, gérer les erreurs, journaliser, etc.).
//
// Il peut être attaché au routeur en utilisant la méthode `AddMiddleware`.
//
// Exemple :
//
// 	func ExampleMiddleware(h HandlerFunc) HandlerFunc {
// 		return func(message *message.Message) ([]*message.Message, error) {
// 			fmt.Println("exécuté avant le gestionnaire")
// 			messagesProduits, err := h(message)
// 			fmt.Println("exécuté après le gestionnaire")
//
// 			return messagesProduits, err
// 		}
// 	}
type HandlerMiddleware func(h HandlerFunc) HandlerFunc

// ...

La liste complète des middlewares standard peut être trouvée dans Middlewares.

Plugins

Code source complet : github.com/ThreeDotsLabs/watermill/message/router.go

// ...
// RouterPlugin est une fonction exécutée lorsque le routeur démarre.
type RouterPlugin func(*Router) error

// ...

La liste complète des plugins standard peut être trouvée dans message/router/plugin.

Context

Certain valeurs utiles sont stockées dans le contexte pour chaque message reçu par le gestionnaire :

Code source complet : github.com/ThreeDotsLabs/watermill/message/router_context.go

// ...
// HandlerNameFromCtx renvoie le nom du gestionnaire de messages dans le routeur qui a consommé le message à partir du contexte.
func HandlerNameFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, handlerNameKey)
}

// PublisherNameFromCtx renvoie le nom du type de publication de messages dans le routeur à partir du contexte.
// Par exemple, pour Kafka, ce sera `kafka.Publisher`.
func PublisherNameFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, publisherNameKey)
}

// SubscriberNameFromCtx renvoie le nom du type d'abonnement de messages dans le routeur à partir du contexte.
// Par exemple, pour Kafka, ce sera `kafka.Subscriber`.
func SubscriberNameFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, subscriberNameKey)
}

// SubscribeTopicFromCtx renvoie le sujet à partir duquel le message a été reçu dans le routeur à partir du contexte.
func SubscribeTopicFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, subscribeTopicKey)
}

// PublishTopicFromCtx renvoie le sujet vers lequel le message sera publié dans le routeur à partir du contexte.
func PublishTopicFromCtx(ctx context.Context) string {
	return valFromCtx(ctx, publishTopicKey)
}
// ...