Modo de trabajo de RabbitMQ en Golang, logrando el consumo concurrente por múltiples consumidores.

Cola de trabajo de Golang

Explicación: P representa el productor, C1 y C2 representan los consumidores, y el color rojo representa la cola.

Consejo: Cada mensaje solo puede ser consumido por un consumidor.

Pre-tutorial

Por favor lee primero el Tutorial de Inicio Rápido de RabbitMQ en Golang para entender las operaciones básicas de Golang en RabbitMQ. Si no estás familiarizado con RabbitMQ, por favor lee los capítulos anteriores.

Consumo Concurrente

Golang principalmente utiliza goroutines para implementar múltiples consumidores, a continuación se muestra la implementación de múltiples consumidores.

Consejo: Para saber cómo enviar mensajes, consulta Tutorial de Inicio Rápido de RabbitMQ en Golang.

package main

import (
	"log"
	"time"
	"github.com/streadway/amqp"
)

// Manejo de errores
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// Conexión a RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Error al conectar a RabbitMQ")
	defer conn.Close()

	// Crear 5 consumidores a través de goroutines
	for i := 0; i < 5; i++ {
		go func(number int) {
			// Crear un canal de rabbitmq para cada consumidor
			ch, err := conn.Channel()
			failOnError(err, "Error al abrir un canal")
			defer ch.Close()

			// Declarar la cola a operar
			q, err := ch.QueueDeclare(
				"hello", // Nombre de la cola
				false,   // Duradera
				false,   // Eliminar cuando no se use
				false,   // Exclusiva
				false,   // No esperar
				nil,     // Argumentos
			)
			failOnError(err, "Error al declarar una cola")

			// Crear un consumidor
			msgs, err := ch.Consume(
				q.Name, // Nombre de la cola a operar
				"",     // Identificación única del consumidor, si no se completa, se genera un valor único automáticamente
				true,   // Auto-reconocer mensajes (es decir, confirmar automáticamente que el mensaje ha sido procesado)
				false,  // Exclusiva
				false,  // No local
				false,  // No esperar
				nil,    // Argumentos
			)
			failOnError(err, "Error al registrar un consumidor")

			// Procesar mensajes en un bucle
			for d := range msgs {
				log.Printf("[Número de consumidor=%d] Mensaje recibido: %s", number, d.Body)
				// Simular procesamiento de negocio, dormir durante 1 segundo
				time.Sleep(time.Second)
			}
		}(i)
	}

	// Colgar la goroutine principal para evitar que el programa salga
	forever := make(chan bool)
	<-forever
}

Consejo: Independientemente del tipo de intercambio utilizado por RabbitMQ, las colas pueden tener múltiples consumidores, y la forma de iniciar múltiples consumidores es la misma que en este ejemplo.