RabbitMQは遅延メッセージをネイティブでサポートしていません。現在、主要な実装方法には、デッドレターエクスチェンジ+メッセージTTLスキーム、またはrabbitmq-delayed-message-exchangeプラグインがあります。
遅延キューの使用事例
- メッセージの生成と消費に時間ウィンドウの要件があるシナリオ。たとえば、eコマース取引では、支払いがタイムアウト期間内に完了しない場合、注文がキャンセルされるシナリオがあります。この場合、注文が作成された時点で遅延メッセージが送信されます。このメッセージは消費者には30分後に配信され、消費者は対応する注文が支払われたかどうかを確認する必要があります。支払いが完了していない場合、注文はキャンセルされます。支払いが完了している場合、メッセージは無視されます。
- メッセージによって遅延タスクがトリガーされるシナリオ。たとえば、特定の時間経過後にユーザーにリマインダーメッセージを送信する場合。
デッドレターエクスチェンジ+メッセージTTLスキーム
このアプローチの中心的な考え方は、消費者のいないキューを作成し、メッセージの有効期限時間(TTL)を利用することです。メッセージが有効期限切れになると、それはデッドレターになり、デッドレターエクスチェンジにルーティングされ、その後デッドレターキューにルーティングされ、それを消費できます。
このアプローチでは、メッセージの有効期限時間がメッセージの遅延時間として機能します。たとえば、メッセージのTTLが30秒に設定されており、キューに消費者がいない場合、メッセージは30秒後に有効期限切れになり、デッドレターとなり、デッドレターキューで処理されます。
このアプローチを実装するには、次の2つのチュートリアルで説明されているプロパティを設定する必要があります:
遅延メッセージプラグインソリューション
1. プラグインのインストール
rabbitmq-delayed-message-exchange GitHub リポジトリ:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
GitHubのリリースページのアセットセクションからrabbitmq_delayed_message_exchange-3.8.9-0199d11c.ezファイルをダウンロードし、RabbitMQプラグインディレクトリ(pluginsディレクトリ)に配置します。
注意:バージョン番号はこのチュートリアルのものと異なる場合があります。RabbitMQが最新バージョンである場合は、プラグインの最新バージョンを選択してください。
2. プラグインの有効化
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
3. エクスチェンジの定義
遅延メッセージの送信をサポートするために、x-delayed-typeを使ってカスタムエクスチェンジプロパティを設定します。
props := make(map[string]interface{})
// 遅延メッセージの送信をサポートするためのキーのパラメータ
props["x-delayed-type"] = "direct"
// エクスチェンジを宣言
err = ch.ExchangeDeclare(
"delay.queue", // エクスチェンジ名
"fanout", // エクスチェンジの種類
true, // 耐久性
false,
false,
false,
props, // プロパティを設定
)
4. 遅延メッセージの送信
メッセージヘッダー(x-delay)を使用してメッセージの遅延時間を設定します。
msgHeaders := make(map[string]interface{})
// メッセージヘッダーを使用してメッセージの遅延時間をミリ秒単位で設定
msgHeaders["x-delay"] = 6000
err = ch.Publish(
"delay.queue", // エクスチェンジ名
"", // ルーティングパラメータ
false,
false,
amqp.Publishing{
Headers: msgHeaders, // メッセージヘッダーを設定
ContentType: "text/plain",
Body: []byte(body),
})
注意:Alibaba CloudのRabbitMQメッセージキューサービスを直接使用する場合、プラグインをインストールせずにメッセージヘッダー属性(delay)を使用して遅延時間を設定できます。Alibaba Cloudはすでにこの目的のためにRabbitMQを拡張しています。