RabbitMQ no admite nativamente mensajes demorados. Actualmente, las principales implementaciones incluyen el uso de un intercambio de mensajes inactivos + esquema de TTL de mensaje o el complemento rabbitmq-delayed-message-exchange.

Casos de uso para colas demoradas

  • Escenarios en los que hay requisitos de ventana de tiempo para la producción y consumo de mensajes. Por ejemplo, en transacciones de comercio electrónico, hay un escenario en el que los pedidos se cierran si el pago no se completa dentro de un período de tiempo de espera. En este caso, se envía un mensaje demorado cuando se crea el pedido. Este mensaje se entregará al consumidor 30 minutos después, y el consumidor debe verificar si el pedido correspondiente ha sido pagado. Si el pago no se ha completado, se cierra el pedido; si se ha completado el pago, se ignora el mensaje.
  • Escenarios en los que las tareas demoradas son desencadenadas por mensajes. Por ejemplo, enviar mensajes recordatorios a los usuarios después de un período de tiempo especificado.

Esquema de intercambio de mensajes inactivos + TTL de mensaje

La idea central de este enfoque es crear una cola sin consumidores y utilizar el tiempo de expiración del mensaje (TTL). Cuando un mensaje expira, se convierte en un mensaje inactivo, que se enruta a un intercambio de mensajes inactivos y luego a una cola de mensajes inactivos, que se puede consumir.

En este enfoque, el tiempo de expiración del mensaje sirve como tiempo de demora del mensaje. Por ejemplo, si se establece el TTL del mensaje en 30 segundos y no hay consumidores para la cola, el mensaje expirará después de 30 segundos y se convertirá en un mensaje inactivo, que será manejado por la cola de mensajes inactivos.

Para implementar este enfoque, se deben establecer las propiedades como se describe en los siguientes dos tutoriales:

Solución del complemento de mensajes demorados

1. Instalación del complemento

Repositorio de GitHub para rabbitmq-delayed-message-exchange:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

Descargue el archivo rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez de la sección de activos de la página de lanzamiento en GitHub y colóquelo en el directorio de complementos de RabbitMQ (directorio de complementos).

Nota: El número de versión puede ser diferente al de este tutorial. Si su RabbitMQ es la última versión, simplemente elija la última versión del complemento.

2. Activación del complemento

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

3. Definición del intercambio

Establezca propiedades de intercambio personalizadas utilizando x-delayed-type para admitir el envío de mensajes demorados.

props := make(map[string]interface{})
// Parámetro clave para admitir el envío de mensajes demorados
props["x-delayed-type"] = "direct"

// Declare el intercambio
err = ch.ExchangeDeclare(
"cola-demorada", // Nombre del intercambio
"fanout",        // Tipo de intercambio
true,            // Duradero
false,
false,
false,
props,           // Establecer propiedades
)

4. Envío de mensajes demorados

Establezca el tiempo de demora del mensaje utilizando el encabezado del mensaje (x-delay).

msgHeaders := make(map[string]interface{})
// Establecer el tiempo de demora del mensaje utilizando el encabezado del mensaje, en milisegundos
msgHeaders["x-delay"] = 6000

err = ch.Publish(
"cola-demorada", // Nombre del intercambio
"",              // Parámetro de enrutamiento
false,
false,
amqp.Publishing{
Headers: msgHeaders,   // Establecer encabezados del mensaje
ContentType: "text/plain",
Body: []byte(body),
})

Nota: Si está utilizando el servicio de Cola de Mensajes de RabbitMQ de Alibaba Cloud directamente, puede establecer el tiempo de demora utilizando el atributo de encabezado del mensaje (delay) sin instalar el complemento. Alibaba Cloud ya ha ampliado RabbitMQ con este propósito.