RabbitMQ non supporta nativamente i messaggi ritardati. Attualmente, le principali implementazioni includono l'uso di uno scambio di messaggi non recapitabili (dead-letter exchange) + uno schema TTL dei messaggi o il plugin rabbitmq-delayed-message-exchange.
Casi d'uso per le code ritardate
- Scenario in cui sono richiesti intervalli temporali per la produzione e il consumo di messaggi. Ad esempio, nelle transazioni di e-commerce, esiste uno scenario in cui gli ordini vengono chiusi se il pagamento non viene completato entro un certo periodo di timeout. In questo caso, viene inviato un messaggio ritardato quando l'ordine viene creato. Questo messaggio verrà consegnato al consumatore dopo 30 minuti e il consumatore deve verificare se l'ordine corrispondente è stato pagato. Se il pagamento non è stato completato, l'ordine viene chiuso; se il pagamento è stato completato, il messaggio viene ignorato.
- Scenari in cui i compiti ritardati vengono attivati dai messaggi. Ad esempio, l'invio di messaggi promemoria agli utenti dopo un periodo di tempo specificato.
Schema TTL dei messaggi + scambio di messaggi non recapitabili
L'idea principale di questo approccio è quella di creare una coda senza consumatori e utilizzare il tempo di scadenza del messaggio (TTL). Quando un messaggio scade, diventa un messaggio non recapitabile, che viene instradato verso uno scambio di messaggi non recapitabili e quindi verso una coda di messaggi non recapitabili, che può essere consumata.
In questo approccio, il tempo di scadenza del messaggio funge da tempo di ritardo del messaggio. Ad esempio, se il TTL del messaggio è impostato su 30 secondi e non ci sono consumatori per la coda, il messaggio scadrà dopo 30 secondi e diventerà un messaggio non recapitabile, che verrà gestito dalla coda di messaggi non recapitabili.
Per implementare questo approccio, è necessario impostare le proprietà come descritto nei seguenti due tutorial:
Soluzione del plugin per i messaggi ritardati
1. Installazione del plugin
Repository GitHub di rabbitmq-delayed-message-exchange:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
Scarica il file rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez dalla sezione degli asset della pagina di rilascio su GitHub e posizionalo nella directory dei plugin di RabbitMQ (directory dei plugin).
Nota: Il numero di versione può essere diverso da quello di questo tutorial. Se il tuo RabbitMQ è l'ultima versione, scegli semplicemente l'ultima versione del plugin.
2. Attivazione del plugin
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
3. Definizione dello scambio
Imposta le proprietà personalizzate dello scambio usando x-delayed-type per supportare l'invio di messaggi ritardati.
props := make(map[string]interface{})
// Parametro chiave per supportare l'invio di messaggi ritardati
props["x-delayed-type"] = "direct"
// Dichiarare lo scambio
err = ch.ExchangeDeclare(
"delay.queue", // Nome dello scambio
"fanout", // Tipo di scambio
true, // Durabilità
false,
false,
false,
props, // Impostare le proprietà
)
4. Invio di messaggi ritardati
Imposta il tempo di ritardo del messaggio utilizzando l'intestazione del messaggio (x-delay).
msgHeaders := make(map[string]interface{})
// Imposta il tempo di ritardo del messaggio utilizzando l'intestazione del messaggio, in millisecondi
msgHeaders["x-delay"] = 6000
err = ch.Publish(
"delay.queue", // Nome dello scambio
"", // Parametro di instradamento
false,
false,
amqp.Publishing{
Headers: msgHeaders, // Impostare gli header del messaggio
ContentType: "text/plain",
Body: []byte(body),
})
Nota: Se stai utilizzando il servizio di coda di messaggi RabbitMQ di Alibaba Cloud direttamente, puoi impostare il tempo di ritardo utilizzando l'attributo dell'intestazione del messaggio (delay) senza installare il plugin. Alibaba Cloud ha già esteso RabbitMQ a tal fine.