Простой режим очереди в Golang RabbitMQ

Golang RabbitMQ

Пояснение: P представляет производителя, C - потребителя, а красный цвет представляет очередь.

Примечание: Если вы не знакомы с RabbitMQ, пожалуйста, сначала прочтите раздел Основные концепции RabbitMQ.

1. Установка зависимостей

go get github.com/streadway/amqp

Импорт зависимости

import (
  "github.com/streadway/amqp"
)

2. Отправка сообщений

В следующих шагах показано, как производитель сообщений выполняет отправку сообщения.

2.1. Подключение к серверу RabbitMQ

// Подключение к серверу RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()

Объяснение адреса подключения:

amqp://username:password@RabbitMQAddress:port/

2.2. Создание канала

Большинство операций выполняются на канале.

ch, err := conn.Channel()
defer ch.Close()

2.3. Объявление очереди

Представляет очередь, из которой мы собираемся читать или в которую писать.

q, err := ch.QueueDeclare(
  "hello", // Имя очереди
  false,   // Сохранение сообщения
  false,   // Удаление очереди, когда она не используется
  false,   // Исключительность
  false,   // Без ожидания
  nil,     // Аргументы
)

2.4. Отправка сообщений

// Содержимое сообщения
body := "Привет, мир!"

// Отправка сообщения
err = ch.Publish(
  "",     // Обмен (игнорируется здесь)
  q.Name, // Параметр маршрутизации, используется имя очереди в качестве параметра маршрутизации
  false,  // Обязательность
  false,  // Немедленность
  amqp.Publishing {
    ContentType: "text/plain",
    Body:        []byte(body),  // Содержимое сообщения
  })

2.5. Полный код для отправки сообщений

package main

// Импорт пакетов
import (
	"log"
	"github.com/streadway/amqp"
)

// Обработка ошибок
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// Подключение к RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Не удалось подключиться к RabbitMQ")
	defer conn.Close()

	// Создание канала
	ch, err := conn.Channel()
	failOnError(err, "Не удалось открыть канал")
	defer ch.Close()

	// Объявление очереди для работы
	q, err := ch.QueueDeclare(
		"hello", // Имя
		false,   // Долговечность
		false,   // Удаление при неиспользовании
		false,   // Исключительность
		false,   // Без ожидания
		nil,     // Аргументы
	)
	failOnError(err, "Не удалось объявить очередь")

	// Содержимое сообщения для отправки
	body := "Привет, мир!"

	// Отправка сообщения
	err = ch.Publish(
		"",     // Обмен
		q.Name, // Ключ маршрутизации
		false,  // Обязательность
		false,  // Немедленность
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		})
	failOnError(err, "Не удалось опубликовать сообщение")
	log.Printf(" [x] Отправлено %s", body)
}

3. Получение сообщений

Первые три шага получения сообщений аналогичны отправке сообщений, соответствующие разделам 2.1, 2.2 и 2.3 соответственно. Полный код для получения сообщений выглядит следующим образом:

package main

// Импорт пакетов
import (
	"log"
	"github.com/streadway/amqp"
)

// Обработка ошибок
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// Подключение к RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Не удалось подключиться к RabbitMQ")
	defer conn.Close()

	// Создание канала
	ch, err := conn.Channel()
	failOnError(err, "Не удалось открыть канал")
	defer ch.Close()
	
	// Объявление очереди для работы с ней
	q, err := ch.QueueDeclare(
		"hello", // Имя очереди должно совпадать с именем очереди для отправки сообщений
		false,   // durable
		false,   // delete when unused
		false,   // exclusive
		false,   // no-wait
		nil,     // arguments
	)
	failOnError(err, "Не удалось объявить очередь")

	// Создание потребителя сообщений
	msgs, err := ch.Consume(
		q.Name, // Имя очереди
		"",     // Имя потребителя, если не заполнено, будет автоматически генерироваться уникальный идентификатор
		true,   // Автоматически ли подтверждать получение сообщений, т.е. автоматически уведомлять RabbitMQ о том, что сообщение было успешно обработано
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "Не удалось зарегистрировать потребителя")
	
	// Получение сообщений из очереди в цикле
	for d := range msgs {
		// Вывод содержимого сообщения
		log.Printf("Получено сообщение: %s", d.Body)
	}
}