RabbitMQ nie obsługuje natywnie wiadomości opóźnionych. Obecnie główne implementacje obejmują użycie wymiany z martwymi wiadomościami + schematu TTL wiadomości lub wtyczki rabbitmq-delayed-message-exchange.
Przykłady użycia opóźnionych kolejek
- Przypadki, w których istnieją wymagania czasowe dotyczące produkcji i konsumpcji wiadomości. Na przykład, w transakcjach e-commerce istnieje scenariusz, w którym zamówienia są zamykane, jeśli płatność nie zostanie dokonana w określonym czasie. W tym przypadku opóźniona wiadomość jest wysyłana po utworzeniu zamówienia. Ta wiadomość zostanie dostarczona do konsumenta 30 minut później, a konsument musi sprawdzić, czy odpowiadające zamówienie zostało opłacone. Jeśli płatność nie została dokonana, zamówienie zostaje zamknięte; jeśli płatność została dokonana, wiadomość jest ignorowana.
- Przypadki, w których opóźnione zadania są wyzwalane przez wiadomości. Na przykład wysyłanie przypomnień do użytkowników po określonym okresie czasu.
Schemat wymiany z martwymi wiadomościami + TTL wiadomości
Główna idea tego podejścia polega na utworzeniu kolejki bez konsumentów i wykorzystaniu czasu wygaśnięcia wiadomości (TTL). Gdy wiadomość wygasa, staje się martwą wiadomością, którą przekierowuje się do wymiany z martwymi wiadomościami, a następnie do kolejki z martwymi wiadomościami, która może być skonsumowana.
W tym podejściu czas wygaśnięcia wiadomości pełni rolę czasu opóźnienia wiadomości. Na przykład, jeśli TTL wiadomości jest ustawione na 30 sekund i nie ma konsumentów dla kolejki, wiadomość wygaśnie po 30 sekundach i stanie się martwą wiadomością, którą obsłuży kolejka z martwymi wiadomościami.
Aby zaimplementować to podejście, właściwości muszą być ustawione zgodnie z opisem w następujących dwóch samouczkach:
Rozwiązanie za pomocą wtyczki do opóźnionych wiadomości
1. Instalacja wtyczki
Repozytorium GitHub rabbitmq-delayed-message-exchange:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
Pobierz plik rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez z sekcji zasobów na stronie wydania na GitHub i umieść plik w katalogu wtyczek RabbitMQ (katalogi wtyczek).
Uwaga: Numer wersji może się różnić od tego w samouczku. Jeśli używasz najnowszej wersji RabbitMQ, po prostu wybierz najnowszą wersję wtyczki.
2. Aktywacja wtyczki
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
3. Definiowanie wymiany
Ustaw niestandardowe właściwości wymiany, korzystając z x-delayed-type, aby obsługiwać wysyłanie opóźnionych wiadomości.
props := make(map[string]interface{})
// Parametr klucza do obsługi wysyłania opóźnionych wiadomości
props["x-delayed-type"] = "direct"
// Zadeklaruj wymianę
err = ch.ExchangeDeclare(
"opozniona.kolejka", // Nazwa wymiany
"fanout", // Typ wymiany
true, // Trwała
false,
false,
false,
props, // Ustaw właściwości
)
4. Wysyłanie opóźnionych wiadomości
Ustaw czas opóźnienia wiadomości, korzystając z nagłówka wiadomości (x-delay).
msgHeaders := make(map[string]interface{})
// Ustaw czas opóźnienia wiadomości, korzystając z nagłówka wiadomości, w milisekundach
msgHeaders["x-delay"] = 6000
err = ch.Publish(
"opozniona.kolejka", // Nazwa wymiany
"", // Parametr routingu
false,
false,
amqp.Publishing{
Headers: msgHeaders, // Ustaw nagłówki wiadomości
ContentType: "text/plain",
Body: []byte(body),
})
Uwaga: Jeśli korzystasz bezpośrednio z usługi kolejki wiadomości RabbitMQ w chmurze Alibaba Cloud, możesz ustawić czas opóźnienia, korzystając z atrybutu nagłówka wiadomości (delay) bez konieczności instalowania wtyczki. Alibaba Cloud już rozszerzył RabbitMQ w tym celu.