В RabbitMQ, когда сообщение становится мертвым (т.е. сообщение, которое потребители не могут обработать) в очереди, оно перенаправляется в другой обмен, который мы называем обменом мертвых сообщений. Обмен мертвых сообщений затем доставляет мертвое сообщение в очередь, которая называется очередью мертвых сообщений.

Иллюстрация очереди мертвых сообщений

Очередь мертвых сообщений

Вышеуказанная иллюстрация описывает весь процесс от создания мертвых сообщений до их обработки.

Создание мертвых сообщений

Следующие условия приводят к возникновению мертвых сообщений:

  • Сообщение отклонено вручную потребителем (basic.reject / basic.nack), и requeue = false.
  • Истекло время жизни сообщения (TTL).
  • Очередь достигла максимальной длины.

Шаги обработки очереди мертвых сообщений

  1. Определите обмен мертвых сообщений (не путайтесь с названием, это просто обычный обмен, он называется так только в контексте обработки мертвых сообщений).
  2. Определите очередь для привязки к обмену мертвых сообщений (эта очередь называется очередью мертвых сообщений и также является обычной очередью).
  3. Определите потребителя мертвых сообщений для обработки очереди мертвых сообщений (не путайтесь с названием, это также обычный потребитель).
  4. Привяжите обмен мертвых сообщений к указанной очереди (очередь, которая должна обрабатывать мертвые сообщения, должна быть привязана).

Совет: Обратитесь к иллюстрации выше для принципа. Все языки программирования обрабатывают очереди мертвых сообщений аналогичным образом.

Обработка очереди мертвых сообщений на Golang

1. Определение обмена мертвых сообщений

Определите его как обычный обмен.

// Объявление обмена
err = ch.ExchangeDeclare(
    "tizi365.dead",   // Имя обмена
    "topic", // Тип обмена
    true,     // Долговечный
    false,
    false,
    false,
    nil,
)

2. Определение очереди мертвых сообщений

Определите его как обычную очередь.

    // Объявление очереди
    q, err := ch.QueueDeclare(
        "",    // Имя очереди, оставьте пустым, чтобы сгенерировать случайное
        false, // Долговечная очередь
        false,
        true,
        false,
        nil,
    )

    // Привязка очереди к обмену мертвых сообщений
    err = ch.QueueBind(
        q.Name, // Имя очереди
        "#",     // Ключ маршрутизации, # означает соответствие всем ключам маршрутизации, что означает получение всех сообщений мертвых сообщений
        "tizi365.dead", // Имя обмена мертвых сообщений
        false,
        nil)

Совет: Рассматривайте очередь мертвых сообщений как обычную очередь.

3. Определение потребителя мертвых сообщений

// Создание потребителя
msgs, err := ch.Consume(
    q.Name, // Ссылка на предыдущее имя очереди мертвых сообщений
    "",     // Имя потребителя, если не указано, будет сгенерировано случайное
    true,   // Авто-подтверждение обработки сообщения
    false, 
    false, 
    false, 
    nil,
)

// Цикл для потребления сообщений из очереди мертвых сообщений
for d := range msgs {
    log.Printf("Получено мертвое сообщение=%s", d.Body)
}

4. Привязка обмена мертвых сообщений к конкретной очереди

	// Свойства очереди
	props := make(map[string]interface{})
	// Привязка обмена мертвых сообщений
	props["x-dead-letter-exchange"] = "tizi365.dead"
	// Необязательно: установить ключ маршрутизации при доставке мертвого сообщения в обмен мертвых сообщений. Если не установлен, будет использоваться ключ маршрутизации исходного сообщения.
	// props["x-dead-letter-routing-key"] = "www.tizi365.com"

	q, err := ch.QueueDeclare(
		"tizi365.demo.hello", // Имя очереди
		true,   // Долговечная
		false, 
		false, 
		false,   
		props,     // Установка свойств очереди
	)

Таким образом, если сообщения в очереди tizi365.demo.hello станут мертвыми, они будут перенаправлены в обмен мертвых сообщений tizi365.dead.