Patrón de Publicación/Suscripción en Golang RabbitMQ (Modo de difusión, Modo de fanout)
El patrón de publicación/suscripción en RabbitMQ significa que un mensaje enviado por un productor será procesado por múltiples consumidores.
Explicación:
- P representa al productor, C1 y C2 representan consumidores, el rojo representa colas y X representa el exchange.
- El exchange es responsable de reenviar mensajes a todas las colas vinculadas al exchange.
- Se pueden definir múltiples colas, cada una vinculada al mismo exchange.
- Cada cola puede tener uno o más consumidores.
Nota: Si no estás familiarizado con RabbitMQ, por favor lee primero la sección Conceptos básicos de RabbitMQ.
1. Instalar el paquete de dependencia
go get github.com/streadway/amqp
2. Enviar mensajes
Los siguientes pasos demuestran cómo el productor de mensajes envía mensajes.
2.1. Conectar al servidor de RabbitMQ
// Conectar al servidor de RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()
Explicación de la dirección de conexión:
amqp://usuario:contraseña@DirecciónRabbitMQ:Puerto/
2.2. Crear un canal
La mayoría de operaciones se realizan en el canal.
ch, err := conn.Channel()
defer ch.Close()
2.3. Declarar un exchange
Los mensajes se envían primero al exchange. El exchange reenvía mensajes a las colas según su estrategia.
err = ch.ExchangeDeclare(
"tizi365", // Nombre del exchange
"fanout", // Tipo de exchange, usando el tipo fanout aquí, es decir, patrón de publicación/suscripción
true, // Duradero
false, // Auto-eliminado
false, // Interno
false, // No esperar
nil, // Argumentos
)
2.4. Publicar un mensaje
// Contenido del mensaje
body := "¡Hola Tizi365.com!"
// Publicar el mensaje
err = ch.Publish(
"tizi365", // Exchange (nombre del exchange correspondiente a la declaración anterior)
"", // Clave de enrutamiento, para el exchange tipo fanout, la clave de enrutamiento se ignora automáticamente, por lo que no es necesario proporcionar una
false, // Obligatorio
false, // Inmediato
amqp.Publishing {
ContentType: "text/plain", // Tipo de contenido del mensaje, aquí es texto plano
Body: []byte(body), // Contenido del mensaje
})
2.5. Completar el código de envío de mensajes
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// Conectar a RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Fallo al conectar a RabbitMQ")
defer conn.Close()
// Crear un canal
ch, err := conn.Channel()
failOnError(err, "Fallo al abrir un canal")
defer ch.Close()
// Declarar un exchange
err = ch.ExchangeDeclare(
"tizi365", // Nombre del exchange
"fanout", // Tipo de exchange, fanout para modo de publicación/suscripción
true, // Duradero
false, // Auto-eliminar
false, // Interno
false, // No esperar
nil, // Argumentos
)
failOnError(err, "Fallo al declarar un exchange")
// Contenido del mensaje
body := "¡Hola Tizi365.com!"
// Enviar mensaje
err = ch.Publish(
"tizi365", // Exchange (coincide con la declaración anterior)
"", // Clave de enrutamiento, para exchanges tipo fanout, la clave de enrutamiento se ignora automáticamente
false, // Obligatorio
false, // Inmediato
amqp.Publishing {
ContentType: "text/plain", // Tipo de contenido del mensaje, aquí es texto plano
Body: []byte(body), // Contenido del mensaje
})
log.Printf("Se envió el contenido %s", body)
}
3. Recibir mensajes
Los primeros tres pasos para recibir mensajes: conectar a RabbitMQ, crear un canal y declarar un exchange, son iguales a los de enviar mensajes. Consulta las secciones anteriores 2.1, 2.2 y 2.3.
3.1. Declarar una Cola
Declara la cola en la que se va a operar
q, err := ch.QueueDeclare(
"", // Nombre de la cola, si no se especifica, se generará uno aleatorio
false, // Duradero
false, // Eliminar cuando no se use
true, // Exclusivo
false, // No esperar
nil, // Argumentos
)
3.2. Vincular la Cola a la Intercambio
La cola debe estar vinculada al intercambio para recibir mensajes
err = ch.QueueBind(
q.Name, // Nombre de la cola
"", // Clave de enrutamiento, para intercambios de tipo fanout, la clave de enrutamiento se ignora automáticamente
"tizi365", // Nombre del intercambio, debe coincidir con el definido por el remitente del mensaje
false,
nil)
Nota: En aplicaciones reales, podemos definir N colas, cada una vinculada al mismo intercambio, para recibir mensajes reenviados por el intercambio. Aquí es donde se refleja el patrón de publicación/suscripción.
3.3. Crear un Consumidor
msgs, err := ch.Consume(
q.Name, // Referencia al nombre de la cola de arriba
"", // Nombre del consumidor, si no se especifica, se generará automáticamente
true, // Reconocer automáticamente que el mensaje ha sido procesado
false, // Exclusivo
false, // No local
false, // No esperar
nil, // Argumentos
)
// Bucle para manejar mensajes
for d := range msgs {
log.Printf("Mensaje recibido=%s", d.Body)
}
3.4. Código del Consumidor Completo
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// Conectar a RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Error al conectarse a RabbitMQ")
defer conn.Close()
// Crear un canal, normalmente uno por consumidor
ch, err := conn.Channel()
failOnError(err, "Error al abrir un canal")
defer ch.Close()
// Declarar un intercambio
err = ch.ExchangeDeclare(
"tizi365", // Nombre del intercambio, debe coincidir con el utilizado por el remitente del mensaje
"fanout", // Tipo de intercambio
true, // Duradero
false, // Auto-eliminado
false, // Interno
false, // No esperar
nil, // Argumentos
)
failOnError(err, "Error al declarar un intercambio")
// Declarar la cola en la que operar
q, err := ch.QueueDeclare(
"", // Nombre de la cola, si está vacío se generará un nombre aleatorio
false, // Duradero
false, // Eliminar cuando no se use
true, // Exclusivo
false, // No esperar
nil, // Argumentos
)
failOnError(err, "Error al declarar una cola")
// Vincular la cola al intercambio especificado
err = ch.QueueBind(
q.Name, // Nombre de la cola
"", // Clave de enrutamiento, ignorada para intercambios fanout
"tizi365", // Nombre del intercambio, debe coincidir con el definido por el remitente del mensaje
false,
nil)
failOnError(err, "Error al vincular una cola")
// Crear un consumidor
msgs, err := ch.Consume(
q.Name, // Referencia al nombre de la cola anterior
"", // Nombre del consumidor, se generará automáticamente si está vacío
true, // Auto-aceptación
false, // Exclusivo
false, // No local
false, // No esperar
nil, // Argumentos
)
failOnError(err, "Error al registrar un consumidor")
// Consumir mensajes de la cola en un bucle
for d := range msgs {
log.Printf("Mensaje recibido: %s", d.Body)
}
}
3.5. Múltiples Consumidores
Consulte la sección Modo de trabajo y simplemente inicie múltiples consumidores utilizando goroutines.