Modello di pubblicazione/sottoscrizione in Golang RabbitMQ (Modalità broadcast, modalità fanout)

Il modello di pubblicazione/sottoscrizione in RabbitMQ significa che un messaggio inviato da un produttore verrà elaborato da più consumatori.

Modalità fanout

Spiegazione:

  • P rappresenta il produttore, C1 e C2 rappresentano i consumatori, il rosso rappresenta le code e X rappresenta lo scambio.
  • Lo scambio è responsabile di instradare i messaggi a tutte le code vincolate allo scambio.
  • Possono essere definite più code, ognuna vincolata allo stesso scambio.
  • Ogni coda può avere uno o più consumatori.

Nota: Se non sei familiare con RabbitMQ, leggi prima la sezione dei Concetti di base di RabbitMQ.

1. Installare il pacchetto di dipendenza

go get github.com/streadway/amqp

2. Inviare messaggi

I seguenti passaggi mostrano come il produttore di messaggi invia messaggi.

2.1. Connettersi al server RabbitMQ

// Connettiti al server RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()

Spiegazione dell'indirizzo di connessione:

amqp://nomeutente:password@indirizzoRabbitMQ:porta/

2.2. Creare un canale

La maggior parte delle operazioni viene eseguita sul canale.

ch, err := conn.Channel()
defer ch.Close()

2.3. Dichiarare uno scambio

I messaggi vengono prima inviati allo scambio. Lo scambio inoltra i messaggi alle code in base alla sua strategia.

err = ch.ExchangeDeclare(
  "tizi365",   // Nome dello scambio
  "fanout", // Tipo di scambio, qui viene utilizzato il tipo fanout, cioè il modello di pubblicazione/sottoscrizione
  true,     // Durabilità
  false,    // Auto-cancellazione
  false,    // Interno
  false,    // No-wait
  nil,      // Argomenti
)

2.4. Pubblicare un messaggio

// Contenuto del messaggio
body := "Ciao Tizi365.com!"

// Pubblica il messaggio
err = ch.Publish(
  "tizi365",     // Scambio (nome dello scambio corrispondente alla dichiarazione precedente)
  "", // Chiave di routing, per lo scambio di tipo fanout, la chiave di routing viene automaticamente ignorata, quindi non è necessario fornirne una
  false,  // Obbligatorio
  false,  // Immediato
  amqp.Publishing {
    ContentType: "text/plain", // Tipo di contenuto del messaggio, qui è testo normale
    Body:        []byte(body),  // Contenuto del messaggio
  })

2.5. Codice completo per l'invio del messaggio

package main

import (
  "log"
  "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
  }
}

func main() {
  // Connettersi a RabbitMQ
  conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  failOnError(err, "Impossibile connettersi a RabbitMQ")
  defer conn.Close()

  // Creare un canale
  ch, err := conn.Channel()
  failOnError(err, "Impossibile aprire un canale")
  defer ch.Close()

  // Dichiarare uno scambio
  err = ch.ExchangeDeclare(
    "tizi365",   // Nome dello scambio
    "fanout", // Tipo di scambio, fanout per la modalità pubblicazione/sottoscrizione
    true,     // Durabilità
    false,    // Auto-cancellazione
    false,    // Interno
    false,    // No-wait
    nil,      // Argomenti
  )
  failOnError(err, "Impossibile dichiarare uno scambio")

  // Contenuto del messaggio
  body := "Ciao Tizi365.com!"
  // Inoltra il messaggio
  err = ch.Publish(
    "tizi365",     // Scambio (corrispondente alla dichiarazione sopra)
    "", // Chiave di routing, per gli scambi di tipo fanout, la chiave di routing viene automaticamente ignorata
    false,  // Obbligatorio
    false,  // Immediato
    amqp.Publishing {
      ContentType: "text/plain", // Tipo di contenuto del messaggio, qui è testo normale
      Body:        []byte(body),  // Contenuto del messaggio
    })

  log.Printf("Contenuto inviato %s", body)
}

3. Ricevere messaggi

I primi tre passaggi per la ricezione dei messaggi - connessione a RabbitMQ, creazione di un canale e dichiarazione di uno scambio - sono gli stessi dell'invio dei messaggi. Fare riferimento alle sezioni precedenti 2.1, 2.2 e 2.3.

3.1. Dichiarare una Coda

Dichiara la coda su cui operare

q, err := ch.QueueDeclare(
		"",    // Nome della coda, se non specificato, ne verrà generato uno casuale
		false, // Durabile
		false, // Elimina quando inutilizzata
		true,  // Esclusiva
		false, // No-wait
		nil,   // Argomenti
	)

3.2. Collegare la Coda all'Exchange

La coda deve essere collegata all'exchange per ricevere messaggi

err = ch.QueueBind(
		q.Name, // Nome della coda
		"",     // Chiave di instradamento, per gli exchange di tipo fanout, la chiave di instradamento viene automaticamente ignorata
		"tizi365", // Nome dell'exchange, deve corrispondere a quello definito dal mittente del messaggio
		false,
		nil)

Nota: Nelle applicazioni reali, possiamo definire N code, ognuna collegata allo stesso exchange, per ricevere i messaggi inoltrati dall'exchange. Questo è dove è riflessa la modalità di pubblicazione/sottoscrizione.

3.3. Creare un Consumatore

msgs, err := ch.Consume(
		q.Name, // Fai riferimento al nome della coda sopra
		"",     // Nome del consumatore, se non specificato, verrà generato automaticamente
		true,   // Acknowledge automatico del messaggio elaborato
		false,  // Esclusivo
		false,  // No-local
		false,  // No-wait
		nil,    // Argomenti
	)
	
// Loop per gestire i messaggi
for d := range msgs {
	log.Printf("Messaggio ricevuto=%s", d.Body)
}

3.4. Completa il Codice del Consumatore

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// Connettersi a RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Connessione a RabbitMQ fallita")
	defer conn.Close()

	// Creare un canale, di solito uno per consumatore
	ch, err := conn.Channel()
	failOnError(err, "Apertura di un canale fallita")
	defer ch.Close()

	// Dichiarare un exchange
	err = ch.ExchangeDeclare(
		"tizi365",   // Nome dell'exchange, dovrebbe corrispondere a quello usato dal mittente del messaggio
		"fanout", // Tipo di exchange
		true,     // Durabile
		false,    // Auto-cancellazione
		false,    // Interno
		false,    // No-wait
		nil,      // Argomenti
	)
	failOnError(err, "Dichiarazione dell'exchange fallita")

	// Dichiarare la coda su cui operare
	q, err := ch.QueueDeclare(
		"",    // Nome della coda, se vuoto verrà generato un nome casuale
		false, // Durabile
		false, // Elimina quando inutilizzata
		true,  // Esclusiva
		false, // No-wait
		nil,   // Argomenti
	)
	failOnError(err, "Dichiarazione della coda fallita")

	// Collegare la coda all'exchange specificato
	err = ch.QueueBind(
		q.Name, // Nome della coda
		"",     // Chiave di instradamento, ignorata per gli exchange fanout
		"tizi365", // Nome dell'exchange, deve corrispondere a quello definito dal mittente del messaggio
		false,
		nil)
	failOnError(err, "Collegamento della coda fallito")

	// Creare un consumatore
	msgs, err := ch.Consume(
		q.Name, // Riferimento al nome della coda precedente
		"",     // Nome del consumatore, verrà generato automaticamente se vuoto
		true,   // Auto-ack
		false,  // Esclusivo
		false,  // No-local
		false,  // No-wait
		nil,    // Argomenti
	)
	failOnError(err, "Registrazione di un consumatore fallita")

	// Consumare i messaggi dalla coda in un loop
	for d := range msgs {
		log.Printf("Messaggio ricevuto: %s", d.Body)
	}
}

3.5. Consumatori Multipli

Consulta la sezione Modalità di lavoro e avvia semplicemente più consumatori utilizzando le goroutine.