Tryb pracy RabbitMQ w języku Golang, osiąganie współbieżnej konsumpcji przez wielu konsumentów.
Wyjaśnienie: P oznacza producenta, a C1 i C2 oznaczają konsumentów, a czerwony kolor reprezentuje kolejkę.
Wskazówka: Każda wiadomość może być skonsumowana tylko przez jednego konsumenta.
Przed samouczkiem
Proszę najpierw przeczytać Poradnik szybkiego startu RabbitMQ dla języka Golang, aby zrozumieć podstawowe operacje Golang dotyczące RabbitMQ. Jeśli nie jesteś zaznajomiony z RabbitMQ, przeczytaj najpierw poprzednie rozdziały.
Współbieżna konsumpcja
Golang głównie używa gorutyny do implementacji wielu konsumentów, poniżej znajduje się implementacja wielu konsumentów.
Wskazówka: Aby dowiedzieć się, jak wysyłać wiadomości, proszę odnieść się do Poradnik szybkiego startu RabbitMQ dla języka Golang.
package main
import (
"log"
"time"
"github.com/streadway/amqp"
)
// Obsługa błędów
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// Połączenie z RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Nie udało się połączyć z RabbitMQ")
defer conn.Close()
// Utwórz 5 konsumentów za pomocą gorutyn
for i := 0; i < 5; i++ {
go func(number int) {
// Utwórz kanał RabbitMQ dla każdego konsumenta
ch, err := conn.Channel()
failOnError(err, "Nie udało się otworzyć kanału")
defer ch.Close()
// Zadeklaruj kolejkę do obsługi
q, err := ch.QueueDeclare(
"hello", // Nazwa kolejki
false, // Trwała
false, // Usuń, gdy nieużywana
false, // Wyłączna
false, // Bez oczekiwania
nil, // Argumenty
)
failOnError(err, "Nie udało się zadeklarować kolejki")
// Utwórz konsumenta
msgs, err := ch.Consume(
q.Name, // Nazwa kolejki do obsługi
"", // Unikalny identyfikator konsumenta, jeśli nie jest wypełniony, zostaje automatycznie wygenerowana unikalna wartość
true, // Automatyczne potwierdzanie wiadomości (czyli automatyczne potwierdzenie przetworzenia wiadomości)
false, // Wyłączne
false, // Bez lokalnej
false, // Bez oczekiwania
nil, // Argumenty
)
failOnError(err, "Nie udało się zarejestrować konsumenta")
// Przetwarzaj wiadomości w pętli
for d := range msgs {
log.Printf("[Numer konsumenta=%d] Otrzymano wiadomość: %s", number, d.Body)
// Symuluj przetwarzanie biznesowe, uśpij na 1 sekundę
time.Sleep(time.Second)
}
}(i)
}
// Zawieś główną gorutynę, aby zapobiec wyjściu z programu
nigdy := make(chan bool)
<-nigdy
}
Wskazówka: Bez względu na rodzaj wymiany używanej przez RabbitMQ, kolejki mogą mieć wielu konsumentów, a sposób uruchamiania wielu konsumentów jest taki sam jak w tym przykładzie.