Modèle de publication/abonnement RabbitMQ (mode de diffusion, mode de fanout)

Le modèle de publication/abonnement dans RabbitMQ signifie qu'un message envoyé par un producteur sera traité par plusieurs consommateurs.

Mode fanout

Explication :

  • P représente le producteur, C1 et C2 représentent les consommateurs, le rouge représente les files d'attente et X représente l'échange.
  • L'échange est responsable de l'envoi des messages à toutes les files d'attente liées à l'échange.
  • Plusieurs files d'attente peuvent être définies, chacune étant liée au même échange.
  • Chaque file d'attente peut avoir un ou plusieurs consommateurs.

Remarque : Si vous n'êtes pas familier avec RabbitMQ, veuillez d'abord lire la section Concepts de base de RabbitMQ.

1. Installer le package de dépendances

go get github.com/streadway/amqp

2. Envoyer des messages

Les étapes suivantes démontrent comment le producteur de messages envoie des messages.

2.1. Se connecter au serveur RabbitMQ

// Se connecter au serveur RabbitMQ
conn, err := amqp.Dial("amqp://invité:invité@localhost:5672/")
defer conn.Close()

Explication de l'adresse de connexion :

amqp://nom_utilisateur:mot_de_passe@AdresseRabbitMQ:Port/

2.2. Créer un canal

La plupart des opérations sont effectuées sur le canal.

ch, err := conn.Channel()
defer ch.Close()

2.3. Déclarer un échange

Les messages sont d'abord envoyés à l'échange. L'échange envoie les messages aux files d'attente en fonction de sa stratégie.

err = ch.ExchangeDeclare(
    "tizi365",   // Nom de l'échange
    "fanout", // Type d'échange, utilisant le type fanout ici, c'est-à-dire le modèle de publication/abonnement
    true,     // Durable
    false,    // Auto-supprimé
    false,    // Interne
    false,    // Sans attente
    nil,      // Arguments
)

2.4. Publier un message

// Contenu du message
body := "Bonjour Tizi365.com !"

// Publier le message
err = ch.Publish(
  "tizi365",     // Échange (nom de l'échange correspondant à la déclaration précédente)
  "", // Clé de routage, pour un échange de type fanout, la clé de routage est automatiquement ignorée, il n'est donc pas nécessaire d'en fournir une
  false,  // Obligatoire
  false,  // Immédiat
  amqp.Publishing {
    ContentType: "text/plain", // Type de contenu du message, ici c'est du texte brut
    Body:        []byte(body),  // Contenu du message
  })

2.5. Code complet de l'envoi de message

package main

import (
	"log"
	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s : %s", msg, err)
	}
}

func main() {
	// Se connecter à rabbitmq
	conn, err := amqp.Dial("amqp://invité:invité@localhost:5672/")
	failOnError(err, "Échec de la connexion à RabbitMQ")
	defer conn.Close()

	// Créer un canal
	ch, err := conn.Channel()
	failOnError(err, "Échec de l'ouverture d'un canal")
	defer ch.Close()

	// Déclarer un échange
	err = ch.ExchangeDeclare(
		"tizi365",   // Nom de l'échange
		"fanout", // Type d'échange, fanout pour le mode de publication/abonnement
		true,     // Durable
		false,    // Auto-supprimé
		false,    // Interne
		false,    // Sans attente
		nil,      // Arguments
	)
	failOnError(err, "Échec de la déclaration de l'échange")

	// Contenu du message
	body := "Bonjour Tizi365.com !"
	// Envoyer le message
	err = ch.Publish(
		"tizi365",     // Échange (correspondant à la déclaration ci-dessus)
		"", // Clé de routage, pour les échanges de type fanout, la clé de routage est automatiquement ignorée
		false,  // Obligatoire
		false,  // Immédiat
		amqp.Publishing {
			ContentType: "text/plain", // Type de contenu du message, ici c'est du texte brut
			Body:        []byte(body),  // Contenu du message
		})

	log.Printf("Contenu envoyé %s", body)
}

3. Recevoir des messages

Les trois premières étapes pour recevoir des messages - se connecter à RabbitMQ, créer un canal et déclarer un échange - sont les mêmes que pour l'envoi de messages. Référez-vous aux sections précédentes 2.1, 2.2 et 2.3.

3.1. Déclarer une file d'attente

Déclarez la file d'attente sur laquelle effectuer des opérations

q, err := ch.QueueDeclare(
		"",    // Nom de la file d'attente, si non spécifié, un nom aléatoire sera généré
		false, // Durable
		false, // Supprimer lorsque inutilisé
		true,  // Exclusif
		false, // No-wait
		nil,   // Arguments
	)

3.2. Lier la file d'attente à l'échange

La file d'attente doit être liée à l'échange pour recevoir des messages

err = ch.QueueBind(
		q.Name, // Nom de la file d'attente
		"",     // Clé de routage, pour les échanges de type fanout, la clé de routage est automatiquement ignorée
		"tizi365", // Nom de l'échange, doit correspondre à celui défini par l'expéditeur du message
		false,
		nil)

Remarque : Dans les applications réelles, nous pouvons définir N files d'attente, chacune liée au même échange, afin de recevoir des messages transférés par l'échange. C'est là que le motif de publication/abonnement se reflète.

3.3. Créer un consommateur

msgs, err := ch.Consume(
		q.Name, // Référence au nom de la file d'attente ci-dessus
		"",     // Nom du consommateur, s'il n'est pas spécifié, il sera généré automatiquement
		true,   // Reconnaître automatiquement que le message a été traité
		false,  // Exclusif
		false,  // No-local
		false,  // No-wait
		nil,    // Args
	)
	
// Boucle de traitement des messages
for d := range msgs {
	log.Printf("Message reçu=%s", d.Body)
}

3.4. Code complet du consommateur

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// Se connecter à RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Impossible de se connecter à RabbitMQ")
	defer conn.Close()

	// Créer un canal, généralement un par consommateur
	ch, err := conn.Channel()
	failOnError(err, "Impossible d'ouvrir un canal")
	defer ch.Close()

	// Déclarer un échange
	err = ch.ExchangeDeclare(
		"tizi365",   // Nom de l'échange, doit correspondre à celui utilisé par l'expéditeur du message
		"fanout", // Type d'échange
		true,     // Durable
		false,    // Auto-supprimé
		false,    // Interne
		false,    // No-wait
		nil,      // Arguments
	)
	failOnError(err, "Impossible de déclarer un échange")

	// Déclarer la file d'attente sur laquelle opérer
	q, err := ch.QueueDeclare(
		"",    // Nom de la file d'attente, si vide un nom aléatoire sera généré
		false, // Durable
		false, // Supprimer lorsque inutilisé
		true,  // Exclusif
		false, // No-wait
		nil,   // Arguments
	)
	failOnError(err, "Impossible de déclarer une file d'attente")

	// Lier la file d'attente à l'échange spécifié
	err = ch.QueueBind(
		q.Name, // Nom de la file d'attente
		"",     // Clé de routage, ignorée pour les échanges de type fanout
		"tizi365", // Nom de l'échange, doit correspondre à celui défini par l'expéditeur du message
		false,
		nil)
	failOnError(err, "Impossible de lier une file d'attente")

	// Créer un consommateur
	msgs, err := ch.Consume(
		q.Name, // Référence au nom de la file d'attente ci-dessus
		"",     // Nom du consommateur, sera généré automatiquement s'il est vide
		true,   // Auto-ack
		false,  // Exclusif
		false,  // No-local
		false,  // No-wait
		nil,    // Args
	)
	failOnError(err, "Impossible d'enregistrer un consommateur")

	// Consommer les messages de la file d'attente dans une boucle
	for d := range msgs {
		log.Printf("Message reçu: %s", d.Body)
	}
}

3.5. Multiples consommateurs

Consultez la section Mode de travail et démarrez simplement plusieurs consommateurs en utilisant des goroutines.