Golang RabbitMQ Publish/Subscribe Pattern (Broadcast Mode, Fanout Mode)

Das Veröffentlichen/Abonnieren-Muster in RabbitMQ bedeutet, dass eine Nachricht, die von einem Produzenten gesendet wird, von mehreren Verbrauchern verarbeitet wird.

Fanout-Modus

Erklärung:

  • P stellt den Produzenten dar, C1 und C2 stellen Verbraucher dar, Rot stellt Warteschlangen dar und X stellt den Austausch dar.
  • Der Austausch ist dafür verantwortlich, Nachrichten an alle an den Austausch gebundenen Warteschlangen weiterzuleiten.
  • Es können mehrere Warteschlangen definiert werden, die jeweils an denselben Austausch gebunden sind.
  • Jede Warteschlange kann einen oder mehrere Verbraucher haben.

Hinweis: Wenn Sie mit RabbitMQ nicht vertraut sind, lesen Sie bitte zuerst den Abschnitt RabbitMQ-Grundkonzepte.

1. Abhängigkeitspaket installieren

go get github.com/streadway/amqp

2. Nachrichten senden

Die folgenden Schritte zeigen, wie der Nachrichtenproduzent Nachrichten sendet.

2.1. Mit dem RabbitMQ-Server verbinden

// Mit dem RabbitMQ-Server verbinden
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()

Erklärung der Verbindungsadresse:

amqp://Benutzername:Passwort@RabbitMQ-Adresse:Port/

2.2. Einen Kanal erstellen

Die meisten Operationen werden auf dem Kanal ausgeführt.

ch, err := conn.Channel()
defer ch.Close()

2.3. Einen Austausch deklarieren

Nachrichten werden zunächst an den Austausch gesendet. Der Austausch leitet Nachrichten basierend auf seiner Strategie an Warteschlangen weiter.

err = ch.ExchangeDeclare(
	"tizi365",   // Austauschname
	"fanout", // Austauschtyp, hier wird der Fanout-Typ verwendet, d. h. das Veröffentlichen/Abonnieren-Muster
	true,     // Dauerhaft
	false,    // Auto-gelöscht
	false,    // Intern
	false,    // Kein Warten
	nil,      // Argumente
)

2.4. Eine Nachricht veröffentlichen

// Nachrichteninhalt
body := "Hallo Tizi365.com!"

// Die Nachricht veröffentlichen
err = ch.Publish(
  "tizi365",     // Austausch (Name des vorherigen Austauschs)
  "", // Routing-Key, für den Austauschtyp "fanout" wird der Routing-Key automatisch ignoriert, daher ist es nicht notwendig, einen anzugeben
  false,  // Obligatorisch
  false,  // Sofort
  amqp.Publishing {
    ContentType: "text/plain", // Nachrichteninhaltstyp, hier handelt es sich um reinen Text
    Body:        []byte(body),  // Nachrichteninhalt
  })

2.5. Vollständigen Nachrichtenübertragungscode vervollständigen

package main

import (
	"log"
	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// Mit RabbitMQ verbinden
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Verbindung zu RabbitMQ fehlgeschlagen")
	defer conn.Close()

	// Einen Kanal erstellen
	ch, err := conn.Channel()
	failOnError(err, "Fehler beim Öffnen eines Kanals")
	defer ch.Close()

	// Einen Austausch deklarieren
	err = ch.ExchangeDeclare(
		"tizi365",   // Austauschname
		"fanout", // Austauschtyp, Fanout für das Veröffentlichen/Abonnieren-Muster
		true,     // Dauerhaft
		false,    // Auto-gelöscht
		false,    // Intern
		false,    // Kein Warten
		nil,      // Argumente
	)
	failOnError(err, "Fehler beim Deklarieren eines Austauschs")

	// Nachrichteninhalt
	body := "Hallo Tizi365.com!"
	// Nachricht senden
	err = ch.Publish(
		"tizi365",     // Austausch (entspricht der vorherigen Deklaration)
		"", // Routing-Key, für Austauschtypen "fanout" wird der Routing-Key automatisch ignoriert
		false,  // Obligatorisch
		false,  // Sofort
		amqp.Publishing {
			ContentType: "text/plain", // Nachrichteninhaltstyp, hier handelt es sich um reinen Text
			Body:        []byte(body),  // Nachrichteninhalt
		})

	log.Printf("Inhalt gesendet %s", body)
}

3. Nachrichten empfangen

Die ersten drei Schritte zum Empfangen von Nachrichten - Verbindung zum RabbitMQ herstellen, Kanal erstellen und Austausch deklarieren - sind dieselben wie beim Senden von Nachrichten. Siehe die vorherigen Abschnitte 2.1, 2.2 und 2.3.

3.1. Queue deklarieren

Definieren Sie die Queue, auf der gearbeitet werden soll

q, err := ch.QueueDeclare(
		"",    // Queue-Name, wenn nicht angegeben, wird ein zufälliger generiert
		false, // Dauerhaft
		false, // Löschen bei Inaktivität
		true,  // Exklusiv
		false, // No-wait
		nil,   // Argumente
	)

3.2. Die Queue an die Exchange binden

Die Queue muss an die Exchange gebunden werden, um Nachrichten zu empfangen

err = ch.QueueBind(
		q.Name, // Queue-Name
		"",     // Routing-Key, für Exchanges vom Typ "fanout" wird der Routing-Key automatisch ignoriert
		"tizi365", // Exchange-Name, muss mit dem vom Nachrichtensender definierten Namen übereinstimmen
		false,
		nil)

Hinweis: In tatsächlichen Anwendungen können wir N Queues definieren, die alle an die gleiche Exchange gebunden sind, um Nachrichten zu empfangen, die von der Exchange weitergeleitet wurden. Hier kommt das Publish/Subscribe-Muster zum Tragen.

3.3. Einen Consumer erstellen

msgs, err := ch.Consume(
		q.Name, // Verweis auf den Queue-Namen von oben
		"",     // Consumer-Name, wird automatisch generiert, wenn nicht angegeben
		true,   // Automatisches Bestätigen, dass die Nachricht verarbeitet wurde
		false,  // Exklusiv
		false,  // No-local
		false,  // No-wait
		nil,    // Argumente
	)
	
// Schleife zur Nachrichtenverarbeitung
for d := range msgs {
	log.Printf("Nachricht erhalten: %s", d.Body)
}

3.4. Vollständiger Consumer-Code

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// Verbindung zu RabbitMQ herstellen
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Verbindung zu RabbitMQ fehlgeschlagen")
	defer conn.Close()

	// Einen Channel erstellen, normalerweise einer pro Consumer
	ch, err := conn.Channel()
	failOnError(err, "Öffnen eines Channels fehlgeschlagen")
	defer ch.Close()

	// Eine Exchange deklarieren
	err = ch.ExchangeDeclare(
		"tizi365",   // Exchange-Name, sollte mit dem Namen übereinstimmen, der vom Nachrichtensender verwendet wird
		"fanout", // Exchange-Typ
		true,     // Dauerhaft
		false,    // Auto-gelöscht
		false,    // Intern
		false,    // No-wait
		nil,      // Argumente
	)
	failOnError(err, "Deklarieren der Exchange fehlgeschlagen")

	// Die Queue, auf der gearbeitet werden soll, deklarieren
	q, err := ch.QueueDeclare(
		"",    // Queue-Name, wenn leer, wird ein zufälliger Name generiert
		false, // Dauerhaft
		false, // Löschen bei Inaktivität
		true,  // Exklusiv
		false, // No-wait
		nil,   // Argumente
	)
	failOnError(err, "Deklarieren einer Queue fehlgeschlagen")

	// Die Queue an die spezifische Exchange binden
	err = ch.QueueBind(
		q.Name, // Queue-Name
		"",     // Routing-Key, für fanout-Exchanges ignoriert
		"tizi365", // Exchange-Name, sollte mit dem Namen übereinstimmen, der vom Nachrichtensender definiert wurde
		false,
		nil)
	failOnError(err, "Binden einer Queue fehlgeschlagen")

	// Einen Consumer erstellen
	msgs, err := ch.Consume(
		q.Name, // Verweis auf den oben erstellten Queue-Namen
		"",     // Consumer-Name, wird automatisch generiert, wenn leer
		true,   // Auto-Bestätigung
		false,  // Exklusiv
		false,  // No-local
		false,  // No-wait
		nil,    // Argumente
	)
	failOnError(err, "Registrieren eines Consumers fehlgeschlagen")

	// Nachrichten aus der Queue in einer Schleife verarbeiten
	for d := range msgs {
		log.Printf("Nachricht erhalten: %s", d.Body)
	}
}

3.5. Mehrere Consumer

Siehe Abschnitt zur Arbeitsweise und starten Sie einfach mehrere Consumer mithilfe von Goroutinen.