Einfacher Warteschlangenmodus von Golang RabbitMQ
Erklärung: P stellt den Produzenten dar, C den Verbraucher und Rot die Warteschlange.
Hinweis: Wenn Sie mit RabbitMQ nicht vertraut sind, lesen Sie bitte zuerst den Abschnitt RabbitMQ-Grundkonzepte.
1. Abhängigkeiten installieren
go get github.com/streadway/amqp
Importieren Sie das Abhängigkeitspaket
import (
"github.com/streadway/amqp"
)
2. Nachrichten senden
Die folgenden Schritte zeigen, wie der Nachrichtenproduzent die Nachrichtenübermittlung abschließt.
2.1. Verbindung zum RabbitMQ-Server herstellen
// Verbindung zum RabbitMQ-Server herstellen
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()
Erklärung der Verbindungsadresse:
amqp://benutzername:passwort@RabbitMQ-Adresse:port/
2.2. Einen Kanal erstellen
Die meisten Operationen werden auf dem Kanal ausgeführt.
ch, err := conn.Channel()
defer ch.Close()
2.3. Eine Warteschlange deklarieren
Stellt die Warteschlange dar, von der wir lesen oder in die wir schreiben müssen.
q, err := ch.QueueDeclare(
"hallo", // Queue-Name
false, // Nachrichtendauerhaftigkeit
false, // Löschen der Warteschlange bei Nichtverwendung
false, // Exklusiv
false, // Kein Warten
nil, // Argumente
)
2.4. Nachrichten übermitteln
// Nachrichteninhalt
body := "Hallo Welt!"
// Nachricht übermitteln
err = ch.Publish(
"", // Austausch (hier ignorieren)
q.Name, // Routing-Parameter, Queue-Name als Routing-Parameter verwenden
false, // Obligatorisch
false, // Sofort
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(body), // Nachrichteninhalt
})
2.5. Vollständiger Code zum Senden von Nachrichten
package main
// Pakete importieren
import (
"log"
"github.com/streadway/amqp"
)
// Fehler behandeln
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// Mit RabbitMQ verbinden
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Verbindung zu RabbitMQ fehlgeschlagen")
defer conn.Close()
// Einen Kanal erstellen
ch, err := conn.Channel()
failOnError(err, "Fehler beim Öffnen eines Kanals")
defer ch.Close()
// Die Warteschlange zur Bearbeitung deklarieren
q, err := ch.QueueDeclare(
"hallo", // Name
false, // Dauerhaft
false, // Löschen bei Nichtverwendung
false, // Exklusiv
false, // Kein Warten
nil, // Argumente
)
failOnError(err, "Fehler beim Deklarieren einer Warteschlange")
// Zu sendender Nachrichteninhalt
body := "Hallo Welt!"
// Nachricht senden
err = ch.Publish(
"", // Austausch
q.Name, // Routing-Schlüssel
false, // Obligatorisch
false, // Sofort
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Fehler beim Veröffentlichen einer Nachricht")
log.Printf(" [x] Gesendet %s", body)
}
3. Empfangen von Nachrichten
Die ersten drei Schritte zum Empfangen von Nachrichten sind identisch mit dem Senden von Nachrichten und entsprechen den Abschnitten 2.1, 2.2 und 2.3. Der vollständige Code zum Empfangen von Nachrichten lautet wie folgt:
package main
// Pakete importieren
import (
"log"
"github.com/streadway/amqp"
)
// Fehlerbehandlung
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// Mit RabbitMQ verbinden
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Verbindung zu RabbitMQ fehlgeschlagen")
defer conn.Close()
// Einen Kanal erstellen
ch, err := conn.Channel()
failOnError(err, "Öffnen eines Kanals fehlgeschlagen")
defer ch.Close()
// Die zu bearbeitende Warteschlange deklarieren
q, err := ch.QueueDeclare(
"hello", // Der Warteschlangenname muss mit dem Namen der Warteschlange zum Senden von Nachrichten übereinstimmen
false, // dauerhaft
false, // löschen, wenn unbenutzt
false, // exklusiv
false, // no-wait
nil, // Argumente
)
failOnError(err, "Deklarieren der Warteschlange fehlgeschlagen")
// Einen Nachrichtenkonsumenten erstellen
msgs, err := ch.Consume(
q.Name, // Warteschlangenname
"", // Name des Konsumenten, falls nicht ausgefüllt, wird automatisch eine eindeutige ID generiert
true, // Nachrichten automatisch bestätigen, d.h. automatisch an RabbitMQ mitteilen, dass die Nachricht erfolgreich verarbeitet wurde
false, // exklusiv
false, // no-local
false, // no-wait
nil, // Argumente
)
failOnError(err, "Registrieren eines Konsumenten fehlgeschlagen")
// Nachrichten aus der Warteschlange in einer Schleife abrufen
for d := range msgs {
// Nachrichteninhalt ausgeben
log.Printf("Nachricht erhalten: %s", d.Body)
}
}