Prosty tryb kolejki Golang RabbitMQ
Wyjaśnienie: P oznacza producenta, C oznacza konsumenta, a czerwony kolor oznacza kolejkę.
Uwaga: Jeśli nie jesteś zaznajomiony z RabbitMQ, proszę przeczytaj najpierw sekcję Podstawowe pojęcia RabbitMQ.
1. Instalacja zależności
go get github.com/streadway/amqp
Zaimportuj pakiet zależności
import (
"github.com/streadway/amqp"
)
2. Wysyłanie wiadomości
Poniższe kroki pokazują, jak producent wiadomości wykonuje przekazywanie wiadomości.
2.1. Połączenie z serwerem RabbitMQ
// Połączenie z serwerem RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()
Wyjaśnienie adresu połączenia:
amqp://nazwaużytkownika:hasło@adresRabbitMQ:port/
2.2. Utworzenie kanału
Większość operacji jest wykonywana na kanale.
ch, err := conn.Channel()
defer ch.Close()
2.3. Deklaracja kolejki
Oznacza kolejkę, z której będziemy czytać lub do której będziemy pisać.
q, err := ch.QueueDeclare(
"hello", // Nazwa kolejki
false, // Trwała wiadomość
false, // Usunięcie kolejki, gdy nie jest używana
false, // Wyłączna
false, // Bez oczekiwania
nil, // Argumenty
)
2.4. Przesyłanie wiadomości
// Zawartość wiadomości
body := "Witaj Świecie!"
// Przekaż wiadomość
err = ch.Publish(
"", // Wymiana (pomijamy tutaj)
q.Name, // Parametr routingu, użyj nazwy kolejki jako parametru routingu
false, // Wymagane
false, // Natychmiastowe
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(body), // Zawartość wiadomości
})
2.5. Pełny kod wysyłania wiadomości
package main
// Importuj pakiety
import (
"log"
"github.com/streadway/amqp"
)
// Obsłuż błędy
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// Połącz z RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Nie udało się połączyć z RabbitMQ")
defer conn.Close()
// Utwórz kanał
ch, err := conn.Channel()
failOnError(err, "Nie udało się otworzyć kanału")
defer ch.Close()
// Zadeklaruj kolejkę do operacji
q, err := ch.QueueDeclare(
"hello", // Nazwa
false, // Trwała
false, // Usuń, gdy nie jest używana
false, // Wyłączna
false, // Bez oczekiwania
nil, // Argumenty
)
failOnError(err, "Nie udało się zadeklarować kolejki")
// Zawartość wiadomości do wysłania
body := "Witaj Świecie!"
// Wyślij wiadomość
err = ch.Publish(
"", // Wymiana
q.Name, // Klucz routingu
false, // Wymagane
false, // Natychmiastowe
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Nie udało się opublikować wiadomości")
log.Printf(" [x] Wysłano %s", body)
}
3. Odbieranie wiadomości
Pierwsze trzy kroki odbierania wiadomości są takie same jak przy wysyłaniu wiadomości, odpowiadające odpowiednio sekcjom 2.1, 2.2 i 2.3. Kompletny kod do odbierania wiadomości wygląda następująco:
package main
// Importowanie pakietów
import (
"log"
"github.com/streadway/amqp"
)
// Obsługa błędów
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// Połączenie z RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Błąd podczas łączenia z RabbitMQ")
defer conn.Close()
// Utworzenie kanału
ch, err := conn.Channel()
failOnError(err, "Błąd podczas otwierania kanału")
defer ch.Close()
// Deklaracja kolejki, na której będą operacje
q, err := ch.QueueDeclare(
"hello", // Nazwa kolejki musi być zgodna z nazwą kolejki do wysyłania wiadomości
false, // trwała
false, // usuń, gdy nieużywana
false, // wyłączna
false, // no-wait
nil, // argumenty
)
failOnError(err, "Błąd deklaracji kolejki")
// Utworzenie konsumenta wiadomości
msgs, err := ch.Consume(
q.Name, // Nazwa kolejki
"", // Nazwa konsumenta, jeśli nie zostanie wypełniona, to zostanie automatycznie wygenerowany unikalny identyfikator
true, // Automatyczne potwierdzanie wiadomości, czyli automatycznie informuje RabbitMQ, że wiadomość została pomyślnie przetworzona
false, // wyłączna
false, // brak lokalne
false, // no-wait
nil, // args
)
failOnError(err, "Błąd rejestracji konsumenta")
// Pobieranie wiadomości z kolejki w pętli
for d := range msgs {
// Wyświetlanie treści wiadomości
log.Printf("Odebrano wiadomość: %s", d.Body)
}
}