Mode queue simple de Golang RabbitMQ
Explication: P représente le producteur, C représente le consommateur, et le rouge représente la file d'attente.
Remarque: Si vous n'êtes pas familier avec RabbitMQ, veuillez d'abord lire la section Concepts de base de RabbitMQ.
1. Installer les dépendances
go get github.com/streadway/amqp
Importer le package de dépendance
import (
"github.com/streadway/amqp"
)
2. Envoyer des messages
Les étapes suivantes démontrent comment le producteur de messages complète la poussée de message.
2.1. Se connecter au serveur RabbitMQ
// Se connecter au serveur RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()
Explication de l'adresse de connexion:
amqp://nom_utilisateur:mot_de_passe@adresse_RabbitMQ:port/
2.2. Créer un canal
La plupart des opérations sont effectuées sur le canal.
ch, err := conn.Channel()
defer ch.Close()
2.3. Déclarer une file d'attente
Représente la file d'attente à partir de laquelle nous devons lire ou écrire.
q, err := ch.QueueDeclare(
"hello", // Nom de la file d'attente
false, // Persistance du message
false, // Supprimer la file d'attente lorsqu'elle n'est pas utilisée
false, // Exclusif
false, // No-wait
nil, // Arguments
)
2.4. Pousser les messages
// Contenu du message
body := "Bonjour le monde !"
// Pousser le message
err = ch.Publish(
"", // Échange (ignorer ici)
q.Name, // Paramètre de routage, utiliser le nom de la file d'attente comme paramètre de routage
false, // Obligatoire
false, // Immédiat
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(body), // Contenu du message
})
2.5. Code complet pour l'envoi des messages
package main
// Importer les packages
import (
"log"
"github.com/streadway/amqp"
)
// Gérer les erreurs
func failOnError(err erreur, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// Se connecter à RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Échec de la connexion à RabbitMQ")
defer conn.Close()
// Créer un canal
ch, err := conn.Channel()
failOnError(err, "Échec d'ouverture d'un canal")
defer ch.Close()
// Déclarer la file d'attente à manipuler
q, err := ch.QueueDeclare(
"hello", // Nom
false, // Durable
false, // Supprimer lorsqu'inutilisé
false, // Exclusif
false, // No-wait
nil, // Arguments
)
failOnError(err, "Échec de la déclaration d'une file d'attente")
// Contenu du message à envoyer
body := "Bonjour le monde !"
// Envoyer le message
err = ch.Publish(
"", // Échange
q.Name, // Clé de routage
false, // Obligatoire
false, // Immédiat
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Échec de la publication d'un message")
log.Printf(" [x] Envoyé %s", body)
}
Les trois premières étapes de la réception des messages sont les mêmes que l'envoi des messages, correspondant aux sections 2.1, 2.2 et 2.3 respectivement. Le code complet pour la réception des messages est le suivant:
package main
// Importer les packages
import (
"log"
"github.com/streadway/amqp"
)
// Gestion des erreurs
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// Se connecter à RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Impossible de se connecter à RabbitMQ")
defer conn.Close()
// Créer un canal
ch, err := conn.Channel()
failOnError(err, "Impossible d'ouvrir un canal")
defer ch.Close()
// Déclarer la file sur laquelle opérer
q, err := ch.QueueDeclare(
"hello", // Le nom de la file doit être cohérent avec le nom de la file pour l'envoi des messages
false, // durable
false, // supprimer lorsqu'inutilisé
false, // exclusif
false, // no-wait
nil, // arguments
)
failOnError(err, "Impossible de déclarer une file")
// Créer un consommateur de messages
msgs, err := ch.Consume(
q.Name, // Nom de la file
"", // Nom du consommateur, si non renseigné, un identifiant unique sera généré automatiquement
true, // Sautomatiquement acquitter les messages, c'est-à-dire informer automatiquement RabbitMQ que le message a été traité avec succès
false, // exclusif
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Impossible d'enregistrer un consommateur")
// Récupérer les messages de la file en boucle
for d := range msgs {
// Afficher le contenu du message
log.Printf("Message reçu: %s", d.Body)
}
}