Mode de travail de Golang RabbitMQ, réalisation de la consommation concurrente par plusieurs consommateurs.
Explication : P représente le producteur, C1 et C2 représentent les consommateurs, et la couleur rouge représente la file d'attente.
Astuce : Chaque message ne peut être consommé que par un seul consommateur.
Pré-tutoriel
Veuillez lire le tutoriel de démarrage rapide de Golang RabbitMQ d'abord pour comprendre les opérations de base de Golang sur RabbitMQ. Si vous n'êtes pas familier avec RabbitMQ, veuillez d'abord lire les chapitres précédents.
Consommation Concurrente
Golang utilise principalement des goroutines pour mettre en œuvre plusieurs consommateurs, voici la mise en œuvre de plusieurs consommateurs.
Astuce : Pour savoir comment envoyer des messages, veuillez vous référer à Golang RabbitMQ Quick Start Tutorial.
package main
import (
"log"
"time"
"github.com/streadway/amqp"
)
// Gestion des erreurs
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// Se connecter à RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Impossible de se connecter à RabbitMQ")
defer conn.Close()
// Créer 5 consommateurs à travers des goroutines
for i := 0; i < 5; i++ {
go func(number int) {
// Créer un canal rabbitmq pour chaque consommateur
ch, err := conn.Channel()
failOnError(err, "Impossible d'ouvrir un canal")
defer ch.Close()
// Déclarer la file d'attente à manipuler
q, err := ch.QueueDeclare(
"hello", // Nom de la file d'attente
false, // Durable
false, // Supprimer lorsqu'inutilisé
false, // Exclusif
false, // Aucune attente
nil, // Arguments
)
failOnError(err, "Impossible de déclarer une file d'attente")
// Créer un consommateur
msgs, err := ch.Consume(
q.Name, // Nom de la file d'attente à manipuler
"", // Identifiant unique du consommateur, si non renseigné, une valeur unique est générée automatiquement
true, // Reconnaître automatiquement les messages (c'est-à-dire confirmer automatiquement que le message a été traité)
false, // Exclusif
false, // Pas local
false, // Aucune attente
nil, // Args
)
failOnError(err, "Impossible d'enregistrer un consommateur")
// Traiter les messages dans une boucle
for d := range msgs {
log.Printf("[Numéro du consommateur=%d] Message reçu : %s", number, d.Body)
// Simuler le traitement métier, dormir pendant 1 seconde
time.Sleep(time.Second)
}
}(i)
}
// Suspendre la goroutine principale pour empêcher le programme de quitter
toujours := make(chan bool)
<-toujours
}
Astuce : Peu importe le type d'échange utilisé par RabbitMQ, les files d'attente peuvent avoir plusieurs consommateurs, et la manière de démarrer plusieurs consommateurs est la même que dans cet exemple.