Modalità Coda Semplice di Golang RabbitMQ
Spiegazione: P rappresenta il produttore, C rappresenta il consumatore, e rosso rappresenta la coda.
Nota: Se non sei familiare con RabbitMQ, per favore leggi prima la sezione RabbitMQ Concetti di Base.
1. Installare le Dipendenze
go get github.com/streadway/amqp
Importa il pacchetto delle dipendenze
import (
"github.com/streadway/amqp"
)
2. Inviare Messaggi
I seguenti passaggi dimostrano come il produttore di messaggi completi effettua l'invio del messaggio.
2.1. Connettersi al Server RabbitMQ
// Connettersi al Server RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()
Spiegazione dell'indirizzo di connessione:
amqp://username:password@RabbitMQAddress:port/
2.2. Creare un Canale
La maggior parte delle operazioni viene eseguita sul canale.
ch, err := conn.Channel()
defer ch.Close()
2.3. Dichiarare una Coda
Rappresenta la coda da cui leggere o a cui scrivere.
q, err := ch.QueueDeclare(
"hello", // Nome della coda
false, // Persistenza del messaggio
false, // Elimina la coda quando non in uso
false, // Esclusiva
false, // No-wait
nil, // Argomenti
)
2.4. Inviare Messaggi
// Contenuto del messaggio
body := "Ciao Mondo!"
// Inviare il messaggio
err = ch.Publish(
"", // Exchange (ignorato qui)
q.Name, // Parametro di routing, utilizza il nome della coda come parametro di routing
false, // Obbligatorio
false, // Immediato
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(body), // Contenuto del messaggio
})
2.5. Codice Completo per l'Invio dei Messaggi
package main
// Importa i pacchetti
import (
"log"
"github.com/streadway/amqp"
)
// Gestire gli errori
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// Connettersi a RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Impossibile connettersi a RabbitMQ")
defer conn.Close()
// Creare un canale
ch, err := conn.Channel()
failOnError(err, "Impossibile aprire un canale")
defer ch.Close()
// Dichiarare la coda su cui operare
q, err := ch.QueueDeclare(
"hello", // Nome
false, // Durable
false, // Elimina quando inutilizzata
false, // Esclusiva
false, // No-wait
nil, // Argomenti
)
failOnError(err, "Impossibile dichiarare una coda")
// Contenuto del messaggio da inviare
body := "Ciao Mondo!"
// Inviare il messaggio
err = ch.Publish(
"", // Exchange
q.Name, // Chiave di routing
false, // Obbligatorio
false, // Immediato
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Impossibile pubblicare un messaggio")
log.Printf(" [x] Inviato %s", body)
}
3. Ricezione di messaggi
I primi tre passaggi per ricevere i messaggi sono gli stessi dell'invio dei messaggi, corrispondenti alle sezioni 2.1, 2.2 e 2.3 rispettivamente. Il codice completo per la ricezione dei messaggi è il seguente:
pacchetto principale
// Importa i pacchetti
import (
"log"
"github.com/streadway/amqp"
)
// Gestione degli errori
funzione failOnError(err errore, msg stringa) {
se err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
funzione principale() {
// Connettiti a RabbitMQ
connessione, err := amqp.Dial("amqp://ospite:ospite@localhost:5672/")
failOnError(err, "Impossibile connettersi a RabbitMQ")
defer connessione.Close()
// Crea un canale
ch, err := connessione.Channel()
failOnError(err, "Impossibile aprire un canale")
defer ch.Close()
// Dichiara la coda su cui operare
q, err := ch.QueueDeclare(
"ciao", // Il nome della coda deve essere coerente con il nome della coda per l'invio dei messaggi
false, // persistente
false, // cancella quando inutilizzata
false, // esclusiva
false, // no-wait
nil, // argomenti
)
failOnError(err, "Impossibile dichiarare una coda")
// Crea un consumatore di messaggi
msgs, err := ch.Consume(
q.Name, // Nome della coda
"", // Nome del consumatore, se non compilato, verrà generato automaticamente un ID univoco
true, // Se auto-acknowledgement, cioè informare automaticamente RabbitMQ che il messaggio è stato elaborato con successo
false, // esclusivo
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Impossibile registrare un consumatore")
// Recupera i messaggi dalla coda in un ciclo
per d := range msgs {
// Stampa il contenuto del messaggio
log.Printf("Ricevuto un messaggio: %s", d.Body)
}
}