Golang RabbitMQ Wzorzec Publikuj/Subskrybuj (Tryb nadawania rozgłoszeniowego, Tryb fanout)

Wzorzec publikuj/subskrybuj w RabbitMQ oznacza, że wiadomość wysłana przez producenta będzie przetwarzana przez wielu konsumentów.

Tryb fanout

Wyjaśnienie:

  • P oznacza producenta, C1 i C2 oznaczają konsumentów, czerwony kolor reprezentuje kolejki, a X reprezentuje wymianę (exchange).
  • Wymiana jest odpowiedzialna za przekazywanie wiadomości do wszystkich kolejek powiązanych z wymianą.
  • Można zdefiniować wiele kolejek, związanych każda z tą samą wymianą.
  • Każda kolejka może mieć jednego lub więcej konsumentów.

Uwaga: Jeśli nie jesteś zaznajomiony z RabbitMQ, proszę najpierw przeczytaj sekcję Podstawowe pojęcia RabbitMQ.

1. Instalacja pakietu zależności

go get github.com/streadway/amqp

2. Wysyłanie wiadomości

Poniższe kroki demonstrują, jak producent wiadomości wysyła wiadomości.

2.1. Połączenie z serwerem RabbitMQ

// Połączenie z serwerem RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()

Wyjaśnienie adresu połączenia:

amqp://nazwaużytkownika:hasło@AdresRabbitMQ:Port/

2.2. Utworzenie kanału

Większość operacji jest wykonywana na kanale.

ch, err := conn.Channel()
defer ch.Close()

2.3. Deklaracja wymiany

Wiadomości są najpierw wysyłane do wymiany. Wymiana przekazuje wiadomości do kolejek na podstawie swojej strategii.

err = ch.ExchangeDeclare(
	"tizi365",   // Nazwa wymiany
	"fanout", // Typ wymiany, tutaj używamy typu fanout, czyli wzorzec publikuj/subskrybuj
	true,     // Trwała
	false,    // Automatycznie usuwana
	false,    // Wewnętrzna
	false,    // No-wait
	nil,      // Argumenty
)

2.4. Publikowanie wiadomości

// Treść wiadomości
body := "Witaj Tizi365.com!"

// Publikowanie wiadomości
err = ch.Publish(
  "tizi365",     // Wymiana (nazwa wymiany z poprzedniej deklaracji)
  "", // Klucz routingu, dla wymiany typu fanout klucz routingu jest automatycznie ignorowany, więc nie jest konieczne jego podanie
  false,  // Wymagane
  false,  // Natychmiastowe
  amqp.Publishing {
    ContentType: "text/plain", // Typ treści wiadomości, tutaj jest to zwykły tekst
    Body:        []byte(body),  // Treść wiadomości
  })

2.5. Zakończenie kodu wysyłania wiadomości

package main

import (
	"log"
	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// Połączenie z RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Błąd podczas łączenia się z RabbitMQ")
	defer conn.Close()

	// Utworzenie kanału
	ch, err := conn.Channel()
	failOnError(err, "Błąd podczas otwierania kanału")
	defer ch.Close()

	// Deklaracja wymiany
	err = ch.ExchangeDeclare(
		"tizi365",   // Nazwa wymiany
		"fanout", // Typ wymiany, fanout dla trybu publikuj/subskrybuj
		true,     // Trwała
		false,    // Automatycznie usuwana
		false,    // Wewnętrzna
		false,    // No-wait
		nil,      // Argumenty
	)
	failOnError(err, "Błąd podczas deklaracji wymiany")

	// Treść wiadomości
	body := "Witaj Tizi365.com!"
	// Wysłanie wiadomości
	err = ch.Publish(
		"tizi365",     // Wymiana (zgodna z deklaracją powyżej)
		"", // Klucz routingu, dla wymian typu fanout klucz routingu jest automatycznie ignorowany
		false,  // Wymagane
		false,  // Natychmiastowe
		amqp.Publishing {
			ContentType: "text/plain", // Typ treści wiadomości, tutaj zwykły tekst
			Body:        []byte(body),  // Treść wiadomości
		})

	log.Printf("Wysłano treść %s", body)
}

3. Odbieranie wiadomości

Pierwsze trzy kroki odbierania wiadomości – połączenie z RabbitMQ, utworzenie kanału i deklaracja wymiany – są takie same jak przy wysyłaniu wiadomości. Należy się odnieść do powyższych sekcji 2.1, 2.2 i 2.3.

3.1. Deklaracja kolejki

Zadeklaruj kolejkę, na której będą wykonywane operacje

q, err := ch.QueueDeclare(
		"",    // Nazwa kolejki, jeśli nie jest określona, zostanie wygenerowana losowa nazwa
		false, // Trwała
		false, // Usuń, gdy nieużywana
		true,  // Wyłączna
		false, // Bez oczekiwania
		nil,   // Argumenty
	)

3.2. Powiąż kolejkę z wymianą

Aby otrzymywać wiadomości, kolejka musi zostać powiązana z wymianą

err = ch.QueueBind(
		q.Name, // Nazwa kolejki
		"",     // Klucz routingu, dla wymian typu fanout, klucz routingu jest automatycznie ignorowany
		"tizi365", // Nazwa wymiany, musi pasować do tej zdefiniowanej przez nadawcę wiadomości
		false,
		nil)

Uwaga: W rzeczywistych aplikacjach możemy zdefiniować N kolejek, każda z nich powiązana z tą samą wymianą, aby otrzymywać wiadomości przekierowane przez wymianę. Tutaj odzwierciedla się wzorzec publikuj/subskrybuj.

3.3. Utwórz konsumenta

msgs, err := ch.Consume(
		q.Name, // Odwołanie do nazwy kolejki z powyższego
		"",     // Nazwa konsumenta, jeśli nie jest określona, zostanie wygenerowana automatycznie
		true,   // Automatyczne potwierdzenie, że wiadomość została przetworzona
		false,  // Wyłączne
		false,  // Brak lokalnego
		false,  // Bez oczekiwania
		nil,    // Argumenty
	)
	
// Pętla obsługi wiadomości
for d := range msgs {
	log.Printf("Otrzymano wiadomość=%s", d.Body)
}

3.4. Kompletny kod konsumenta

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// Połączenie z RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Nie udało się połączyć z RabbitMQ")
	defer conn.Close()

	// Utwórz kanał, zazwyczaj jeden na konsumenta
	ch, err := conn.Channel()
	failOnError(err, "Nie udało się otworzyć kanału")
	defer ch.Close()

	// Zadeklaruj wymianę
	err = ch.ExchangeDeclare(
		"tizi365",   // Nazwa wymiany, powinna pasować do tej używanej przez nadawcę wiadomości
		"fanout", // Typ wymiany
		true,     // Trwała
		false,    // Automatyczne usuwanie
		false,    // Wewnętrzne
		false,    // Bez oczekiwania
		nil,      // Argumenty
	)
	failOnError(err, "Nie udało się zadeklarować wymiany")

	// Zadeklaruj kolejkę, na której będą wykonywane operacje
	q, err := ch.QueueDeclare(
		"",    // Nazwa kolejki, jeśli jest pusta, zostanie wygenerowana losowa nazwa
		false, // Trwała
		false, // Usuń, gdy nieużywana
		true,  // Wyłączna
		false, // Bez oczekiwania
		nil,   // Argumenty
	)
	failOnError(err, "Nie udało się zadeklarować kolejki")

	// Powiąż kolejkę z określoną wymianą
	err = ch.QueueBind(
		q.Name, // Nazwa kolejki
		"",     // Klucz routingu, ignorowany dla wymian typu fanout
		"tizi365", // Nazwa wymiany, powinna pasować do tej zdefiniowanej przez nadawcę wiadomości
		false,
		nil)
	failOnError(err, "Nie udało się powiązać kolejki z wymianą")

	// Utwórz konsumenta
	msgs, err := ch.Consume(
		q.Name, // Odwołanie do nazwy wcześniejszej kolejki
		"",     // Nazwa konsumenta, zostanie automatycznie wygenerowana, jeśli jest pusta
		true,   // Auto-potwierdzenie
		false,  // Wyłączne
		false,  // Brak lokalnego
		false,  // Bez oczekiwania
		nil,    // Argumenty
	)
	failOnError(err, "Nie udało się zarejestrować konsumenta")

	// Konsumuj wiadomości z kolejki w pętli
	for d := range msgs {
		log.Printf("Otrzymano wiadomość: %s", d.Body)
	}
}

3.5. Wielu konsumentów

Odniesienie do sekcji Sposób pracy i po prostu uruchom kilku konsumentów przy użyciu goroutine.