Режим работы RabbitMQ в Golang, обеспечивающий параллельное потребление несколькими потребителями.
Пояснение: P представляет производителя, C1 и C2 представляют потребителей, а красный цвет представляет очередь.
Совет: Каждое сообщение может быть потреблено только одним потребителем.
Предварительное руководство
Пожалуйста, сначала прочтите Быстрый старт по RabbitMQ в Golang, чтобы понять основные операции Golang с RabbitMQ. Если вы не знакомы с RabbitMQ, пожалуйста, сначала прочтите предыдущие главы.
Параллельное потребление
В Golang в основном используются горутины для реализации нескольких потребителей. Ниже приведена реализация нескольких потребителей.
Совет: Для отправки сообщений обратитесь к Быстрому старту по RabbitMQ в Golang.
package main
import (
"log"
"time"
"github.com/streadway/amqp"
)
// Обработка ошибок
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// Подключение к RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Не удалось подключиться к RabbitMQ")
defer conn.Close()
// Создание 5 потребителей с помощью горутин
for i := 0; i < 5; i++ {
go func(number int) {
// Создание канала RabbitMQ для каждого потребителя
ch, err := conn.Channel()
failOnError(err, "Не удалось открыть канал")
defer ch.Close()
// Объявление очереди для операций
q, err := ch.QueueDeclare(
"hello", // Имя очереди
false, // Долговечность
false, // Удаление при неиспользовании
false, // Exclusive
false, // No-wait
nil, // Аргументы
)
failOnError(err, "Не удалось объявить очередь")
// Создание потребителя
msgs, err := ch.Consume(
q.Name, // Название очереди для операций
"", // Уникальный идентификатор потребителя; если не указан, значение генерируется автоматически
true, // Автоматическое подтверждение сообщений (т. е. автоматическое подтверждение обработки сообщения)
false, // Exclusive
false, // No-local
false, // No-wait
nil, // Args
)
failOnError(err, "Не удалось зарегистрировать потребителя")
// Обработка сообщений в цикле
for d := range msgs {
log.Printf("[Номер потребителя=%d] Получено сообщение: %s", number, d.Body)
// Имитация бизнес-обработки, задержка на 1 секунду
time.Sleep(time.Second)
}
}(i)
}
// Приостановка основной горутины, чтобы программа не завершалась
вечно := make(chan bool)
<-вечно
}
Совет: Не зависимо от типа обмена, используемого RabbitMQ, очереди могут иметь несколько потребителей, и способ запуска нескольких потребителей такой же, как в этом примере.