Golang RabbitMQ Publish/Subscribe Pattern (Broadcast Mode, Fanout Mode)
Das Veröffentlichen/Abonnieren-Muster in RabbitMQ bedeutet, dass eine Nachricht, die von einem Produzenten gesendet wird, von mehreren Verbrauchern verarbeitet wird.
Erklärung:
- P stellt den Produzenten dar, C1 und C2 stellen Verbraucher dar, Rot stellt Warteschlangen dar und X stellt den Austausch dar.
- Der Austausch ist dafür verantwortlich, Nachrichten an alle an den Austausch gebundenen Warteschlangen weiterzuleiten.
- Es können mehrere Warteschlangen definiert werden, die jeweils an denselben Austausch gebunden sind.
- Jede Warteschlange kann einen oder mehrere Verbraucher haben.
Hinweis: Wenn Sie mit RabbitMQ nicht vertraut sind, lesen Sie bitte zuerst den Abschnitt RabbitMQ-Grundkonzepte.
1. Abhängigkeitspaket installieren
go get github.com/streadway/amqp
2. Nachrichten senden
Die folgenden Schritte zeigen, wie der Nachrichtenproduzent Nachrichten sendet.
2.1. Mit dem RabbitMQ-Server verbinden
// Mit dem RabbitMQ-Server verbinden
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()
Erklärung der Verbindungsadresse:
amqp://Benutzername:Passwort@RabbitMQ-Adresse:Port/
2.2. Einen Kanal erstellen
Die meisten Operationen werden auf dem Kanal ausgeführt.
ch, err := conn.Channel()
defer ch.Close()
2.3. Einen Austausch deklarieren
Nachrichten werden zunächst an den Austausch gesendet. Der Austausch leitet Nachrichten basierend auf seiner Strategie an Warteschlangen weiter.
err = ch.ExchangeDeclare(
"tizi365", // Austauschname
"fanout", // Austauschtyp, hier wird der Fanout-Typ verwendet, d. h. das Veröffentlichen/Abonnieren-Muster
true, // Dauerhaft
false, // Auto-gelöscht
false, // Intern
false, // Kein Warten
nil, // Argumente
)
2.4. Eine Nachricht veröffentlichen
// Nachrichteninhalt
body := "Hallo Tizi365.com!"
// Die Nachricht veröffentlichen
err = ch.Publish(
"tizi365", // Austausch (Name des vorherigen Austauschs)
"", // Routing-Key, für den Austauschtyp "fanout" wird der Routing-Key automatisch ignoriert, daher ist es nicht notwendig, einen anzugeben
false, // Obligatorisch
false, // Sofort
amqp.Publishing {
ContentType: "text/plain", // Nachrichteninhaltstyp, hier handelt es sich um reinen Text
Body: []byte(body), // Nachrichteninhalt
})
2.5. Vollständigen Nachrichtenübertragungscode vervollständigen
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// Mit RabbitMQ verbinden
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Verbindung zu RabbitMQ fehlgeschlagen")
defer conn.Close()
// Einen Kanal erstellen
ch, err := conn.Channel()
failOnError(err, "Fehler beim Öffnen eines Kanals")
defer ch.Close()
// Einen Austausch deklarieren
err = ch.ExchangeDeclare(
"tizi365", // Austauschname
"fanout", // Austauschtyp, Fanout für das Veröffentlichen/Abonnieren-Muster
true, // Dauerhaft
false, // Auto-gelöscht
false, // Intern
false, // Kein Warten
nil, // Argumente
)
failOnError(err, "Fehler beim Deklarieren eines Austauschs")
// Nachrichteninhalt
body := "Hallo Tizi365.com!"
// Nachricht senden
err = ch.Publish(
"tizi365", // Austausch (entspricht der vorherigen Deklaration)
"", // Routing-Key, für Austauschtypen "fanout" wird der Routing-Key automatisch ignoriert
false, // Obligatorisch
false, // Sofort
amqp.Publishing {
ContentType: "text/plain", // Nachrichteninhaltstyp, hier handelt es sich um reinen Text
Body: []byte(body), // Nachrichteninhalt
})
log.Printf("Inhalt gesendet %s", body)
}
3. Nachrichten empfangen
Die ersten drei Schritte zum Empfangen von Nachrichten - Verbindung zum RabbitMQ herstellen, Kanal erstellen und Austausch deklarieren - sind dieselben wie beim Senden von Nachrichten. Siehe die vorherigen Abschnitte 2.1, 2.2 und 2.3.
3.1. Queue deklarieren
Definieren Sie die Queue, auf der gearbeitet werden soll
q, err := ch.QueueDeclare(
"", // Queue-Name, wenn nicht angegeben, wird ein zufälliger generiert
false, // Dauerhaft
false, // Löschen bei Inaktivität
true, // Exklusiv
false, // No-wait
nil, // Argumente
)
3.2. Die Queue an die Exchange binden
Die Queue muss an die Exchange gebunden werden, um Nachrichten zu empfangen
err = ch.QueueBind(
q.Name, // Queue-Name
"", // Routing-Key, für Exchanges vom Typ "fanout" wird der Routing-Key automatisch ignoriert
"tizi365", // Exchange-Name, muss mit dem vom Nachrichtensender definierten Namen übereinstimmen
false,
nil)
Hinweis: In tatsächlichen Anwendungen können wir N Queues definieren, die alle an die gleiche Exchange gebunden sind, um Nachrichten zu empfangen, die von der Exchange weitergeleitet wurden. Hier kommt das Publish/Subscribe-Muster zum Tragen.
3.3. Einen Consumer erstellen
msgs, err := ch.Consume(
q.Name, // Verweis auf den Queue-Namen von oben
"", // Consumer-Name, wird automatisch generiert, wenn nicht angegeben
true, // Automatisches Bestätigen, dass die Nachricht verarbeitet wurde
false, // Exklusiv
false, // No-local
false, // No-wait
nil, // Argumente
)
// Schleife zur Nachrichtenverarbeitung
for d := range msgs {
log.Printf("Nachricht erhalten: %s", d.Body)
}
3.4. Vollständiger Consumer-Code
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// Verbindung zu RabbitMQ herstellen
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Verbindung zu RabbitMQ fehlgeschlagen")
defer conn.Close()
// Einen Channel erstellen, normalerweise einer pro Consumer
ch, err := conn.Channel()
failOnError(err, "Öffnen eines Channels fehlgeschlagen")
defer ch.Close()
// Eine Exchange deklarieren
err = ch.ExchangeDeclare(
"tizi365", // Exchange-Name, sollte mit dem Namen übereinstimmen, der vom Nachrichtensender verwendet wird
"fanout", // Exchange-Typ
true, // Dauerhaft
false, // Auto-gelöscht
false, // Intern
false, // No-wait
nil, // Argumente
)
failOnError(err, "Deklarieren der Exchange fehlgeschlagen")
// Die Queue, auf der gearbeitet werden soll, deklarieren
q, err := ch.QueueDeclare(
"", // Queue-Name, wenn leer, wird ein zufälliger Name generiert
false, // Dauerhaft
false, // Löschen bei Inaktivität
true, // Exklusiv
false, // No-wait
nil, // Argumente
)
failOnError(err, "Deklarieren einer Queue fehlgeschlagen")
// Die Queue an die spezifische Exchange binden
err = ch.QueueBind(
q.Name, // Queue-Name
"", // Routing-Key, für fanout-Exchanges ignoriert
"tizi365", // Exchange-Name, sollte mit dem Namen übereinstimmen, der vom Nachrichtensender definiert wurde
false,
nil)
failOnError(err, "Binden einer Queue fehlgeschlagen")
// Einen Consumer erstellen
msgs, err := ch.Consume(
q.Name, // Verweis auf den oben erstellten Queue-Namen
"", // Consumer-Name, wird automatisch generiert, wenn leer
true, // Auto-Bestätigung
false, // Exklusiv
false, // No-local
false, // No-wait
nil, // Argumente
)
failOnError(err, "Registrieren eines Consumers fehlgeschlagen")
// Nachrichten aus der Queue in einer Schleife verarbeiten
for d := range msgs {
log.Printf("Nachricht erhalten: %s", d.Body)
}
}
3.5. Mehrere Consumer
Siehe Abschnitt zur Arbeitsweise und starten Sie einfach mehrere Consumer mithilfe von Goroutinen.