Паттерн Публикация/Подписка в RabbitMQ (Режим трансляции, Режим мультикастинга)
Паттерн публикации/подписки в RabbitMQ означает, что сообщение, отправленное производителем, будет обработано несколькими потребителями.
Пояснение:
- P представляет производителя, C1 и C2 представляют потребителей, красный цвет представляет очереди, а X представляет обмен.
- Обмен отвечает за пересылку сообщений во все очереди, привязанные к этому обмену.
- Можно определить несколько очередей, каждая из которых привязана к тому же обмену.
- У каждой очереди может быть один или более потребителей.
Примечание: Если вы не знакомы с RabbitMQ, пожалуйста, сначала прочтите раздел Основные концепции RabbitMQ.
1. Установить зависимость пакета
go get github.com/streadway/amqp
2. Отправка сообщений
Ниже приведены шаги для демонстрации отправки сообщений производителем.
2.1. Подключение к серверу RabbitMQ
// Подключение к серверу RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()
Объяснение адреса подключения:
amqp://имяпользователя:пароль@АдресRabbitMQ:Порт/
2.2. Создание канала
Большинство операций выполняются на канале.
ch, err := conn.Channel()
defer ch.Close()
2.3. Объявление обмена
Сообщения сначала отправляются на обмен. Обмен пересылает сообщения в очереди на основе своей стратегии.
err = ch.ExchangeDeclare(
"tizi365", // Имя обмена
"fanout", // Тип обмена, здесь используется тип fanout, т.е. режим публикации/подписки
true, // Устойчивый
false, // Автоудаление
false, // Внутренний
false, // Без ожидания
nil, // Аргументы
)
2.4. Опубликовать сообщение
// Содержимое сообщения
body := "Привет Tizi365.com!"
// Опубликовать сообщение
err = ch.Publish(
"tizi365", // Обмен (имя обмена, соответствующее предыдущему объявлению)
"", // Маршрутизационный ключ, для обмена типа fanout маршрутизационный ключ автоматически игнорируется, поэтому его указывать не обязательно
false, // Обязательный
false, // Немедленный
amqp.Publishing {
ContentType: "text/plain", // Тип содержимого сообщения, здесь это обычный текст
Body: []byte(body), // Содержимое сообщения
})
2.5. Завершить код для отправки сообщения
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// Подключение к RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Не удалось подключиться к RabbitMQ")
defer conn.Close()
// Создание канала
ch, err := conn.Channel()
failOnError(err, "Не удалось открыть канал")
defer ch.Close()
// Объявление обмена
err = ch.ExchangeDeclare(
"tizi365", // Имя обмена
"fanout", // Тип обмена, fanout для режима публикации/подписки
true, // Устойчивый
false, // Автоудаление
false, // Внутренний
false, // Без ожидания
nil, // Аргументы
)
failOnError(err, "Не удалось объявить обмен")
// Содержимое сообщения
body := "Привет Tizi365.com!"
// Отправить сообщение
err = ch.Publish(
"tizi365", // Обмен (соответствует объявлению выше)
"", // Маршрутизационный ключ, для обменов типа fanout маршрутизационный ключ автоматически игнорируется
false, // Обязательный
false, // Немедленный
amqp.Publishing {
ContentType: "text/plain", // Тип содержимого сообщения, здесь обычный текст
Body: []byte(body), // Содержимое сообщения
})
log.Printf("Отправлено содержимое %s", body)
}
3. Получение сообщений
Первые три шага для получения сообщений — подключение к RabbitMQ, создание канала и объявление обмена — такие же, как при отправке сообщений. См. предыдущие разделы 2.1, 2.2 и 2.3.
3.1. Объявление очереди
Объявим очередь для операций
q, err := ch.QueueDeclare(
"", // Имя очереди, если не указано, будет сгенерировано случайное
false, // Долговечность
false, // Удалить при неиспользовании
true, // Исключительность
false, // Без ожидания
nil, // Аргументы
)
3.2. Привязка очереди к обменнику
Чтобы получать сообщения, очередь должна быть привязана к обменнику
err = ch.QueueBind(
q.Name, // Имя очереди
"", // Ключ маршрутизации, для обменников типа fanout ключ маршрутизации автоматически игнорируется
"tizi365", // Имя обменника, должно совпадать с тем, что задан отправителем сообщения
false,
nil)
Примечание: В реальных приложениях мы можем определить N очередей, каждая привязана к тому же обменнику, чтобы получать сообщения, пересылаемые обменником. Здесь проявляется паттерн издатель/подписчик.
3.3. Создание потребителя
msgs, err := ch.Consume(
q.Name, // Ссылка на имя очереди из предыдущего шага
"", // Имя потребителя, если не указано, будет сгенерировано автоматически
true, // Автоматически подтверждать обработку сообщения
false, // Исключительность
false, // Не использовать локально
false, // Без ожидания
nil, // Аргументы
)
// Цикл обработки сообщений
for d := range msgs {
log.Printf("Получено сообщение=%s", d.Body)
}
3.4. Завершение кода потребителя
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// Подключение к RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Не удалось подключиться к RabbitMQ")
defer conn.Close()
// Создание канала, обычно по одному на каждого потребителя
ch, err := conn.Channel()
failOnError(err, "Не удалось открыть канал")
defer ch.Close()
// Объявление обменника
err = ch.ExchangeDeclare(
"tizi365", // Имя обменника, должно совпадать с тем, что используется отправителем сообщения
"fanout", // Тип обменника
true, // Долговечность
false, // Автоматически удалить
false, // Внутренний
false, // Без ожидания
nil, // Аргументы
)
failOnError(err, "Не удалось объявить обменник")
// Объявление очереди для операций
q, err := ch.QueueDeclare(
"", // Имя очереди, если пусто, будет сгенерировано случайное
false, // Долговечность
false, // Удалить при неиспользовании
true, // Исключительность
false, // Без ожидания
nil, // Аргументы
)
failOnError(err, "Не удалось объявить очередь")
// Привязка очереди к указанному обменнику
err = ch.QueueBind(
q.Name, // Имя очереди
"", // Ключ маршрутизации, игнорируется для обменников fanout
"tizi365", // Имя обменника, должно совпадать с тем, что определено отправителем сообщения
false,
nil)
failOnError(err, "Не удалось привязать очередь")
// Создание потребителя
msgs, err := ch.Consume(
q.Name, // Ссылка на имя ранее созданной очереди
"", // Имя потребителя, будет сгенерировано автоматически, если пусто
true, // Авто-подтверждение
false, // Исключительность
false, // Не использовать локально
false, // Без ожидания
nil, // Аргументы
)
failOnError(err, "Не удалось зарегистрировать потребителя")
// Получение сообщений из очереди в цикле
for d := range msgs {
log.Printf("Получено сообщение: %s", d.Body)
}
}
3.5. Несколько потребителей
См. раздел Режим работы и просто запустите несколько потребителей, используя горутины.