Einfacher Warteschlangenmodus von Golang RabbitMQ

Golang RabbitMQ

Erklärung: P stellt den Produzenten dar, C den Verbraucher und Rot die Warteschlange.

Hinweis: Wenn Sie mit RabbitMQ nicht vertraut sind, lesen Sie bitte zuerst den Abschnitt RabbitMQ-Grundkonzepte.

1. Abhängigkeiten installieren

go get github.com/streadway/amqp

Importieren Sie das Abhängigkeitspaket

import (
  "github.com/streadway/amqp"
)

2. Nachrichten senden

Die folgenden Schritte zeigen, wie der Nachrichtenproduzent die Nachrichtenübermittlung abschließt.

2.1. Verbindung zum RabbitMQ-Server herstellen

// Verbindung zum RabbitMQ-Server herstellen
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()

Erklärung der Verbindungsadresse:

amqp://benutzername:passwort@RabbitMQ-Adresse:port/

2.2. Einen Kanal erstellen

Die meisten Operationen werden auf dem Kanal ausgeführt.

ch, err := conn.Channel()
defer ch.Close()

2.3. Eine Warteschlange deklarieren

Stellt die Warteschlange dar, von der wir lesen oder in die wir schreiben müssen.

q, err := ch.QueueDeclare(
  "hallo", // Queue-Name
  false,   // Nachrichtendauerhaftigkeit
  false,   // Löschen der Warteschlange bei Nichtverwendung
  false,   // Exklusiv
  false,   // Kein Warten
  nil,     // Argumente
)

2.4. Nachrichten übermitteln

// Nachrichteninhalt
body := "Hallo Welt!"

// Nachricht übermitteln
err = ch.Publish(
  "",     // Austausch (hier ignorieren)
  q.Name, // Routing-Parameter,  Queue-Name als Routing-Parameter verwenden
  false,  // Obligatorisch
  false,  // Sofort
  amqp.Publishing {
    ContentType: "text/plain",
    Body:        []byte(body),  // Nachrichteninhalt
  })

2.5. Vollständiger Code zum Senden von Nachrichten

package main

// Pakete importieren
import (
	"log"
	"github.com/streadway/amqp"
)

// Fehler behandeln
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// Mit RabbitMQ verbinden
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Verbindung zu RabbitMQ fehlgeschlagen")
	defer conn.Close()

	// Einen Kanal erstellen
	ch, err := conn.Channel()
	failOnError(err, "Fehler beim Öffnen eines Kanals")
	defer ch.Close()

	// Die Warteschlange zur Bearbeitung deklarieren
	q, err := ch.QueueDeclare(
		"hallo", // Name
		false,   // Dauerhaft
		false,   // Löschen bei Nichtverwendung
		false,   // Exklusiv
		false,   // Kein Warten
		nil,     // Argumente
	)
	failOnError(err, "Fehler beim Deklarieren einer Warteschlange")

	// Zu sendender Nachrichteninhalt
	body := "Hallo Welt!"

	// Nachricht senden
	err = ch.Publish(
		"",     // Austausch
		q.Name, // Routing-Schlüssel
		false,  // Obligatorisch
		false,  // Sofort
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		})
	failOnError(err, "Fehler beim Veröffentlichen einer Nachricht")
	log.Printf(" [x] Gesendet %s", body)
}

3. Empfangen von Nachrichten

Die ersten drei Schritte zum Empfangen von Nachrichten sind identisch mit dem Senden von Nachrichten und entsprechen den Abschnitten 2.1, 2.2 und 2.3. Der vollständige Code zum Empfangen von Nachrichten lautet wie folgt:

package main

// Pakete importieren
import (
	"log"
	"github.com/streadway/amqp"
)

// Fehlerbehandlung
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// Mit RabbitMQ verbinden
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Verbindung zu RabbitMQ fehlgeschlagen")
	defer conn.Close()

	// Einen Kanal erstellen
	ch, err := conn.Channel()
	failOnError(err, "Öffnen eines Kanals fehlgeschlagen")
	defer ch.Close()
	
	// Die zu bearbeitende Warteschlange deklarieren
	q, err := ch.QueueDeclare(
		"hello", // Der Warteschlangenname muss mit dem Namen der Warteschlange zum Senden von Nachrichten übereinstimmen
		false,   // dauerhaft
		false,   // löschen, wenn unbenutzt
		false,   // exklusiv
		false,   // no-wait
		nil,     // Argumente
	)
	failOnError(err, "Deklarieren der Warteschlange fehlgeschlagen")

	// Einen Nachrichtenkonsumenten erstellen
	msgs, err := ch.Consume(
		q.Name, // Warteschlangenname
		"",     // Name des Konsumenten, falls nicht ausgefüllt, wird automatisch eine eindeutige ID generiert
		true,   // Nachrichten automatisch bestätigen, d.h. automatisch an RabbitMQ mitteilen, dass die Nachricht erfolgreich verarbeitet wurde
		false,  // exklusiv
		false,  // no-local
		false,  // no-wait
		nil,    // Argumente
	)
	failOnError(err, "Registrieren eines Konsumenten fehlgeschlagen")
	
	// Nachrichten aus der Warteschlange in einer Schleife abrufen
	for d := range msgs {
		// Nachrichteninhalt ausgeben
		log.Printf("Nachricht erhalten: %s", d.Body)
	}
}