Patrón de Publicación/Suscripción en Golang RabbitMQ (Modo de difusión, Modo de fanout)

El patrón de publicación/suscripción en RabbitMQ significa que un mensaje enviado por un productor será procesado por múltiples consumidores.

Modo de fanout

Explicación:

  • P representa al productor, C1 y C2 representan consumidores, el rojo representa colas y X representa el exchange.
  • El exchange es responsable de reenviar mensajes a todas las colas vinculadas al exchange.
  • Se pueden definir múltiples colas, cada una vinculada al mismo exchange.
  • Cada cola puede tener uno o más consumidores.

Nota: Si no estás familiarizado con RabbitMQ, por favor lee primero la sección Conceptos básicos de RabbitMQ.

1. Instalar el paquete de dependencia

go get github.com/streadway/amqp

2. Enviar mensajes

Los siguientes pasos demuestran cómo el productor de mensajes envía mensajes.

2.1. Conectar al servidor de RabbitMQ

// Conectar al servidor de RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()

Explicación de la dirección de conexión:

amqp://usuario:contraseña@DirecciónRabbitMQ:Puerto/

2.2. Crear un canal

La mayoría de operaciones se realizan en el canal.

ch, err := conn.Channel()
defer ch.Close()

2.3. Declarar un exchange

Los mensajes se envían primero al exchange. El exchange reenvía mensajes a las colas según su estrategia.

err = ch.ExchangeDeclare(
	"tizi365",   // Nombre del exchange
	"fanout", // Tipo de exchange, usando el tipo fanout aquí, es decir, patrón de publicación/suscripción
	true,     // Duradero
	false,    // Auto-eliminado
	false,    // Interno
	false,    // No esperar
	nil,      // Argumentos
)

2.4. Publicar un mensaje

// Contenido del mensaje
body := "¡Hola Tizi365.com!"

// Publicar el mensaje
err = ch.Publish(
  "tizi365",     // Exchange (nombre del exchange correspondiente a la declaración anterior)
  "", // Clave de enrutamiento, para el exchange tipo fanout, la clave de enrutamiento se ignora automáticamente, por lo que no es necesario proporcionar una
  false,  // Obligatorio
  false,  // Inmediato
  amqp.Publishing {
    ContentType: "text/plain", // Tipo de contenido del mensaje, aquí es texto plano
    Body:        []byte(body),  // Contenido del mensaje
  })

2.5. Completar el código de envío de mensajes

package main

import (
	"log"
	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// Conectar a RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Fallo al conectar a RabbitMQ")
	defer conn.Close()

	// Crear un canal
	ch, err := conn.Channel()
	failOnError(err, "Fallo al abrir un canal")
	defer ch.Close()

	// Declarar un exchange
	err = ch.ExchangeDeclare(
		"tizi365",   // Nombre del exchange
		"fanout", // Tipo de exchange, fanout para modo de publicación/suscripción
		true,     // Duradero
		false,    // Auto-eliminar
		false,    // Interno
		false,    // No esperar
		nil,      // Argumentos
	)
	failOnError(err, "Fallo al declarar un exchange")

	// Contenido del mensaje
	body := "¡Hola Tizi365.com!"
	// Enviar mensaje
	err = ch.Publish(
		"tizi365",     // Exchange (coincide con la declaración anterior)
		"", // Clave de enrutamiento, para exchanges tipo fanout, la clave de enrutamiento se ignora automáticamente
		false,  // Obligatorio
		false,  // Inmediato
		amqp.Publishing {
			ContentType: "text/plain", // Tipo de contenido del mensaje, aquí es texto plano
			Body:        []byte(body),  // Contenido del mensaje
		})

	log.Printf("Se envió el contenido %s", body)
}

3. Recibir mensajes

Los primeros tres pasos para recibir mensajes: conectar a RabbitMQ, crear un canal y declarar un exchange, son iguales a los de enviar mensajes. Consulta las secciones anteriores 2.1, 2.2 y 2.3.

3.1. Declarar una Cola

Declara la cola en la que se va a operar

q, err := ch.QueueDeclare(
		"",    // Nombre de la cola, si no se especifica, se generará uno aleatorio
		false, // Duradero
		false, // Eliminar cuando no se use
		true,  // Exclusivo
		false, // No esperar
		nil,   // Argumentos
	)

3.2. Vincular la Cola a la Intercambio

La cola debe estar vinculada al intercambio para recibir mensajes

err = ch.QueueBind(
		q.Name, // Nombre de la cola
		"",     // Clave de enrutamiento, para intercambios de tipo fanout, la clave de enrutamiento se ignora automáticamente
		"tizi365", // Nombre del intercambio, debe coincidir con el definido por el remitente del mensaje
		false,
		nil)

Nota: En aplicaciones reales, podemos definir N colas, cada una vinculada al mismo intercambio, para recibir mensajes reenviados por el intercambio. Aquí es donde se refleja el patrón de publicación/suscripción.

3.3. Crear un Consumidor

msgs, err := ch.Consume(
		q.Name, // Referencia al nombre de la cola de arriba
		"",     // Nombre del consumidor, si no se especifica, se generará automáticamente
		true,   // Reconocer automáticamente que el mensaje ha sido procesado
		false,  // Exclusivo
		false,  // No local
		false,  // No esperar
		nil,    // Argumentos
	)
	
// Bucle para manejar mensajes
for d := range msgs {
	log.Printf("Mensaje recibido=%s", d.Body)
}

3.4. Código del Consumidor Completo

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// Conectar a RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Error al conectarse a RabbitMQ")
	defer conn.Close()

	// Crear un canal, normalmente uno por consumidor
	ch, err := conn.Channel()
	failOnError(err, "Error al abrir un canal")
	defer ch.Close()

	// Declarar un intercambio
	err = ch.ExchangeDeclare(
		"tizi365",   // Nombre del intercambio, debe coincidir con el utilizado por el remitente del mensaje
		"fanout", // Tipo de intercambio
		true,     // Duradero
		false,    // Auto-eliminado
		false,    // Interno
		false,    // No esperar
		nil,      // Argumentos
	)
	failOnError(err, "Error al declarar un intercambio")

	// Declarar la cola en la que operar
	q, err := ch.QueueDeclare(
		"",    // Nombre de la cola, si está vacío se generará un nombre aleatorio
		false, // Duradero
		false, // Eliminar cuando no se use
		true,  // Exclusivo
		false, // No esperar
		nil,   // Argumentos
	)
	failOnError(err, "Error al declarar una cola")

	// Vincular la cola al intercambio especificado
	err = ch.QueueBind(
		q.Name, // Nombre de la cola
		"",     // Clave de enrutamiento, ignorada para intercambios fanout
		"tizi365", // Nombre del intercambio, debe coincidir con el definido por el remitente del mensaje
		false,
		nil)
	failOnError(err, "Error al vincular una cola")

	// Crear un consumidor
	msgs, err := ch.Consume(
		q.Name, // Referencia al nombre de la cola anterior
		"",     // Nombre del consumidor, se generará automáticamente si está vacío
		true,   // Auto-aceptación
		false,  // Exclusivo
		false,  // No local
		false,  // No esperar
		nil,    // Argumentos
	)
	failOnError(err, "Error al registrar un consumidor")

	// Consumir mensajes de la cola en un bucle
	for d := range msgs {
		log.Printf("Mensaje recibido: %s", d.Body)
	}
}

3.5. Múltiples Consumidores

Consulte la sección Modo de trabajo y simplemente inicie múltiples consumidores utilizando goroutines.