Golang RabbitMQ Arbeitsmodus, Erzielung von gleichzeitiger Verarbeitung durch mehrere Verbraucher.

Golang Arbeitswarteschlange

Erklärung: P steht für den Produzenten, C1 und C2 stehen für Verbraucher, und die rote Farbe repräsentiert die Warteschlange.

Tipp: Jede Nachricht kann nur von einem Verbraucher verarbeitet werden.

Vorbereitendes Tutorial

Bitte lesen Sie zuerst das Schnellstart-Tutorial zu Golang RabbitMQ, um die grundlegenden Operationen von Golang in Bezug auf RabbitMQ zu verstehen. Wenn Sie mit RabbitMQ nicht vertraut sind, lesen Sie bitte zuerst die vorherigen Kapitel.

Gleichzeitige Verarbeitung

Golang verwendet hauptsächlich Goroutinen, um mehrere Verbraucher zu implementieren. Im Folgenden finden Sie die Implementierung mehrerer Verbraucher.

Tipp: Für das Senden von Nachrichten, siehe Schnellstart-Tutorial zu Golang RabbitMQ.

package main

import (
	"log"
	"time"

	"github.com/streadway/amqp"
)

// Fehlerbehandlung
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// Verbindung zu RabbitMQ herstellen
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Verbindung zu RabbitMQ fehlgeschlagen")
	defer conn.Close()

	// Durch Goroutinen 5 Verbraucher erstellen
	for i := 0; i < 5; i++ {
		go func(number int) {
			// Für jeden Verbraucher einen RabbitMQ-Kanal erstellen
			ch, err := conn.Channel()
			failOnError(err, "Fehler beim Öffnen eines Kanals")
			defer ch.Close()

			// Die zu bearbeitende Warteschlange deklarieren
			q, err := ch.QueueDeclare(
				"hello", // Warteschlangenname
				false,   // Dauerhaft
				false,   // Löschen, wenn ungenutzt
				false,   // Exklusiv
				false,   // Kein Warten
				nil,     // Argumente
			)
			failOnError(err, "Fehler beim Deklarieren einer Warteschlange")

			// Verbraucher erstellen
			msgs, err := ch.Consume(
				q.Name, // Zu bearbeitender Warteschlangenname
				"",     // Eindeutige Verbraucher-ID, wenn nicht ausgefüllt, wird automatisch ein eindeutiger Wert generiert
				true,   // Nachrichten automatisch bestätigen (d. h. automatisch bestätigen, dass die Nachricht verarbeitet wurde)
				false,  // Exklusiv
				false,  // Nicht lokal
				false,  // Kein Warten
				nil,    // Argumente
			)
			failOnError(err, "Fehler beim Registrieren eines Verbrauchers")

			// Nachrichten in einer Schleife verarbeiten
			for d := range msgs {
				log.Printf("[Verbraucher Nummer=%d] Nachricht erhalten: %s", number, d.Body)
				// Geschäftsverarbeitung simulieren, 1 Sekunde warten
				time.Sleep(time.Second)
			}
		}(i)
	}

	// Die Hauptgoroutine anhalten, um zu verhindern, dass das Programm beendet wird
	immerda := make(chan bool)
	<-immerda
}

Tipp: Unabhängig von der Art des von RabbitMQ verwendeten Austauschs können Warteschlangen mehrere Verbraucher haben, und die Art und Weise, wie mehrere Verbraucher gestartet werden, ist in diesem Beispiel identisch.