Простой режим очереди в Golang RabbitMQ
Пояснение: P представляет производителя, C - потребителя, а красный цвет представляет очередь.
Примечание: Если вы не знакомы с RabbitMQ, пожалуйста, сначала прочтите раздел Основные концепции RabbitMQ.
1. Установка зависимостей
go get github.com/streadway/amqp
Импорт зависимости
import (
"github.com/streadway/amqp"
)
2. Отправка сообщений
В следующих шагах показано, как производитель сообщений выполняет отправку сообщения.
2.1. Подключение к серверу RabbitMQ
// Подключение к серверу RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()
Объяснение адреса подключения:
amqp://username:password@RabbitMQAddress:port/
2.2. Создание канала
Большинство операций выполняются на канале.
ch, err := conn.Channel()
defer ch.Close()
2.3. Объявление очереди
Представляет очередь, из которой мы собираемся читать или в которую писать.
q, err := ch.QueueDeclare(
"hello", // Имя очереди
false, // Сохранение сообщения
false, // Удаление очереди, когда она не используется
false, // Исключительность
false, // Без ожидания
nil, // Аргументы
)
2.4. Отправка сообщений
// Содержимое сообщения
body := "Привет, мир!"
// Отправка сообщения
err = ch.Publish(
"", // Обмен (игнорируется здесь)
q.Name, // Параметр маршрутизации, используется имя очереди в качестве параметра маршрутизации
false, // Обязательность
false, // Немедленность
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(body), // Содержимое сообщения
})
2.5. Полный код для отправки сообщений
package main
// Импорт пакетов
import (
"log"
"github.com/streadway/amqp"
)
// Обработка ошибок
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// Подключение к RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Не удалось подключиться к RabbitMQ")
defer conn.Close()
// Создание канала
ch, err := conn.Channel()
failOnError(err, "Не удалось открыть канал")
defer ch.Close()
// Объявление очереди для работы
q, err := ch.QueueDeclare(
"hello", // Имя
false, // Долговечность
false, // Удаление при неиспользовании
false, // Исключительность
false, // Без ожидания
nil, // Аргументы
)
failOnError(err, "Не удалось объявить очередь")
// Содержимое сообщения для отправки
body := "Привет, мир!"
// Отправка сообщения
err = ch.Publish(
"", // Обмен
q.Name, // Ключ маршрутизации
false, // Обязательность
false, // Немедленность
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Не удалось опубликовать сообщение")
log.Printf(" [x] Отправлено %s", body)
}
3. Получение сообщений
Первые три шага получения сообщений аналогичны отправке сообщений, соответствующие разделам 2.1, 2.2 и 2.3 соответственно. Полный код для получения сообщений выглядит следующим образом:
package main
// Импорт пакетов
import (
"log"
"github.com/streadway/amqp"
)
// Обработка ошибок
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// Подключение к RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Не удалось подключиться к RabbitMQ")
defer conn.Close()
// Создание канала
ch, err := conn.Channel()
failOnError(err, "Не удалось открыть канал")
defer ch.Close()
// Объявление очереди для работы с ней
q, err := ch.QueueDeclare(
"hello", // Имя очереди должно совпадать с именем очереди для отправки сообщений
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Не удалось объявить очередь")
// Создание потребителя сообщений
msgs, err := ch.Consume(
q.Name, // Имя очереди
"", // Имя потребителя, если не заполнено, будет автоматически генерироваться уникальный идентификатор
true, // Автоматически ли подтверждать получение сообщений, т.е. автоматически уведомлять RabbitMQ о том, что сообщение было успешно обработано
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Не удалось зарегистрировать потребителя")
// Получение сообщений из очереди в цикле
for d := range msgs {
// Вывод содержимого сообщения
log.Printf("Получено сообщение: %s", d.Body)
}
}