Tryb pracy RabbitMQ w języku Golang, osiąganie współbieżnej konsumpcji przez wielu konsumentów.

Kolejka pracy Golang

Wyjaśnienie: P oznacza producenta, a C1 i C2 oznaczają konsumentów, a czerwony kolor reprezentuje kolejkę.

Wskazówka: Każda wiadomość może być skonsumowana tylko przez jednego konsumenta.

Przed samouczkiem

Proszę najpierw przeczytać Poradnik szybkiego startu RabbitMQ dla języka Golang, aby zrozumieć podstawowe operacje Golang dotyczące RabbitMQ. Jeśli nie jesteś zaznajomiony z RabbitMQ, przeczytaj najpierw poprzednie rozdziały.

Współbieżna konsumpcja

Golang głównie używa gorutyny do implementacji wielu konsumentów, poniżej znajduje się implementacja wielu konsumentów.

Wskazówka: Aby dowiedzieć się, jak wysyłać wiadomości, proszę odnieść się do Poradnik szybkiego startu RabbitMQ dla języka Golang.

package main

import (
	"log"
	"time"

	"github.com/streadway/amqp"
)

// Obsługa błędów
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// Połączenie z RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Nie udało się połączyć z RabbitMQ")
	defer conn.Close()

	// Utwórz 5 konsumentów za pomocą gorutyn
	for i := 0; i < 5; i++ {
		go func(number int) {
			// Utwórz kanał RabbitMQ dla każdego konsumenta
			ch, err := conn.Channel()
			failOnError(err, "Nie udało się otworzyć kanału")
			defer ch.Close()

			// Zadeklaruj kolejkę do obsługi
			q, err := ch.QueueDeclare(
				"hello", // Nazwa kolejki
				false,   // Trwała
				false,   // Usuń, gdy nieużywana
				false,   // Wyłączna
				false,   // Bez oczekiwania
				nil,     // Argumenty
			)
			failOnError(err, "Nie udało się zadeklarować kolejki")

			// Utwórz konsumenta
			msgs, err := ch.Consume(
				q.Name, // Nazwa kolejki do obsługi
				"",     // Unikalny identyfikator konsumenta, jeśli nie jest wypełniony, zostaje automatycznie wygenerowana unikalna wartość
				true,   // Automatyczne potwierdzanie wiadomości (czyli automatyczne potwierdzenie przetworzenia wiadomości)
				false,  // Wyłączne
				false,  // Bez lokalnej
				false,  // Bez oczekiwania
				nil,    // Argumenty
			)
			failOnError(err, "Nie udało się zarejestrować konsumenta")

			// Przetwarzaj wiadomości w pętli
			for d := range msgs {
				log.Printf("[Numer konsumenta=%d] Otrzymano wiadomość: %s", number, d.Body)
				// Symuluj przetwarzanie biznesowe, uśpij na 1 sekundę
				time.Sleep(time.Second)
			}
		}(i)
	}

	// Zawieś główną gorutynę, aby zapobiec wyjściu z programu
	nigdy := make(chan bool)
	<-nigdy
}

Wskazówka: Bez względu na rodzaj wymiany używanej przez RabbitMQ, kolejki mogą mieć wielu konsumentów, a sposób uruchamiania wielu konsumentów jest taki sam jak w tym przykładzie.