RabbitMQのルーティングパターン(ダイレクトモード)は、JavaでDirectExchangeタイプの交換箇所を使用します。パブリッシュ/サブスクライブパターンとの違いは、Direct交換が完全に一致するルーティングパラメータを持つキューにメッセージを配信することです。下の画像はその構造を示しています。
ヒント: 使用するRabbitMQのワーキングモードに関係なく、違いは使用する交換箇所のタイプとルーティングパラメータにあります。
1. 必須チュートリアル
関連する知識を理解するために、次のセクションを読んでください:
- RabbitMQ基本概念
- RabbitMQのルーティングモード原則
- RabbitMQを使ったJavaのクイックスタート(後続のセクションではコードを重複させず、重要なコードのみを表示しますので、必読です)
- RabbitMQを使ったJavaのパブリッシュ/サブスクライブパターン(交換箇所のタイプとルーティングパラメータが異なるだけで、コード構造はほぼ同じですので、必読です)
2. ダイレクト交換箇所の定義
Spring AMQPでは、Direct交換箇所に対応するクラスはDirectExchangeです。交換箇所はSpring Bootの構成クラスを通じて定義します。
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public DirectExchange direct() {
// 交換箇所を定義
// パラメータは交換箇所の名前であり、ユニークである必要があります
return new DirectExchange("tizi365.direct");
}
}
ヒント: メッセージプロデューサーとコンシューマーの両方で交換箇所が必要です。
3. メッセージの送信
メッセージを交換箇所に送信し、その後ルーティングルールに基づいてメッセージを対応するキューに配信します。
package com.tizi365.rabbitmq.service;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class SendService {
@Autowired
private RabbitTemplate template;
@Autowired
private DirectExchange direct;
// デモンストレーションのため、1秒ごとにメッセージを送信する予定のタスクを使用します
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void send() {
// メッセージの内容
String message = "こんにちは、世界!";
// メッセージを送信
// 第1パラメータは交換箇所の名前
// 第2パラメータはルーティングキーです。ダイレクト交換箇所では、このメッセージは"tizi365"に一致するルーティングキーを持つキューに配信されます
// 第3パラメータはメッセージの内容であり、シリアル化できる限り、どの型でもサポートされます
template.convertAndSend(direct.getName(), "tizi365", message);
System.out.println("メッセージ '" + message + "'を送信しました");
}
}
ヒント: convertAndSendメソッドの第2パラメータに注意してください。これは重要なパラメータです。
4. メッセージの受信
4.1 キューの定義とエクスチェンジのバインド
メッセージをキューから消費するためには、まずキューを定義し、その後にそのキューをターゲットのエクスチェンジにバインドする必要があります。 次のコードは、2つのキューを定義し、それらを同じエクスチェンジにバインドします:
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public DirectExchange direct() {
// エクスチェンジを定義
// パラメータはエクスチェンジ名で、ユニークである必要があります
return new DirectExchange("tizi365.direct");
}
@Bean
public Queue queue1() {
// キュー1を定義
return new Queue("tizi365.direct.queue1");
}
@Bean
public Queue queue2() {
// キュー2を定義
return new Queue("tizi365.direct.queue2");
}
@Bean
public Binding binding1(DirectExchange direct, Queue queue1) {
// バインディングを定義し、ルーティングキーが "tizi365" の場合にキュー1をダイレクトエクスチェンジにバインドする
// ルーティングキーが "tizi365" に一致すると、エクスチェンジはメッセージをキュー1に配信します
return BindingBuilder.bind(queue1).to(direct).with("tizi365");
}
@Bean
public Binding binding2(DirectExchange direct, Queue queue2) {
// バインディングを定義し、ルーティングキーが "baidu" の場合にキュー2をダイレクトエクスチェンジにバインドする
// ルーティングキーが "baidu" に一致すると、エクスチェンジはメッセージをキュー2に配信します
return BindingBuilder.bind(queue2).to(direct).with("baidu");
}
}
4.2 キューリスナーの定義
特定のキューからメッセージを消費するために、RabbitListenerアノテーションを使用してメッセージリスナーを定義します:
package com.tizi365.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
// Springに現在のクラスを管理させます
@Component
public class DemoListener {
// キュー1からメッセージを消費するリスナーを定義
@RabbitListener(queues = "tizi365.direct.queue1")
public void receive1(String msg) {
System.out.println("キュー1からメッセージを受信しました:" + msg);
}
// キュー2からメッセージを消費するリスナーを定義
@RabbitListener(queues = "tizi365.direct.queue2")
public void receive2(String msg) {
System.out.println("キュー2からメッセージを受信しました:" + msg);
}
}
ルーティングキーが "tizi365" に一致するため、エクスチェンジはキュー1だけにメッセージを配信します。ルーティングキーが一致しないため、キュー2はメッセージを受信しません。