Mode queue simple de Golang RabbitMQ

Golang RabbitMQ

Explication: P représente le producteur, C représente le consommateur, et le rouge représente la file d'attente.

Remarque: Si vous n'êtes pas familier avec RabbitMQ, veuillez d'abord lire la section Concepts de base de RabbitMQ.

1. Installer les dépendances

go get github.com/streadway/amqp

Importer le package de dépendance

import (
  "github.com/streadway/amqp"
)

2. Envoyer des messages

Les étapes suivantes démontrent comment le producteur de messages complète la poussée de message.

2.1. Se connecter au serveur RabbitMQ

// Se connecter au serveur RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()

Explication de l'adresse de connexion:

amqp://nom_utilisateur:mot_de_passe@adresse_RabbitMQ:port/

2.2. Créer un canal

La plupart des opérations sont effectuées sur le canal.

ch, err := conn.Channel()
defer ch.Close()

2.3. Déclarer une file d'attente

Représente la file d'attente à partir de laquelle nous devons lire ou écrire.

q, err := ch.QueueDeclare(
  "hello", // Nom de la file d'attente
  false,   // Persistance du message
  false,   // Supprimer la file d'attente lorsqu'elle n'est pas utilisée
  false,   // Exclusif
  false,   // No-wait
  nil,     // Arguments
)

2.4. Pousser les messages

// Contenu du message
body := "Bonjour le monde !"

// Pousser le message
err = ch.Publish(
  "",     // Échange (ignorer ici)
  q.Name, // Paramètre de routage, utiliser le nom de la file d'attente comme paramètre de routage
  false,  // Obligatoire
  false,  // Immédiat
  amqp.Publishing {
    ContentType: "text/plain",
    Body:        []byte(body),  // Contenu du message
  })

2.5. Code complet pour l'envoi des messages

package main

// Importer les packages
import (
	"log"
	"github.com/streadway/amqp"
)

// Gérer les erreurs
func failOnError(err erreur, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// Se connecter à RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Échec de la connexion à RabbitMQ")
	defer conn.Close()

	// Créer un canal
	ch, err := conn.Channel()
	failOnError(err, "Échec d'ouverture d'un canal")
	defer ch.Close()

	// Déclarer la file d'attente à manipuler
	q, err := ch.QueueDeclare(
		"hello", // Nom
		false,   // Durable
		false,   // Supprimer lorsqu'inutilisé
		false,   // Exclusif
		false,   // No-wait
		nil,     // Arguments
	)
	failOnError(err, "Échec de la déclaration d'une file d'attente")

	// Contenu du message à envoyer
	body := "Bonjour le monde !"

	// Envoyer le message
	err = ch.Publish(
		"",     // Échange
		q.Name, // Clé de routage
		false,  // Obligatoire
		false,  // Immédiat
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		})
	failOnError(err, "Échec de la publication d'un message")
	log.Printf(" [x] Envoyé %s", body)
}

Les trois premières étapes de la réception des messages sont les mêmes que l'envoi des messages, correspondant aux sections 2.1, 2.2 et 2.3 respectivement. Le code complet pour la réception des messages est le suivant:

package main

// Importer les packages
import (
	"log"
	"github.com/streadway/amqp"
)

// Gestion des erreurs
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// Se connecter à RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Impossible de se connecter à RabbitMQ")
	defer conn.Close()

	// Créer un canal
	ch, err := conn.Channel()
	failOnError(err, "Impossible d'ouvrir un canal")
	defer ch.Close()
	
	// Déclarer la file sur laquelle opérer
	q, err := ch.QueueDeclare(
		"hello", // Le nom de la file doit être cohérent avec le nom de la file pour l'envoi des messages
		false,   // durable
		false,   // supprimer lorsqu'inutilisé
		false,   // exclusif
		false,   // no-wait
		nil,     // arguments
	)
	failOnError(err, "Impossible de déclarer une file")

	// Créer un consommateur de messages
	msgs, err := ch.Consume(
		q.Name, // Nom de la file
		"",     // Nom du consommateur, si non renseigné, un identifiant unique sera généré automatiquement
		true,   // Sautomatiquement acquitter les messages, c'est-à-dire informer automatiquement RabbitMQ que le message a été traité avec succès
		false,  // exclusif
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "Impossible d'enregistrer un consommateur")
	
	// Récupérer les messages de la file en boucle
	for d := range msgs {
		// Afficher le contenu du message
		log.Printf("Message reçu: %s", d.Body)
	}
}