Режим работы RabbitMQ в Golang, обеспечивающий параллельное потребление несколькими потребителями.

Очередь выполнения Golang

Пояснение: P представляет производителя, C1 и C2 представляют потребителей, а красный цвет представляет очередь.

Совет: Каждое сообщение может быть потреблено только одним потребителем.

Предварительное руководство

Пожалуйста, сначала прочтите Быстрый старт по RabbitMQ в Golang, чтобы понять основные операции Golang с RabbitMQ. Если вы не знакомы с RabbitMQ, пожалуйста, сначала прочтите предыдущие главы.

Параллельное потребление

В Golang в основном используются горутины для реализации нескольких потребителей. Ниже приведена реализация нескольких потребителей.

Совет: Для отправки сообщений обратитесь к Быстрому старту по RabbitMQ в Golang.

package main

import (
	"log"
	"time"

	"github.com/streadway/amqp"
)

// Обработка ошибок
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// Подключение к RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Не удалось подключиться к RabbitMQ")
	defer conn.Close()

	// Создание 5 потребителей с помощью горутин
	for i := 0; i < 5; i++ {
		go func(number int) {
			// Создание канала RabbitMQ для каждого потребителя
			ch, err := conn.Channel()
			failOnError(err, "Не удалось открыть канал")
			defer ch.Close()

			// Объявление очереди для операций
			q, err := ch.QueueDeclare(
				"hello", // Имя очереди
				false,   // Долговечность
				false,   // Удаление при неиспользовании
				false,   // Exclusive
				false,   // No-wait
				nil,     // Аргументы
			)
			failOnError(err, "Не удалось объявить очередь")

			// Создание потребителя
			msgs, err := ch.Consume(
				q.Name, // Название очереди для операций
				"",     // Уникальный идентификатор потребителя; если не указан, значение генерируется автоматически
				true,   // Автоматическое подтверждение сообщений (т. е. автоматическое подтверждение обработки сообщения)
				false,  // Exclusive
				false,  // No-local
				false,  // No-wait
				nil,    // Args
			)
			failOnError(err, "Не удалось зарегистрировать потребителя")

			// Обработка сообщений в цикле
			for d := range msgs {
				log.Printf("[Номер потребителя=%d] Получено сообщение: %s", number, d.Body)
				// Имитация бизнес-обработки, задержка на 1 секунду
				time.Sleep(time.Second)
			}
		}(i)
	}

	// Приостановка основной горутины, чтобы программа не завершалась
	вечно := make(chan bool)
	<-вечно
}

Совет: Не зависимо от типа обмена, используемого RabbitMQ, очереди могут иметь несколько потребителей, и способ запуска нескольких потребителей такой же, как в этом примере.