Паттерн Публикация/Подписка в RabbitMQ (Режим трансляции, Режим мультикастинга)

Паттерн публикации/подписки в RabbitMQ означает, что сообщение, отправленное производителем, будет обработано несколькими потребителями.

Режим мультикастинга

Пояснение:

  • P представляет производителя, C1 и C2 представляют потребителей, красный цвет представляет очереди, а X представляет обмен.
  • Обмен отвечает за пересылку сообщений во все очереди, привязанные к этому обмену.
  • Можно определить несколько очередей, каждая из которых привязана к тому же обмену.
  • У каждой очереди может быть один или более потребителей.

Примечание: Если вы не знакомы с RabbitMQ, пожалуйста, сначала прочтите раздел Основные концепции RabbitMQ.

1. Установить зависимость пакета

go get github.com/streadway/amqp

2. Отправка сообщений

Ниже приведены шаги для демонстрации отправки сообщений производителем.

2.1. Подключение к серверу RabbitMQ

// Подключение к серверу RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()

Объяснение адреса подключения:

amqp://имяпользователя:пароль@АдресRabbitMQ:Порт/

2.2. Создание канала

Большинство операций выполняются на канале.

ch, err := conn.Channel()
defer ch.Close()

2.3. Объявление обмена

Сообщения сначала отправляются на обмен. Обмен пересылает сообщения в очереди на основе своей стратегии.

err = ch.ExchangeDeclare(
	"tizi365",   // Имя обмена
	"fanout", // Тип обмена, здесь используется тип fanout, т.е. режим публикации/подписки
	true,     // Устойчивый
	false,    // Автоудаление
	false,    // Внутренний
	false,    // Без ожидания
	nil,      // Аргументы
)

2.4. Опубликовать сообщение

// Содержимое сообщения
body := "Привет Tizi365.com!"

// Опубликовать сообщение
err = ch.Publish(
  "tizi365",     // Обмен (имя обмена, соответствующее предыдущему объявлению)
  "", // Маршрутизационный ключ, для обмена типа fanout маршрутизационный ключ автоматически игнорируется, поэтому его указывать не обязательно
  false,  // Обязательный
  false,  // Немедленный
  amqp.Publishing {
    ContentType: "text/plain", // Тип содержимого сообщения, здесь это обычный текст
    Body:        []byte(body),  // Содержимое сообщения
  })

2.5. Завершить код для отправки сообщения

package main

import (
	"log"
	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// Подключение к RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Не удалось подключиться к RabbitMQ")
	defer conn.Close()

	// Создание канала
	ch, err := conn.Channel()
	failOnError(err, "Не удалось открыть канал")
	defer ch.Close()

	// Объявление обмена
	err = ch.ExchangeDeclare(
		"tizi365",   // Имя обмена
		"fanout", // Тип обмена, fanout для режима публикации/подписки
		true,     // Устойчивый
		false,    // Автоудаление
		false,    // Внутренний
		false,    // Без ожидания
		nil,      // Аргументы
	)
	failOnError(err, "Не удалось объявить обмен")

	// Содержимое сообщения
	body := "Привет Tizi365.com!"
	// Отправить сообщение
	err = ch.Publish(
		"tizi365",     // Обмен (соответствует объявлению выше)
		"", // Маршрутизационный ключ, для обменов типа fanout маршрутизационный ключ автоматически игнорируется
		false,  // Обязательный
		false,  // Немедленный
		amqp.Publishing {
			ContentType: "text/plain", // Тип содержимого сообщения, здесь обычный текст
			Body:        []byte(body),  // Содержимое сообщения
		})

	log.Printf("Отправлено содержимое %s", body)
}

3. Получение сообщений

Первые три шага для получения сообщений — подключение к RabbitMQ, создание канала и объявление обмена — такие же, как при отправке сообщений. См. предыдущие разделы 2.1, 2.2 и 2.3.

3.1. Объявление очереди

Объявим очередь для операций

q, err := ch.QueueDeclare(
		"",    // Имя очереди, если не указано, будет сгенерировано случайное
		false, // Долговечность
		false, // Удалить при неиспользовании
		true,  // Исключительность
		false, // Без ожидания
		nil,   // Аргументы
	)

3.2. Привязка очереди к обменнику

Чтобы получать сообщения, очередь должна быть привязана к обменнику

err = ch.QueueBind(
		q.Name, // Имя очереди
		"",     // Ключ маршрутизации, для обменников типа fanout ключ маршрутизации автоматически игнорируется
		"tizi365", // Имя обменника, должно совпадать с тем, что задан отправителем сообщения
		false,
		nil)

Примечание: В реальных приложениях мы можем определить N очередей, каждая привязана к тому же обменнику, чтобы получать сообщения, пересылаемые обменником. Здесь проявляется паттерн издатель/подписчик.

3.3. Создание потребителя

msgs, err := ch.Consume(
		q.Name, // Ссылка на имя очереди из предыдущего шага
		"",     // Имя потребителя, если не указано, будет сгенерировано автоматически
		true,   // Автоматически подтверждать обработку сообщения
		false,  // Исключительность
		false,  // Не использовать локально
		false,  // Без ожидания
		nil,    // Аргументы
	)
	
// Цикл обработки сообщений
for d := range msgs {
	log.Printf("Получено сообщение=%s", d.Body)
}

3.4. Завершение кода потребителя

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// Подключение к RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Не удалось подключиться к RabbitMQ")
	defer conn.Close()

	// Создание канала, обычно по одному на каждого потребителя
	ch, err := conn.Channel()
	failOnError(err, "Не удалось открыть канал")
	defer ch.Close()

	// Объявление обменника
	err = ch.ExchangeDeclare(
		"tizi365",   // Имя обменника, должно совпадать с тем, что используется отправителем сообщения
		"fanout", // Тип обменника
		true,     // Долговечность
		false,    // Автоматически удалить
		false,    // Внутренний
		false,    // Без ожидания
		nil,      // Аргументы
	)
	failOnError(err, "Не удалось объявить обменник")

	// Объявление очереди для операций
	q, err := ch.QueueDeclare(
		"",    // Имя очереди, если пусто, будет сгенерировано случайное
		false, // Долговечность
		false, // Удалить при неиспользовании
		true,  // Исключительность
		false, // Без ожидания
		nil,   // Аргументы
	)
	failOnError(err, "Не удалось объявить очередь")

	// Привязка очереди к указанному обменнику
	err = ch.QueueBind(
		q.Name, // Имя очереди
		"",     // Ключ маршрутизации, игнорируется для обменников fanout
		"tizi365", // Имя обменника, должно совпадать с тем, что определено отправителем сообщения
		false,
		nil)
	failOnError(err, "Не удалось привязать очередь")

	// Создание потребителя
	msgs, err := ch.Consume(
		q.Name, // Ссылка на имя ранее созданной очереди
		"",     // Имя потребителя, будет сгенерировано автоматически, если пусто
		true,   // Авто-подтверждение
		false,  // Исключительность
		false,  // Не использовать локально
		false,  // Без ожидания
		nil,    // Аргументы
	)
	failOnError(err, "Не удалось зарегистрировать потребителя")

	// Получение сообщений из очереди в цикле
	for d := range msgs {
		log.Printf("Получено сообщение: %s", d.Body)
	}
}

3.5. Несколько потребителей

См. раздел Режим работы и просто запустите несколько потребителей, используя горутины.