В RabbitMQ, когда сообщение становится мертвым (т.е. сообщение, которое потребители не могут обработать) в очереди, оно перенаправляется в другой обмен, который мы называем обменом мертвых сообщений. Обмен мертвых сообщений затем доставляет мертвое сообщение в очередь, которая называется очередью мертвых сообщений.
Иллюстрация очереди мертвых сообщений
Вышеуказанная иллюстрация описывает весь процесс от создания мертвых сообщений до их обработки.
Создание мертвых сообщений
Следующие условия приводят к возникновению мертвых сообщений:
- Сообщение отклонено вручную потребителем (basic.reject / basic.nack), и requeue = false.
- Истекло время жизни сообщения (TTL).
- Очередь достигла максимальной длины.
Шаги обработки очереди мертвых сообщений
- Определите обмен мертвых сообщений (не путайтесь с названием, это просто обычный обмен, он называется так только в контексте обработки мертвых сообщений).
- Определите очередь для привязки к обмену мертвых сообщений (эта очередь называется очередью мертвых сообщений и также является обычной очередью).
- Определите потребителя мертвых сообщений для обработки очереди мертвых сообщений (не путайтесь с названием, это также обычный потребитель).
- Привяжите обмен мертвых сообщений к указанной очереди (очередь, которая должна обрабатывать мертвые сообщения, должна быть привязана).
Совет: Обратитесь к иллюстрации выше для принципа. Все языки программирования обрабатывают очереди мертвых сообщений аналогичным образом.
Обработка очереди мертвых сообщений на Golang
1. Определение обмена мертвых сообщений
Определите его как обычный обмен.
// Объявление обмена
err = ch.ExchangeDeclare(
"tizi365.dead", // Имя обмена
"topic", // Тип обмена
true, // Долговечный
false,
false,
false,
nil,
)
2. Определение очереди мертвых сообщений
Определите его как обычную очередь.
// Объявление очереди
q, err := ch.QueueDeclare(
"", // Имя очереди, оставьте пустым, чтобы сгенерировать случайное
false, // Долговечная очередь
false,
true,
false,
nil,
)
// Привязка очереди к обмену мертвых сообщений
err = ch.QueueBind(
q.Name, // Имя очереди
"#", // Ключ маршрутизации, # означает соответствие всем ключам маршрутизации, что означает получение всех сообщений мертвых сообщений
"tizi365.dead", // Имя обмена мертвых сообщений
false,
nil)
Совет: Рассматривайте очередь мертвых сообщений как обычную очередь.
3. Определение потребителя мертвых сообщений
// Создание потребителя
msgs, err := ch.Consume(
q.Name, // Ссылка на предыдущее имя очереди мертвых сообщений
"", // Имя потребителя, если не указано, будет сгенерировано случайное
true, // Авто-подтверждение обработки сообщения
false,
false,
false,
nil,
)
// Цикл для потребления сообщений из очереди мертвых сообщений
for d := range msgs {
log.Printf("Получено мертвое сообщение=%s", d.Body)
}
4. Привязка обмена мертвых сообщений к конкретной очереди
// Свойства очереди
props := make(map[string]interface{})
// Привязка обмена мертвых сообщений
props["x-dead-letter-exchange"] = "tizi365.dead"
// Необязательно: установить ключ маршрутизации при доставке мертвого сообщения в обмен мертвых сообщений. Если не установлен, будет использоваться ключ маршрутизации исходного сообщения.
// props["x-dead-letter-routing-key"] = "www.tizi365.com"
q, err := ch.QueueDeclare(
"tizi365.demo.hello", // Имя очереди
true, // Долговечная
false,
false,
false,
props, // Установка свойств очереди
)
Таким образом, если сообщения в очереди tizi365.demo.hello станут мертвыми, они будут перенаправлены в обмен мертвых сообщений tizi365.dead.