Golang RabbitMQ Arbeitsmodus, Erzielung von gleichzeitiger Verarbeitung durch mehrere Verbraucher.
Erklärung: P steht für den Produzenten, C1 und C2 stehen für Verbraucher, und die rote Farbe repräsentiert die Warteschlange.
Tipp: Jede Nachricht kann nur von einem Verbraucher verarbeitet werden.
Vorbereitendes Tutorial
Bitte lesen Sie zuerst das Schnellstart-Tutorial zu Golang RabbitMQ, um die grundlegenden Operationen von Golang in Bezug auf RabbitMQ zu verstehen. Wenn Sie mit RabbitMQ nicht vertraut sind, lesen Sie bitte zuerst die vorherigen Kapitel.
Gleichzeitige Verarbeitung
Golang verwendet hauptsächlich Goroutinen, um mehrere Verbraucher zu implementieren. Im Folgenden finden Sie die Implementierung mehrerer Verbraucher.
Tipp: Für das Senden von Nachrichten, siehe Schnellstart-Tutorial zu Golang RabbitMQ.
package main
import (
"log"
"time"
"github.com/streadway/amqp"
)
// Fehlerbehandlung
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// Verbindung zu RabbitMQ herstellen
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Verbindung zu RabbitMQ fehlgeschlagen")
defer conn.Close()
// Durch Goroutinen 5 Verbraucher erstellen
for i := 0; i < 5; i++ {
go func(number int) {
// Für jeden Verbraucher einen RabbitMQ-Kanal erstellen
ch, err := conn.Channel()
failOnError(err, "Fehler beim Öffnen eines Kanals")
defer ch.Close()
// Die zu bearbeitende Warteschlange deklarieren
q, err := ch.QueueDeclare(
"hello", // Warteschlangenname
false, // Dauerhaft
false, // Löschen, wenn ungenutzt
false, // Exklusiv
false, // Kein Warten
nil, // Argumente
)
failOnError(err, "Fehler beim Deklarieren einer Warteschlange")
// Verbraucher erstellen
msgs, err := ch.Consume(
q.Name, // Zu bearbeitender Warteschlangenname
"", // Eindeutige Verbraucher-ID, wenn nicht ausgefüllt, wird automatisch ein eindeutiger Wert generiert
true, // Nachrichten automatisch bestätigen (d. h. automatisch bestätigen, dass die Nachricht verarbeitet wurde)
false, // Exklusiv
false, // Nicht lokal
false, // Kein Warten
nil, // Argumente
)
failOnError(err, "Fehler beim Registrieren eines Verbrauchers")
// Nachrichten in einer Schleife verarbeiten
for d := range msgs {
log.Printf("[Verbraucher Nummer=%d] Nachricht erhalten: %s", number, d.Body)
// Geschäftsverarbeitung simulieren, 1 Sekunde warten
time.Sleep(time.Second)
}
}(i)
}
// Die Hauptgoroutine anhalten, um zu verhindern, dass das Programm beendet wird
immerda := make(chan bool)
<-immerda
}
Tipp: Unabhängig von der Art des von RabbitMQ verwendeten Austauschs können Warteschlangen mehrere Verbraucher haben, und die Art und Weise, wie mehrere Verbraucher gestartet werden, ist in diesem Beispiel identisch.