Modèle de publication/abonnement RabbitMQ (mode de diffusion, mode de fanout)
Le modèle de publication/abonnement dans RabbitMQ signifie qu'un message envoyé par un producteur sera traité par plusieurs consommateurs.
Explication :
- P représente le producteur, C1 et C2 représentent les consommateurs, le rouge représente les files d'attente et X représente l'échange.
- L'échange est responsable de l'envoi des messages à toutes les files d'attente liées à l'échange.
- Plusieurs files d'attente peuvent être définies, chacune étant liée au même échange.
- Chaque file d'attente peut avoir un ou plusieurs consommateurs.
Remarque : Si vous n'êtes pas familier avec RabbitMQ, veuillez d'abord lire la section Concepts de base de RabbitMQ.
1. Installer le package de dépendances
go get github.com/streadway/amqp
2. Envoyer des messages
Les étapes suivantes démontrent comment le producteur de messages envoie des messages.
2.1. Se connecter au serveur RabbitMQ
// Se connecter au serveur RabbitMQ
conn, err := amqp.Dial("amqp://invité:invité@localhost:5672/")
defer conn.Close()
Explication de l'adresse de connexion :
amqp://nom_utilisateur:mot_de_passe@AdresseRabbitMQ:Port/
2.2. Créer un canal
La plupart des opérations sont effectuées sur le canal.
ch, err := conn.Channel()
defer ch.Close()
2.3. Déclarer un échange
Les messages sont d'abord envoyés à l'échange. L'échange envoie les messages aux files d'attente en fonction de sa stratégie.
err = ch.ExchangeDeclare(
"tizi365", // Nom de l'échange
"fanout", // Type d'échange, utilisant le type fanout ici, c'est-à-dire le modèle de publication/abonnement
true, // Durable
false, // Auto-supprimé
false, // Interne
false, // Sans attente
nil, // Arguments
)
2.4. Publier un message
// Contenu du message
body := "Bonjour Tizi365.com !"
// Publier le message
err = ch.Publish(
"tizi365", // Échange (nom de l'échange correspondant à la déclaration précédente)
"", // Clé de routage, pour un échange de type fanout, la clé de routage est automatiquement ignorée, il n'est donc pas nécessaire d'en fournir une
false, // Obligatoire
false, // Immédiat
amqp.Publishing {
ContentType: "text/plain", // Type de contenu du message, ici c'est du texte brut
Body: []byte(body), // Contenu du message
})
2.5. Code complet de l'envoi de message
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s : %s", msg, err)
}
}
func main() {
// Se connecter à rabbitmq
conn, err := amqp.Dial("amqp://invité:invité@localhost:5672/")
failOnError(err, "Échec de la connexion à RabbitMQ")
defer conn.Close()
// Créer un canal
ch, err := conn.Channel()
failOnError(err, "Échec de l'ouverture d'un canal")
defer ch.Close()
// Déclarer un échange
err = ch.ExchangeDeclare(
"tizi365", // Nom de l'échange
"fanout", // Type d'échange, fanout pour le mode de publication/abonnement
true, // Durable
false, // Auto-supprimé
false, // Interne
false, // Sans attente
nil, // Arguments
)
failOnError(err, "Échec de la déclaration de l'échange")
// Contenu du message
body := "Bonjour Tizi365.com !"
// Envoyer le message
err = ch.Publish(
"tizi365", // Échange (correspondant à la déclaration ci-dessus)
"", // Clé de routage, pour les échanges de type fanout, la clé de routage est automatiquement ignorée
false, // Obligatoire
false, // Immédiat
amqp.Publishing {
ContentType: "text/plain", // Type de contenu du message, ici c'est du texte brut
Body: []byte(body), // Contenu du message
})
log.Printf("Contenu envoyé %s", body)
}
3. Recevoir des messages
Les trois premières étapes pour recevoir des messages - se connecter à RabbitMQ, créer un canal et déclarer un échange - sont les mêmes que pour l'envoi de messages. Référez-vous aux sections précédentes 2.1, 2.2 et 2.3.
3.1. Déclarer une file d'attente
Déclarez la file d'attente sur laquelle effectuer des opérations
q, err := ch.QueueDeclare(
"", // Nom de la file d'attente, si non spécifié, un nom aléatoire sera généré
false, // Durable
false, // Supprimer lorsque inutilisé
true, // Exclusif
false, // No-wait
nil, // Arguments
)
3.2. Lier la file d'attente à l'échange
La file d'attente doit être liée à l'échange pour recevoir des messages
err = ch.QueueBind(
q.Name, // Nom de la file d'attente
"", // Clé de routage, pour les échanges de type fanout, la clé de routage est automatiquement ignorée
"tizi365", // Nom de l'échange, doit correspondre à celui défini par l'expéditeur du message
false,
nil)
Remarque : Dans les applications réelles, nous pouvons définir N files d'attente, chacune liée au même échange, afin de recevoir des messages transférés par l'échange. C'est là que le motif de publication/abonnement se reflète.
3.3. Créer un consommateur
msgs, err := ch.Consume(
q.Name, // Référence au nom de la file d'attente ci-dessus
"", // Nom du consommateur, s'il n'est pas spécifié, il sera généré automatiquement
true, // Reconnaître automatiquement que le message a été traité
false, // Exclusif
false, // No-local
false, // No-wait
nil, // Args
)
// Boucle de traitement des messages
for d := range msgs {
log.Printf("Message reçu=%s", d.Body)
}
3.4. Code complet du consommateur
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// Se connecter à RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Impossible de se connecter à RabbitMQ")
defer conn.Close()
// Créer un canal, généralement un par consommateur
ch, err := conn.Channel()
failOnError(err, "Impossible d'ouvrir un canal")
defer ch.Close()
// Déclarer un échange
err = ch.ExchangeDeclare(
"tizi365", // Nom de l'échange, doit correspondre à celui utilisé par l'expéditeur du message
"fanout", // Type d'échange
true, // Durable
false, // Auto-supprimé
false, // Interne
false, // No-wait
nil, // Arguments
)
failOnError(err, "Impossible de déclarer un échange")
// Déclarer la file d'attente sur laquelle opérer
q, err := ch.QueueDeclare(
"", // Nom de la file d'attente, si vide un nom aléatoire sera généré
false, // Durable
false, // Supprimer lorsque inutilisé
true, // Exclusif
false, // No-wait
nil, // Arguments
)
failOnError(err, "Impossible de déclarer une file d'attente")
// Lier la file d'attente à l'échange spécifié
err = ch.QueueBind(
q.Name, // Nom de la file d'attente
"", // Clé de routage, ignorée pour les échanges de type fanout
"tizi365", // Nom de l'échange, doit correspondre à celui défini par l'expéditeur du message
false,
nil)
failOnError(err, "Impossible de lier une file d'attente")
// Créer un consommateur
msgs, err := ch.Consume(
q.Name, // Référence au nom de la file d'attente ci-dessus
"", // Nom du consommateur, sera généré automatiquement s'il est vide
true, // Auto-ack
false, // Exclusif
false, // No-local
false, // No-wait
nil, // Args
)
failOnError(err, "Impossible d'enregistrer un consommateur")
// Consommer les messages de la file d'attente dans une boucle
for d := range msgs {
log.Printf("Message reçu: %s", d.Body)
}
}
3.5. Multiples consommateurs
Consultez la section Mode de travail et démarrez simplement plusieurs consommateurs en utilisant des goroutines.