RabbitMQ nie obsługuje natywnie wiadomości opóźnionych. Obecnie główne implementacje obejmują użycie wymiany z martwymi wiadomościami + schematu TTL wiadomości lub wtyczki rabbitmq-delayed-message-exchange.

Przykłady użycia opóźnionych kolejek

  • Przypadki, w których istnieją wymagania czasowe dotyczące produkcji i konsumpcji wiadomości. Na przykład, w transakcjach e-commerce istnieje scenariusz, w którym zamówienia są zamykane, jeśli płatność nie zostanie dokonana w określonym czasie. W tym przypadku opóźniona wiadomość jest wysyłana po utworzeniu zamówienia. Ta wiadomość zostanie dostarczona do konsumenta 30 minut później, a konsument musi sprawdzić, czy odpowiadające zamówienie zostało opłacone. Jeśli płatność nie została dokonana, zamówienie zostaje zamknięte; jeśli płatność została dokonana, wiadomość jest ignorowana.
  • Przypadki, w których opóźnione zadania są wyzwalane przez wiadomości. Na przykład wysyłanie przypomnień do użytkowników po określonym okresie czasu.

Schemat wymiany z martwymi wiadomościami + TTL wiadomości

Główna idea tego podejścia polega na utworzeniu kolejki bez konsumentów i wykorzystaniu czasu wygaśnięcia wiadomości (TTL). Gdy wiadomość wygasa, staje się martwą wiadomością, którą przekierowuje się do wymiany z martwymi wiadomościami, a następnie do kolejki z martwymi wiadomościami, która może być skonsumowana.

W tym podejściu czas wygaśnięcia wiadomości pełni rolę czasu opóźnienia wiadomości. Na przykład, jeśli TTL wiadomości jest ustawione na 30 sekund i nie ma konsumentów dla kolejki, wiadomość wygaśnie po 30 sekundach i stanie się martwą wiadomością, którą obsłuży kolejka z martwymi wiadomościami.

Aby zaimplementować to podejście, właściwości muszą być ustawione zgodnie z opisem w następujących dwóch samouczkach:

Rozwiązanie za pomocą wtyczki do opóźnionych wiadomości

1. Instalacja wtyczki

Repozytorium GitHub rabbitmq-delayed-message-exchange:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

Pobierz plik rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez z sekcji zasobów na stronie wydania na GitHub i umieść plik w katalogu wtyczek RabbitMQ (katalogi wtyczek).

Uwaga: Numer wersji może się różnić od tego w samouczku. Jeśli używasz najnowszej wersji RabbitMQ, po prostu wybierz najnowszą wersję wtyczki.

2. Aktywacja wtyczki

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

3. Definiowanie wymiany

Ustaw niestandardowe właściwości wymiany, korzystając z x-delayed-type, aby obsługiwać wysyłanie opóźnionych wiadomości.

props := make(map[string]interface{})
// Parametr klucza do obsługi wysyłania opóźnionych wiadomości
props["x-delayed-type"] = "direct"

// Zadeklaruj wymianę
err = ch.ExchangeDeclare(
"opozniona.kolejka", // Nazwa wymiany
"fanout",            // Typ wymiany
true,                // Trwała
false,
false,
false,
props,               // Ustaw właściwości
)

4. Wysyłanie opóźnionych wiadomości

Ustaw czas opóźnienia wiadomości, korzystając z nagłówka wiadomości (x-delay).

msgHeaders := make(map[string]interface{})
// Ustaw czas opóźnienia wiadomości, korzystając z nagłówka wiadomości, w milisekundach
msgHeaders["x-delay"] = 6000

err = ch.Publish(
"opozniona.kolejka", // Nazwa wymiany
"",                  // Parametr routingu
false,
false,
amqp.Publishing{
Headers: msgHeaders,   // Ustaw nagłówki wiadomości
ContentType: "text/plain",
Body: []byte(body),
})

Uwaga: Jeśli korzystasz bezpośrednio z usługi kolejki wiadomości RabbitMQ w chmurze Alibaba Cloud, możesz ustawić czas opóźnienia, korzystając z atrybutu nagłówka wiadomości (delay) bez konieczności instalowania wtyczki. Alibaba Cloud już rozszerzył RabbitMQ w tym celu.