Java RabbitMQ パブリッシュ/サブスクライブ パターン(ブロードキャストモード、ファンアウトモード)
FanoutExchange 交換タイプは、RabbitMQ におけるパブリッシュ/サブスクライブ パターンに使用されます。ここではプロデューサーによって送信されるメッセージが複数のコンシューマーキューによって処理されるアーキテクチャが以下の図に示されています。
Fanout 交換はメッセージをすべてのバインドされたキューに転送できます。
ヒント: 使用する RabbitMQ の動作モードに関係なく、違いは使用する交換タイプとルーティング パラメーターにあります。
1. 必須チュートリアル
関連する知識を理解するために、まず以下のセクションを読んでください。
- RabbitMQ 基本概念
- RabbitMQ パブリッシュ/サブスクライブ パターン
- RabbitMQ Java クイックスタートガイド(後続のセクションではコードを繰り返し示さず、主要なコードスニペットのみを表示します)
2. ファンアウト交換の定義
Spring AMQP では、FanoutExchange クラスがファンアウト 交換に対応しています。Exchange は Spring Boot 設定クラスを介して定義します。
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
// 交換を定義します
@Bean
public FanoutExchange fanout() {
// パラメーターはユニークである必要がある交換の名前です
return new FanoutExchange("tizi365.fanout");
}
}
ヒント: メッセージのプロデューサーとコンシューマーの両方が交換を必要とします。
3. メッセージの送信
メッセージを交換に送信し、ルーティング規則に基づいてメッセージを対応するキューに配信します。
package com.tizi365.rabbitmq.service;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class SendService {
@Autowired
private RabbitTemplate template;
@Autowired
private FanoutExchange fanout;
// デモ目的のため、スケジュールされたタスクを使用してメッセージを毎秒送信します
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void send() {
// メッセージの内容
String message = "Hello World!";
// メッセージを送信します
// 最初のパラメーターは交換の名前です
// 2 番目のパラメーターはルーティング キーですが、ファンアウト交換はルーティング キーを無視するため、設定する必要はありません
// 3 番目のパラメーターはメッセージの内容で、シリアル化できればどのようなタイプでもサポートされます
template.convertAndSend(fanout.getName(), "", message);
System.out.println("メッセージを送信しました: '" + message + "'");
}
}
4. メッセージの受信
4.1 キューの定義および交換のバインド
キューのメッセージを消費するには、まずキューを定義し、その後そのキューを対象の交換にバインドする必要があります。 以下では、2 つのキューを定義し、それらを同じ交換にバインドします。
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public FanoutExchange fanout() {
// 交換を定義します
// パラメーターはユニークである必要がある交換の名前です
return new FanoutExchange("tizi365.fanout");
}
@Bean
public Queue queue1() {
// キュー 1 を定義します
return new Queue("tizi365.fanout.queue1");
}
@Bean
public Queue queue2() {
// キュー 2 を定義します
return new Queue("tizi365.fanout.queue2");
}
@Bean
public Binding binding1(FanoutExchange fanout, Queue queue1) {
// バインディング関係を定義し、キュー 1 をファンアウト交換にバインドします
return BindingBuilder.bind(queue1).to(fanout);
}
@Bean
public Binding binding2(FanoutExchange fanout, Queue queue2) {
// バインディング関係を定義し、キュー 2 をファンアウト交換にバインドします
return BindingBuilder.bind(queue2).to(fanout);
}
}
4.2 キューのリスナーを定義する
特定のキューからメッセージを消費するために、RabbitListenerアノテーションを使用してメッセージリスナーを定義します。
package com.tizi365.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
// 現在のクラスをSpringで管理可能にする
@Component
public class DemoListener {
// リスナーを定義し、queuesパラメータを使用してリッスンするキューを指定します
@RabbitListener(queues = "tizi365.fanout.queue1")
public void receive1(String msg) {
System.out.println("キュー1からメッセージを受信しました = " + msg);
}
// リスナーを定義し、queuesパラメータを使用してリッスンするキューを指定します
@RabbitListener(queues = "tizi365.fanout.queue2")
public void receive2(String msg) {
System.out.println("キュー2からメッセージを受信しました = " + msg);
}
}
前述のExchangeがfanoutタイプとして定義されたため、各メッセージは現在のExchangeにバインドされたすべてのキューに配信され、メッセージは上記の2つのメソッドで個別に処理されます。
注:RabbitListenerアノテーションはクラスまたはメソッドに適用できます。クラスにRabbitListenerアノテーションが定義されている場合は、RabbitHandlerアノテーションと組み合わせて、どのクラスメソッドがメッセージ処理を実行するかをマークする必要があります。
4.3 フルアノテーションを使用してキューリスナーを定義する
スプリングブートの構成クラスを使用して、交換、キュー、およびバインディングの関係を定義する必要はありません。 RabbitListenerアノテーションのbindingsパラメータを使用して、直接バインディングの関係、キュー、および交換を定義できます。
@RabbitListener(
bindings = {
@QueueBinding(
value = @Queue(name = "tizi365.fanout.queue3", durable = "true"),
exchange = @Exchange(name = "tizi365.fanout", durable = "true", type = ExchangeTypes.FANOUT)
)
}
)
public void receive3(String msg) {
System.out.println("キュー3からメッセージを受信しました = " + msg);
}
説明:
- QueueBindingアノテーション:キューと交換のバインディング関係を定義します。valueパラメータはキューを定義し、exchangeは交換を定義します。
- Queueアノテーション:キューを定義します。nameパラメータはキュー名を定義し(一意である必要があります)、durableパラメータはキューを永続化するかどうかを示します。
- Exchangeアノテーション:交換を定義します。nameパラメータは交換名を定義し、typeパラメータは交換タイプを示します。