Modello di pubblicazione/sottoscrizione in Golang RabbitMQ (Modalità broadcast, modalità fanout)
Il modello di pubblicazione/sottoscrizione in RabbitMQ significa che un messaggio inviato da un produttore verrà elaborato da più consumatori.
Spiegazione:
- P rappresenta il produttore, C1 e C2 rappresentano i consumatori, il rosso rappresenta le code e X rappresenta lo scambio.
- Lo scambio è responsabile di instradare i messaggi a tutte le code vincolate allo scambio.
- Possono essere definite più code, ognuna vincolata allo stesso scambio.
- Ogni coda può avere uno o più consumatori.
Nota: Se non sei familiare con RabbitMQ, leggi prima la sezione dei Concetti di base di RabbitMQ.
1. Installare il pacchetto di dipendenza
go get github.com/streadway/amqp
2. Inviare messaggi
I seguenti passaggi mostrano come il produttore di messaggi invia messaggi.
2.1. Connettersi al server RabbitMQ
// Connettiti al server RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()
Spiegazione dell'indirizzo di connessione:
amqp://nomeutente:password@indirizzoRabbitMQ:porta/
2.2. Creare un canale
La maggior parte delle operazioni viene eseguita sul canale.
ch, err := conn.Channel()
defer ch.Close()
2.3. Dichiarare uno scambio
I messaggi vengono prima inviati allo scambio. Lo scambio inoltra i messaggi alle code in base alla sua strategia.
err = ch.ExchangeDeclare(
"tizi365", // Nome dello scambio
"fanout", // Tipo di scambio, qui viene utilizzato il tipo fanout, cioè il modello di pubblicazione/sottoscrizione
true, // Durabilità
false, // Auto-cancellazione
false, // Interno
false, // No-wait
nil, // Argomenti
)
2.4. Pubblicare un messaggio
// Contenuto del messaggio
body := "Ciao Tizi365.com!"
// Pubblica il messaggio
err = ch.Publish(
"tizi365", // Scambio (nome dello scambio corrispondente alla dichiarazione precedente)
"", // Chiave di routing, per lo scambio di tipo fanout, la chiave di routing viene automaticamente ignorata, quindi non è necessario fornirne una
false, // Obbligatorio
false, // Immediato
amqp.Publishing {
ContentType: "text/plain", // Tipo di contenuto del messaggio, qui è testo normale
Body: []byte(body), // Contenuto del messaggio
})
2.5. Codice completo per l'invio del messaggio
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// Connettersi a RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Impossibile connettersi a RabbitMQ")
defer conn.Close()
// Creare un canale
ch, err := conn.Channel()
failOnError(err, "Impossibile aprire un canale")
defer ch.Close()
// Dichiarare uno scambio
err = ch.ExchangeDeclare(
"tizi365", // Nome dello scambio
"fanout", // Tipo di scambio, fanout per la modalità pubblicazione/sottoscrizione
true, // Durabilità
false, // Auto-cancellazione
false, // Interno
false, // No-wait
nil, // Argomenti
)
failOnError(err, "Impossibile dichiarare uno scambio")
// Contenuto del messaggio
body := "Ciao Tizi365.com!"
// Inoltra il messaggio
err = ch.Publish(
"tizi365", // Scambio (corrispondente alla dichiarazione sopra)
"", // Chiave di routing, per gli scambi di tipo fanout, la chiave di routing viene automaticamente ignorata
false, // Obbligatorio
false, // Immediato
amqp.Publishing {
ContentType: "text/plain", // Tipo di contenuto del messaggio, qui è testo normale
Body: []byte(body), // Contenuto del messaggio
})
log.Printf("Contenuto inviato %s", body)
}
3. Ricevere messaggi
I primi tre passaggi per la ricezione dei messaggi - connessione a RabbitMQ, creazione di un canale e dichiarazione di uno scambio - sono gli stessi dell'invio dei messaggi. Fare riferimento alle sezioni precedenti 2.1, 2.2 e 2.3.
3.1. Dichiarare una Coda
Dichiara la coda su cui operare
q, err := ch.QueueDeclare(
"", // Nome della coda, se non specificato, ne verrà generato uno casuale
false, // Durabile
false, // Elimina quando inutilizzata
true, // Esclusiva
false, // No-wait
nil, // Argomenti
)
3.2. Collegare la Coda all'Exchange
La coda deve essere collegata all'exchange per ricevere messaggi
err = ch.QueueBind(
q.Name, // Nome della coda
"", // Chiave di instradamento, per gli exchange di tipo fanout, la chiave di instradamento viene automaticamente ignorata
"tizi365", // Nome dell'exchange, deve corrispondere a quello definito dal mittente del messaggio
false,
nil)
Nota: Nelle applicazioni reali, possiamo definire N code, ognuna collegata allo stesso exchange, per ricevere i messaggi inoltrati dall'exchange. Questo è dove è riflessa la modalità di pubblicazione/sottoscrizione.
3.3. Creare un Consumatore
msgs, err := ch.Consume(
q.Name, // Fai riferimento al nome della coda sopra
"", // Nome del consumatore, se non specificato, verrà generato automaticamente
true, // Acknowledge automatico del messaggio elaborato
false, // Esclusivo
false, // No-local
false, // No-wait
nil, // Argomenti
)
// Loop per gestire i messaggi
for d := range msgs {
log.Printf("Messaggio ricevuto=%s", d.Body)
}
3.4. Completa il Codice del Consumatore
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// Connettersi a RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Connessione a RabbitMQ fallita")
defer conn.Close()
// Creare un canale, di solito uno per consumatore
ch, err := conn.Channel()
failOnError(err, "Apertura di un canale fallita")
defer ch.Close()
// Dichiarare un exchange
err = ch.ExchangeDeclare(
"tizi365", // Nome dell'exchange, dovrebbe corrispondere a quello usato dal mittente del messaggio
"fanout", // Tipo di exchange
true, // Durabile
false, // Auto-cancellazione
false, // Interno
false, // No-wait
nil, // Argomenti
)
failOnError(err, "Dichiarazione dell'exchange fallita")
// Dichiarare la coda su cui operare
q, err := ch.QueueDeclare(
"", // Nome della coda, se vuoto verrà generato un nome casuale
false, // Durabile
false, // Elimina quando inutilizzata
true, // Esclusiva
false, // No-wait
nil, // Argomenti
)
failOnError(err, "Dichiarazione della coda fallita")
// Collegare la coda all'exchange specificato
err = ch.QueueBind(
q.Name, // Nome della coda
"", // Chiave di instradamento, ignorata per gli exchange fanout
"tizi365", // Nome dell'exchange, deve corrispondere a quello definito dal mittente del messaggio
false,
nil)
failOnError(err, "Collegamento della coda fallito")
// Creare un consumatore
msgs, err := ch.Consume(
q.Name, // Riferimento al nome della coda precedente
"", // Nome del consumatore, verrà generato automaticamente se vuoto
true, // Auto-ack
false, // Esclusivo
false, // No-local
false, // No-wait
nil, // Argomenti
)
failOnError(err, "Registrazione di un consumatore fallita")
// Consumare i messaggi dalla coda in un loop
for d := range msgs {
log.Printf("Messaggio ricevuto: %s", d.Body)
}
}
3.5. Consumatori Multipli
Consulta la sezione Modalità di lavoro e avvia semplicemente più consumatori utilizzando le goroutine.