Golang RabbitMQ Wzorzec Publikuj/Subskrybuj (Tryb nadawania rozgłoszeniowego, Tryb fanout)
Wzorzec publikuj/subskrybuj w RabbitMQ oznacza, że wiadomość wysłana przez producenta będzie przetwarzana przez wielu konsumentów.
Wyjaśnienie:
- P oznacza producenta, C1 i C2 oznaczają konsumentów, czerwony kolor reprezentuje kolejki, a X reprezentuje wymianę (exchange).
- Wymiana jest odpowiedzialna za przekazywanie wiadomości do wszystkich kolejek powiązanych z wymianą.
- Można zdefiniować wiele kolejek, związanych każda z tą samą wymianą.
- Każda kolejka może mieć jednego lub więcej konsumentów.
Uwaga: Jeśli nie jesteś zaznajomiony z RabbitMQ, proszę najpierw przeczytaj sekcję Podstawowe pojęcia RabbitMQ.
1. Instalacja pakietu zależności
go get github.com/streadway/amqp
2. Wysyłanie wiadomości
Poniższe kroki demonstrują, jak producent wiadomości wysyła wiadomości.
2.1. Połączenie z serwerem RabbitMQ
// Połączenie z serwerem RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()
Wyjaśnienie adresu połączenia:
amqp://nazwaużytkownika:hasło@AdresRabbitMQ:Port/
2.2. Utworzenie kanału
Większość operacji jest wykonywana na kanale.
ch, err := conn.Channel()
defer ch.Close()
2.3. Deklaracja wymiany
Wiadomości są najpierw wysyłane do wymiany. Wymiana przekazuje wiadomości do kolejek na podstawie swojej strategii.
err = ch.ExchangeDeclare(
"tizi365", // Nazwa wymiany
"fanout", // Typ wymiany, tutaj używamy typu fanout, czyli wzorzec publikuj/subskrybuj
true, // Trwała
false, // Automatycznie usuwana
false, // Wewnętrzna
false, // No-wait
nil, // Argumenty
)
2.4. Publikowanie wiadomości
// Treść wiadomości
body := "Witaj Tizi365.com!"
// Publikowanie wiadomości
err = ch.Publish(
"tizi365", // Wymiana (nazwa wymiany z poprzedniej deklaracji)
"", // Klucz routingu, dla wymiany typu fanout klucz routingu jest automatycznie ignorowany, więc nie jest konieczne jego podanie
false, // Wymagane
false, // Natychmiastowe
amqp.Publishing {
ContentType: "text/plain", // Typ treści wiadomości, tutaj jest to zwykły tekst
Body: []byte(body), // Treść wiadomości
})
2.5. Zakończenie kodu wysyłania wiadomości
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// Połączenie z RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Błąd podczas łączenia się z RabbitMQ")
defer conn.Close()
// Utworzenie kanału
ch, err := conn.Channel()
failOnError(err, "Błąd podczas otwierania kanału")
defer ch.Close()
// Deklaracja wymiany
err = ch.ExchangeDeclare(
"tizi365", // Nazwa wymiany
"fanout", // Typ wymiany, fanout dla trybu publikuj/subskrybuj
true, // Trwała
false, // Automatycznie usuwana
false, // Wewnętrzna
false, // No-wait
nil, // Argumenty
)
failOnError(err, "Błąd podczas deklaracji wymiany")
// Treść wiadomości
body := "Witaj Tizi365.com!"
// Wysłanie wiadomości
err = ch.Publish(
"tizi365", // Wymiana (zgodna z deklaracją powyżej)
"", // Klucz routingu, dla wymian typu fanout klucz routingu jest automatycznie ignorowany
false, // Wymagane
false, // Natychmiastowe
amqp.Publishing {
ContentType: "text/plain", // Typ treści wiadomości, tutaj zwykły tekst
Body: []byte(body), // Treść wiadomości
})
log.Printf("Wysłano treść %s", body)
}
3. Odbieranie wiadomości
Pierwsze trzy kroki odbierania wiadomości – połączenie z RabbitMQ, utworzenie kanału i deklaracja wymiany – są takie same jak przy wysyłaniu wiadomości. Należy się odnieść do powyższych sekcji 2.1, 2.2 i 2.3.
3.1. Deklaracja kolejki
Zadeklaruj kolejkę, na której będą wykonywane operacje
q, err := ch.QueueDeclare(
"", // Nazwa kolejki, jeśli nie jest określona, zostanie wygenerowana losowa nazwa
false, // Trwała
false, // Usuń, gdy nieużywana
true, // Wyłączna
false, // Bez oczekiwania
nil, // Argumenty
)
3.2. Powiąż kolejkę z wymianą
Aby otrzymywać wiadomości, kolejka musi zostać powiązana z wymianą
err = ch.QueueBind(
q.Name, // Nazwa kolejki
"", // Klucz routingu, dla wymian typu fanout, klucz routingu jest automatycznie ignorowany
"tizi365", // Nazwa wymiany, musi pasować do tej zdefiniowanej przez nadawcę wiadomości
false,
nil)
Uwaga: W rzeczywistych aplikacjach możemy zdefiniować N kolejek, każda z nich powiązana z tą samą wymianą, aby otrzymywać wiadomości przekierowane przez wymianę. Tutaj odzwierciedla się wzorzec publikuj/subskrybuj.
3.3. Utwórz konsumenta
msgs, err := ch.Consume(
q.Name, // Odwołanie do nazwy kolejki z powyższego
"", // Nazwa konsumenta, jeśli nie jest określona, zostanie wygenerowana automatycznie
true, // Automatyczne potwierdzenie, że wiadomość została przetworzona
false, // Wyłączne
false, // Brak lokalnego
false, // Bez oczekiwania
nil, // Argumenty
)
// Pętla obsługi wiadomości
for d := range msgs {
log.Printf("Otrzymano wiadomość=%s", d.Body)
}
3.4. Kompletny kod konsumenta
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// Połączenie z RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Nie udało się połączyć z RabbitMQ")
defer conn.Close()
// Utwórz kanał, zazwyczaj jeden na konsumenta
ch, err := conn.Channel()
failOnError(err, "Nie udało się otworzyć kanału")
defer ch.Close()
// Zadeklaruj wymianę
err = ch.ExchangeDeclare(
"tizi365", // Nazwa wymiany, powinna pasować do tej używanej przez nadawcę wiadomości
"fanout", // Typ wymiany
true, // Trwała
false, // Automatyczne usuwanie
false, // Wewnętrzne
false, // Bez oczekiwania
nil, // Argumenty
)
failOnError(err, "Nie udało się zadeklarować wymiany")
// Zadeklaruj kolejkę, na której będą wykonywane operacje
q, err := ch.QueueDeclare(
"", // Nazwa kolejki, jeśli jest pusta, zostanie wygenerowana losowa nazwa
false, // Trwała
false, // Usuń, gdy nieużywana
true, // Wyłączna
false, // Bez oczekiwania
nil, // Argumenty
)
failOnError(err, "Nie udało się zadeklarować kolejki")
// Powiąż kolejkę z określoną wymianą
err = ch.QueueBind(
q.Name, // Nazwa kolejki
"", // Klucz routingu, ignorowany dla wymian typu fanout
"tizi365", // Nazwa wymiany, powinna pasować do tej zdefiniowanej przez nadawcę wiadomości
false,
nil)
failOnError(err, "Nie udało się powiązać kolejki z wymianą")
// Utwórz konsumenta
msgs, err := ch.Consume(
q.Name, // Odwołanie do nazwy wcześniejszej kolejki
"", // Nazwa konsumenta, zostanie automatycznie wygenerowana, jeśli jest pusta
true, // Auto-potwierdzenie
false, // Wyłączne
false, // Brak lokalnego
false, // Bez oczekiwania
nil, // Argumenty
)
failOnError(err, "Nie udało się zarejestrować konsumenta")
// Konsumuj wiadomości z kolejki w pętli
for d := range msgs {
log.Printf("Otrzymano wiadomość: %s", d.Body)
}
}
3.5. Wielu konsumentów
Odniesienie do sekcji Sposób pracy i po prostu uruchom kilku konsumentów przy użyciu goroutine.