Mode de travail de Golang RabbitMQ, réalisation de la consommation concurrente par plusieurs consommateurs.

File d'attente de travail Golang

Explication : P représente le producteur, C1 et C2 représentent les consommateurs, et la couleur rouge représente la file d'attente.

Astuce : Chaque message ne peut être consommé que par un seul consommateur.

Pré-tutoriel

Veuillez lire le tutoriel de démarrage rapide de Golang RabbitMQ d'abord pour comprendre les opérations de base de Golang sur RabbitMQ. Si vous n'êtes pas familier avec RabbitMQ, veuillez d'abord lire les chapitres précédents.

Consommation Concurrente

Golang utilise principalement des goroutines pour mettre en œuvre plusieurs consommateurs, voici la mise en œuvre de plusieurs consommateurs.

Astuce : Pour savoir comment envoyer des messages, veuillez vous référer à Golang RabbitMQ Quick Start Tutorial.

package main

import (
	"log"
	"time"

	"github.com/streadway/amqp"
)

// Gestion des erreurs
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// Se connecter à RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Impossible de se connecter à RabbitMQ")
	defer conn.Close()

	// Créer 5 consommateurs à travers des goroutines
	for i := 0; i < 5; i++ {
		go func(number int) {
			// Créer un canal rabbitmq pour chaque consommateur
			ch, err := conn.Channel()
			failOnError(err, "Impossible d'ouvrir un canal")
			defer ch.Close()

			// Déclarer la file d'attente à manipuler
			q, err := ch.QueueDeclare(
				"hello", // Nom de la file d'attente
				false,   // Durable
				false,   // Supprimer lorsqu'inutilisé
				false,   // Exclusif
				false,   // Aucune attente
				nil,     // Arguments
			)
			failOnError(err, "Impossible de déclarer une file d'attente")

			// Créer un consommateur
			msgs, err := ch.Consume(
				q.Name, // Nom de la file d'attente à manipuler
				"",     // Identifiant unique du consommateur, si non renseigné, une valeur unique est générée automatiquement
				true,   // Reconnaître automatiquement les messages (c'est-à-dire confirmer automatiquement que le message a été traité)
				false,  // Exclusif
				false,  // Pas local
				false,  // Aucune attente
				nil,    // Args
			)
			failOnError(err, "Impossible d'enregistrer un consommateur")

			// Traiter les messages dans une boucle
			for d := range msgs {
				log.Printf("[Numéro du consommateur=%d] Message reçu : %s", number, d.Body)
				// Simuler le traitement métier, dormir pendant 1 seconde
				time.Sleep(time.Second)
			}
		}(i)
	}

	// Suspendre la goroutine principale pour empêcher le programme de quitter
	toujours := make(chan bool)
	<-toujours
}

Astuce : Peu importe le type d'échange utilisé par RabbitMQ, les files d'attente peuvent avoir plusieurs consommateurs, et la manière de démarrer plusieurs consommateurs est la même que dans cet exemple.