Java RabbitMQ パブリッシュ/サブスクライブ パターン(ブロードキャストモード、ファンアウトモード)

FanoutExchange 交換タイプは、RabbitMQ におけるパブリッシュ/サブスクライブ パターンに使用されます。ここではプロデューサーによって送信されるメッセージが複数のコンシューマーキューによって処理されるアーキテクチャが以下の図に示されています。

RabbitMQ Work Mode

Fanout 交換はメッセージをすべてのバインドされたキューに転送できます。

ヒント: 使用する RabbitMQ の動作モードに関係なく、違いは使用する交換タイプとルーティング パラメーターにあります。

1. 必須チュートリアル

関連する知識を理解するために、まず以下のセクションを読んでください。

2. ファンアウト交換の定義

Spring AMQP では、FanoutExchange クラスがファンアウト 交換に対応しています。Exchange は Spring Boot 設定クラスを介して定義します。

package com.tizi365.rabbitmq.config;

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
	// 交換を定義します
    @Bean
    public FanoutExchange fanout() {
        // パラメーターはユニークである必要がある交換の名前です
        return new FanoutExchange("tizi365.fanout");
    }
}

ヒント: メッセージのプロデューサーとコンシューマーの両方が交換を必要とします。

3. メッセージの送信

メッセージを交換に送信し、ルーティング規則に基づいてメッセージを対応するキューに配信します。

package com.tizi365.rabbitmq.service;

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class SendService {
    @Autowired
    private RabbitTemplate template;
    @Autowired
    private FanoutExchange fanout;

	// デモ目的のため、スケジュールされたタスクを使用してメッセージを毎秒送信します
    @Scheduled(fixedDelay = 1000, initialDelay = 1000)
    public void send() {
        // メッセージの内容
        String message = "Hello World!";
        // メッセージを送信します
        // 最初のパラメーターは交換の名前です
        // 2 番目のパラメーターはルーティング キーですが、ファンアウト交換はルーティング キーを無視するため、設定する必要はありません
        // 3 番目のパラメーターはメッセージの内容で、シリアル化できればどのようなタイプでもサポートされます
        template.convertAndSend(fanout.getName(), "", message);
        System.out.println("メッセージを送信しました: '" + message + "'");
    }
}

4. メッセージの受信

4.1 キューの定義および交換のバインド

キューのメッセージを消費するには、まずキューを定義し、その後そのキューを対象の交換にバインドする必要があります。 以下では、2 つのキューを定義し、それらを同じ交換にバインドします。

package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
    @Bean
    public FanoutExchange fanout() {
        // 交換を定義します
        // パラメーターはユニークである必要がある交換の名前です
        return new FanoutExchange("tizi365.fanout");
    }

    @Bean
    public Queue queue1() {
        // キュー 1 を定義します
        return new Queue("tizi365.fanout.queue1");
    }

    @Bean
    public Queue queue2() {
        // キュー 2 を定義します
        return new Queue("tizi365.fanout.queue2");
    }

    @Bean
    public Binding binding1(FanoutExchange fanout, Queue queue1) {
        // バインディング関係を定義し、キュー 1 をファンアウト交換にバインドします
        return BindingBuilder.bind(queue1).to(fanout);
    }

    @Bean
    public Binding binding2(FanoutExchange fanout, Queue queue2) {
        // バインディング関係を定義し、キュー 2 をファンアウト交換にバインドします
        return BindingBuilder.bind(queue2).to(fanout);
    }
}

4.2 キューのリスナーを定義する

特定のキューからメッセージを消費するために、RabbitListenerアノテーションを使用してメッセージリスナーを定義します。

package com.tizi365.rabbitmq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// 現在のクラスをSpringで管理可能にする
@Component
public class DemoListener {
    // リスナーを定義し、queuesパラメータを使用してリッスンするキューを指定します
    @RabbitListener(queues = "tizi365.fanout.queue1")
    public void receive1(String msg) {
        System.out.println("キュー1からメッセージを受信しました = " + msg);
    }

    // リスナーを定義し、queuesパラメータを使用してリッスンするキューを指定します
    @RabbitListener(queues = "tizi365.fanout.queue2")
    public void receive2(String msg) {
        System.out.println("キュー2からメッセージを受信しました = " + msg);
    }
}

前述のExchangeがfanoutタイプとして定義されたため、各メッセージは現在のExchangeにバインドされたすべてのキューに配信され、メッセージは上記の2つのメソッドで個別に処理されます。

注:RabbitListenerアノテーションはクラスまたはメソッドに適用できます。クラスにRabbitListenerアノテーションが定義されている場合は、RabbitHandlerアノテーションと組み合わせて、どのクラスメソッドがメッセージ処理を実行するかをマークする必要があります。

4.3 フルアノテーションを使用してキューリスナーを定義する

スプリングブートの構成クラスを使用して、交換、キュー、およびバインディングの関係を定義する必要はありません。 RabbitListenerアノテーションのbindingsパラメータを使用して、直接バインディングの関係、キュー、および交換を定義できます。

@RabbitListener(
            bindings = {
                    @QueueBinding(
                            value = @Queue(name = "tizi365.fanout.queue3", durable = "true"),
                            exchange = @Exchange(name = "tizi365.fanout", durable = "true", type = ExchangeTypes.FANOUT)
                    )
            }
    )
public void receive3(String msg) {
    System.out.println("キュー3からメッセージを受信しました = " + msg);
}

説明:

  • QueueBindingアノテーション:キューと交換のバインディング関係を定義します。valueパラメータはキューを定義し、exchangeは交換を定義します。
  • Queueアノテーション:キューを定義します。nameパラメータはキュー名を定義し(一意である必要があります)、durableパラメータはキューを永続化するかどうかを示します。
  • Exchangeアノテーション:交換を定義します。nameパラメータは交換名を定義し、typeパラメータは交換タイプを示します。