Modo de trabajo de RabbitMQ en Golang, logrando el consumo concurrente por múltiples consumidores.
Explicación: P representa el productor, C1 y C2 representan los consumidores, y el color rojo representa la cola.
Consejo: Cada mensaje solo puede ser consumido por un consumidor.
Pre-tutorial
Por favor lee primero el Tutorial de Inicio Rápido de RabbitMQ en Golang para entender las operaciones básicas de Golang en RabbitMQ. Si no estás familiarizado con RabbitMQ, por favor lee los capítulos anteriores.
Consumo Concurrente
Golang principalmente utiliza goroutines para implementar múltiples consumidores, a continuación se muestra la implementación de múltiples consumidores.
Consejo: Para saber cómo enviar mensajes, consulta Tutorial de Inicio Rápido de RabbitMQ en Golang.
package main
import (
"log"
"time"
"github.com/streadway/amqp"
)
// Manejo de errores
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// Conexión a RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Error al conectar a RabbitMQ")
defer conn.Close()
// Crear 5 consumidores a través de goroutines
for i := 0; i < 5; i++ {
go func(number int) {
// Crear un canal de rabbitmq para cada consumidor
ch, err := conn.Channel()
failOnError(err, "Error al abrir un canal")
defer ch.Close()
// Declarar la cola a operar
q, err := ch.QueueDeclare(
"hello", // Nombre de la cola
false, // Duradera
false, // Eliminar cuando no se use
false, // Exclusiva
false, // No esperar
nil, // Argumentos
)
failOnError(err, "Error al declarar una cola")
// Crear un consumidor
msgs, err := ch.Consume(
q.Name, // Nombre de la cola a operar
"", // Identificación única del consumidor, si no se completa, se genera un valor único automáticamente
true, // Auto-reconocer mensajes (es decir, confirmar automáticamente que el mensaje ha sido procesado)
false, // Exclusiva
false, // No local
false, // No esperar
nil, // Argumentos
)
failOnError(err, "Error al registrar un consumidor")
// Procesar mensajes en un bucle
for d := range msgs {
log.Printf("[Número de consumidor=%d] Mensaje recibido: %s", number, d.Body)
// Simular procesamiento de negocio, dormir durante 1 segundo
time.Sleep(time.Second)
}
}(i)
}
// Colgar la goroutine principal para evitar que el programa salga
forever := make(chan bool)
<-forever
}
Consejo: Independientemente del tipo de intercambio utilizado por RabbitMQ, las colas pueden tener múltiples consumidores, y la forma de iniciar múltiples consumidores es la misma que en este ejemplo.