Prosty tryb kolejki Golang RabbitMQ

Obrazek: Golang RabbitMQ

Wyjaśnienie: P oznacza producenta, C oznacza konsumenta, a czerwony kolor oznacza kolejkę.

Uwaga: Jeśli nie jesteś zaznajomiony z RabbitMQ, proszę przeczytaj najpierw sekcję Podstawowe pojęcia RabbitMQ.

1. Instalacja zależności

go get github.com/streadway/amqp

Zaimportuj pakiet zależności

import (
  "github.com/streadway/amqp"
)

2. Wysyłanie wiadomości

Poniższe kroki pokazują, jak producent wiadomości wykonuje przekazywanie wiadomości.

2.1. Połączenie z serwerem RabbitMQ

// Połączenie z serwerem RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()

Wyjaśnienie adresu połączenia:

amqp://nazwaużytkownika:hasło@adresRabbitMQ:port/

2.2. Utworzenie kanału

Większość operacji jest wykonywana na kanale.

ch, err := conn.Channel()
defer ch.Close()

2.3. Deklaracja kolejki

Oznacza kolejkę, z której będziemy czytać lub do której będziemy pisać.

q, err := ch.QueueDeclare(
  "hello", // Nazwa kolejki
  false,   // Trwała wiadomość
  false,   // Usunięcie kolejki, gdy nie jest używana
  false,   // Wyłączna
  false,   // Bez oczekiwania
  nil,     // Argumenty
)

2.4. Przesyłanie wiadomości

// Zawartość wiadomości
body := "Witaj Świecie!"

// Przekaż wiadomość
err = ch.Publish(
  "",     // Wymiana (pomijamy tutaj)
  q.Name, // Parametr routingu, użyj nazwy kolejki jako parametru routingu
  false,  // Wymagane
  false,  // Natychmiastowe
  amqp.Publishing {
    ContentType: "text/plain",
    Body:        []byte(body),  // Zawartość wiadomości
  })

2.5. Pełny kod wysyłania wiadomości

package main

// Importuj pakiety
import (
	"log"
	"github.com/streadway/amqp"
)

// Obsłuż błędy
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// Połącz z RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Nie udało się połączyć z RabbitMQ")
	defer conn.Close()

	// Utwórz kanał
	ch, err := conn.Channel()
	failOnError(err, "Nie udało się otworzyć kanału")
	defer ch.Close()

	// Zadeklaruj kolejkę do operacji
	q, err := ch.QueueDeclare(
		"hello", // Nazwa
		false,   // Trwała
		false,   // Usuń, gdy nie jest używana
		false,   // Wyłączna
		false,   // Bez oczekiwania
		nil,     // Argumenty
	)
	failOnError(err, "Nie udało się zadeklarować kolejki")

	// Zawartość wiadomości do wysłania
	body := "Witaj Świecie!"

	// Wyślij wiadomość
	err = ch.Publish(
		"",     // Wymiana
		q.Name, // Klucz routingu
		false,  // Wymagane
		false,  // Natychmiastowe
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		})
	failOnError(err, "Nie udało się opublikować wiadomości")
	log.Printf(" [x] Wysłano %s", body)
}

3. Odbieranie wiadomości

Pierwsze trzy kroki odbierania wiadomości są takie same jak przy wysyłaniu wiadomości, odpowiadające odpowiednio sekcjom 2.1, 2.2 i 2.3. Kompletny kod do odbierania wiadomości wygląda następująco:

package main

// Importowanie pakietów
import (
	"log"
	"github.com/streadway/amqp"
)

// Obsługa błędów
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// Połączenie z RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Błąd podczas łączenia z RabbitMQ")
	defer conn.Close()

	// Utworzenie kanału
	ch, err := conn.Channel()
	failOnError(err, "Błąd podczas otwierania kanału")
	defer ch.Close()
	
	// Deklaracja kolejki, na której będą operacje
	q, err := ch.QueueDeclare(
		"hello", // Nazwa kolejki musi być zgodna z nazwą kolejki do wysyłania wiadomości
		false,   // trwała
		false,   // usuń, gdy nieużywana
		false,   // wyłączna
		false,   // no-wait
		nil,     // argumenty
	)
	failOnError(err, "Błąd deklaracji kolejki")

	// Utworzenie konsumenta wiadomości
	msgs, err := ch.Consume(
		q.Name, // Nazwa kolejki
		"",     // Nazwa konsumenta, jeśli nie zostanie wypełniona, to zostanie automatycznie wygenerowany unikalny identyfikator
		true,   // Automatyczne potwierdzanie wiadomości, czyli automatycznie informuje RabbitMQ, że wiadomość została pomyślnie przetworzona
		false,  // wyłączna
		false,  // brak lokalne
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "Błąd rejestracji konsumenta")
	
	// Pobieranie wiadomości z kolejki w pętli
	for d := range msgs {
		// Wyświetlanie treści wiadomości
		log.Printf("Odebrano wiadomość: %s", d.Body)
	}
}