JavaのRabbitMQトピックパターンは、TopicExchangeタイプを使用し、Directのルーティングパターンと異なる点は、ルーティングパラメータが曖昧なマッチングをサポートしていることです。ルーティングのマッチングが柔軟なため、より一般的に使用されるパターンです。アーキテクチャは以下の図の通りです:
ヒント: 使用するRabbitMQの動作パターンに関係なく、違いは使用する交換機のタイプとルーティングパラメータにあります。
1. 前提チュートリアル
関連知識を理解するために、最初に以下のセクションを読んでください:
- RabbitMQ基本概念
- RabbitMQ トピックパターンの原則
- JavaでのRabbitMQクイックスタート (後続のセクションではコードの繰り返しはしないため、必読です)
- Java RabbitMQ パブリッシュ-サブスクライブ パターンセクション (コードの構文はほぼ同じですが、交換タイプとルーティングパラメータが異なります)
2. トピック交換機の定義
Spring AMQPでは、Direct交換機に対応するクラスはTopicExchangeです。ExchangeはSpring Bootの設定クラスを介して定義します。
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public TopicExchange topic() {
// 交換機を定義
// パラメータは交換機の名前であり、一意である必要があります
return new TopicExchange("tizi365.topic");
}
}
ヒント: メッセージの生産者と消費者の両方が交換機が必要です。
3. メッセージの送信
メッセージを交換機に送信し、交換機はルーティングルールに基づいてメッセージを対応するキューに配信します。
package com.tizi365.rabbitmq.service;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class SendService {
@Autowired
private RabbitTemplate template;
@Autowired
private TopicExchange topic;
// デモ目的で、定格タスクを使用して毎秒メッセージを送信します
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void send() {
// メッセージの内容
String message = "Hello World!";
// メッセージを送信
// 第一パラメータは交換機の名前
// 第二パラメータはルーティングキーです。トピック交換機はルーティングキーに一致するキューにメッセージを配信します
// 第三パラメータはメッセージの内容で、シリアル化できる限り、任意のタイプをサポートします
template.convertAndSend(topic.getName(), "www.tizi365.com", message);
System.out.println("Message sent: '" + message + "'");
}
}
ヒント: convertAndSendメソッドの第二パラメータ "www.tizi365.com" に注意してください。これは重要なパラメータです。
4. メッセージの受信
4.1 キューの定義とエクスチェンジへのバインド
メッセージをキューから消費するためには、まずキューを定義し、その後そのキューをターゲットエクスチェンジにバインドする必要があります。以下では、2つのキューが定義され、同じエクスチェンジにバインドされています。
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public TopicExchange topic() {
// エクスチェンジを定義
// パラメータは一意である必要があるエクスチェンジ名です
return new TopicExchange("tizi365.topic");
}
@Bean
public Queue queue1() {
// キュー1を定義
return new Queue("tizi365.topic.queue1");
}
@Bean
public Queue queue2() {
// キュー2を定義
return new Queue("tizi365.topic.queue2");
}
@Bean
public Binding binding1(TopicExchange topic, Queue queue1) {
// バインド関係を定義し、キュー1を次のルーティングキーでエクスチェンジにバインドします: *.tizi365.com
return BindingBuilder.bind(queue1).to(topic).with("*.tizi365.com");
}
@Bean
public Binding binding2(TopicExchange topic, Queue queue2) {
// バインド関係を定義し、キュー2を次のルーティングキーでエクスチェンジにバインドします: *.baidu.com
return BindingBuilder.bind(queue2).to(topic).with("*.baidu.com");
}
}
ヒント: キュー1とキュー2をエクスチェンジにバインドする際、利用するルーティングキーはどちらも *(ワイルドカード)を使用しており、これは1つの単語とマッチします。もし #(ハッシュ)に変更すると、複数の単語とマッチします。
4.2 キューリスナーの定義
特定のキューからメッセージを消費するためにRabbitListenerアノテーションを使用してメッセージリスナーを定義します。
package com.tizi365.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
// Springに現在のクラスを管理させる
@Component
public class DemoListener {
// リスナーを定義し、queuesパラメータを通じてどのキューをリッスンするかを指定します
@RabbitListener(queues = "tizi365.topic.queue1")
public void receive1(String msg) {
System.out.println("キュー1からメッセージを受信: " + msg);
}
// リスナーを定義し、queuesパラメータを通じてどのキューをリッスンするかを指定します
@RabbitListener(queues = "tizi365.topic.queue2")
public void receive2(String msg) {
System.out.println("キュー2からメッセージを受信: " + msg);
}
}
キュー1はエクスチェンジにバインドする際、ルーティングキーが*.tizi365.comに設定されているため、ルーティングキー(www.tizi365.com)にマッチするメッセージのみキュー1が受信できます。そのため、キュー2は一致しないルーティングキーのため、メッセージを受信しません。