JavaのRabbitMQトピックパターンは、TopicExchangeタイプを使用し、Directのルーティングパターンと異なる点は、ルーティングパラメータが曖昧なマッチングをサポートしていることです。ルーティングのマッチングが柔軟なため、より一般的に使用されるパターンです。アーキテクチャは以下の図の通りです:

RabbitMQ トピックパターン

ヒント: 使用するRabbitMQの動作パターンに関係なく、違いは使用する交換機のタイプとルーティングパラメータにあります。

1. 前提チュートリアル

関連知識を理解するために、最初に以下のセクションを読んでください:

2. トピック交換機の定義

Spring AMQPでは、Direct交換機に対応するクラスはTopicExchangeです。ExchangeはSpring Bootの設定クラスを介して定義します。

package com.tizi365.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
    @Bean
    public TopicExchange topic() {
        // 交換機を定義
        // パラメータは交換機の名前であり、一意である必要があります
        return new TopicExchange("tizi365.topic");
    }
}

ヒント: メッセージの生産者と消費者の両方が交換機が必要です。

3. メッセージの送信

メッセージを交換機に送信し、交換機はルーティングルールに基づいてメッセージを対応するキューに配信します。

package com.tizi365.rabbitmq.service;

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class SendService {
    @Autowired
    private RabbitTemplate template;
    @Autowired
    private TopicExchange topic;

    // デモ目的で、定格タスクを使用して毎秒メッセージを送信します
    @Scheduled(fixedDelay = 1000, initialDelay = 1000)
    public void send() {
        // メッセージの内容
        String message = "Hello World!";
        // メッセージを送信
        // 第一パラメータは交換機の名前
        // 第二パラメータはルーティングキーです。トピック交換機はルーティングキーに一致するキューにメッセージを配信します
        // 第三パラメータはメッセージの内容で、シリアル化できる限り、任意のタイプをサポートします
        template.convertAndSend(topic.getName(), "www.tizi365.com", message);
        System.out.println("Message sent: '" + message + "'");
    }
}

ヒント: convertAndSendメソッドの第二パラメータ "www.tizi365.com" に注意してください。これは重要なパラメータです。

4. メッセージの受信

4.1 キューの定義とエクスチェンジへのバインド

メッセージをキューから消費するためには、まずキューを定義し、その後そのキューをターゲットエクスチェンジにバインドする必要があります。以下では、2つのキューが定義され、同じエクスチェンジにバインドされています。

package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
    @Bean
    public TopicExchange topic() {
        // エクスチェンジを定義
        // パラメータは一意である必要があるエクスチェンジ名です
        return new TopicExchange("tizi365.topic");
    }

    @Bean
    public Queue queue1() {
        // キュー1を定義
        return new Queue("tizi365.topic.queue1");
    }

    @Bean
    public Queue queue2() {
        // キュー2を定義
        return new Queue("tizi365.topic.queue2");
    }

    @Bean
    public Binding binding1(TopicExchange topic, Queue queue1) {
        // バインド関係を定義し、キュー1を次のルーティングキーでエクスチェンジにバインドします: *.tizi365.com
        return BindingBuilder.bind(queue1).to(topic).with("*.tizi365.com");
    }

    @Bean
    public Binding binding2(TopicExchange topic, Queue queue2) {
        // バインド関係を定義し、キュー2を次のルーティングキーでエクスチェンジにバインドします: *.baidu.com
        return BindingBuilder.bind(queue2).to(topic).with("*.baidu.com");
    }
}

ヒント: キュー1とキュー2をエクスチェンジにバインドする際、利用するルーティングキーはどちらも *(ワイルドカード)を使用しており、これは1つの単語とマッチします。もし #(ハッシュ)に変更すると、複数の単語とマッチします。

4.2 キューリスナーの定義

特定のキューからメッセージを消費するためにRabbitListenerアノテーションを使用してメッセージリスナーを定義します。

package com.tizi365.rabbitmq.listener;

import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

// Springに現在のクラスを管理させる
@Component
public class DemoListener {
    // リスナーを定義し、queuesパラメータを通じてどのキューをリッスンするかを指定します
    @RabbitListener(queues = "tizi365.topic.queue1")
    public void receive1(String msg) {
        System.out.println("キュー1からメッセージを受信: " + msg);
    }

    // リスナーを定義し、queuesパラメータを通じてどのキューをリッスンするかを指定します
    @RabbitListener(queues = "tizi365.topic.queue2")
    public void receive2(String msg) {
        System.out.println("キュー2からメッセージを受信: " + msg);
    }
}

キュー1はエクスチェンジにバインドする際、ルーティングキーが*.tizi365.comに設定されているため、ルーティングキー(www.tizi365.com)にマッチするメッセージのみキュー1が受信できます。そのため、キュー2は一致しないルーティングキーのため、メッセージを受信しません。