Modo di lavoro di RabbitMQ in Golang, ottenendo il consumo concorrente da parte di più consumatori.
Spiegazione: P rappresenta il produttore, C1 e C2 rappresentano i consumatori, e il colore rosso rappresenta la coda.
Suggerimento: Ogni messaggio può essere consumato solo da un consumatore.
Pre-tutorial
Per favore leggi prima il Tutorial di avvio rapido di Golang RabbitMQ per capire le operazioni di base di Golang su RabbitMQ. Se non sei familiare con RabbitMQ, per favore leggi prima i capitoli precedenti.
Consumo Concorrente
Golang principalmente utilizza le goroutine per implementare più consumatori, di seguito è riportata l'implementazione di più consumatori.
Suggerimento: Per sapere come inviare messaggi, consulta Tutorial di avvio rapido di Golang RabbitMQ.
package main
import (
"log"
"time"
"github.com/streadway/amqp"
)
// Gestione degli errori
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// Connettersi a RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Impossibile connettersi a RabbitMQ")
defer conn.Close()
// Creare 5 consumatori tramite goroutine
for i := 0; i < 5; i++ {
go func(number int) {
// Creare un canale rabbitmq per ogni consumatore
ch, err := conn.Channel()
failOnError(err, "Impossibile aprire un canale")
defer ch.Close()
// Dichiarare la coda da operare
q, err := ch.QueueDeclare(
"hello", // Nome coda
false, // Persistente
false, // Elimina quando non usata
false, // Esclusiva
false, // No-wait
nil, // Argomenti
)
failOnError(err, "Impossibile dichiarare una coda")
// Creare un consumatore
msgs, err := ch.Consume(
q.Name, // Nome coda da operare
"", // ID univoco del consumatore, se non compilato, viene generato automaticamente un valore univoco
true, // Auto-riconoscimento dei messaggi (ovvero conferma automatica che il messaggio è stato elaborato)
false, // Esclusivo
false, // No-local
false, // No-wait
nil, // Args
)
failOnError(err, "Impossibile registrare un consumatore")
// Elaborare i messaggi in un ciclo
for d := range msgs {
log.Printf("[Numero consumatore=%d] Messaggio ricevuto: %s", number, d.Body)
// Simulare l'elaborazione aziendale, dormire per 1 secondo
time.Sleep(time.Second)
}
}(i)
}
// Sospendere la goroutine principale per evitare che il programma esca
forever := make(chan bool)
<-forever
}
Suggerimento: Indipendentemente dal tipo di scambio utilizzato da RabbitMQ, le code possono avere più consumatori, e il modo di avviare più consumatori è lo stesso di questo esempio.