RabbitMQのルーティングパターン(ダイレクトモード)は、JavaでDirectExchangeタイプの交換箇所を使用します。パブリッシュ/サブスクライブパターンとの違いは、Direct交換が完全に一致するルーティングパラメータを持つキューにメッセージを配信することです。下の画像はその構造を示しています。

RabbitMQ ダイレクトモード

ヒント: 使用するRabbitMQのワーキングモードに関係なく、違いは使用する交換箇所のタイプとルーティングパラメータにあります。

1. 必須チュートリアル

関連する知識を理解するために、次のセクションを読んでください:

2. ダイレクト交換箇所の定義

Spring AMQPでは、Direct交換箇所に対応するクラスはDirectExchangeです。交換箇所はSpring Bootの構成クラスを通じて定義します。

package com.tizi365.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
	@Bean
    public DirectExchange direct() {
        // 交換箇所を定義
        // パラメータは交換箇所の名前であり、ユニークである必要があります
        return new DirectExchange("tizi365.direct");
    }
}

ヒント: メッセージプロデューサーとコンシューマーの両方で交換箇所が必要です。

3. メッセージの送信

メッセージを交換箇所に送信し、その後ルーティングルールに基づいてメッセージを対応するキューに配信します。

package com.tizi365.rabbitmq.service;

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class SendService {
    @Autowired
    private RabbitTemplate template;
    @Autowired
    private DirectExchange direct;

    // デモンストレーションのため、1秒ごとにメッセージを送信する予定のタスクを使用します
    @Scheduled(fixedDelay = 1000, initialDelay = 1000)
    public void send() {
        // メッセージの内容
        String message = "こんにちは、世界!";
        // メッセージを送信
        // 第1パラメータは交換箇所の名前
        // 第2パラメータはルーティングキーです。ダイレクト交換箇所では、このメッセージは"tizi365"に一致するルーティングキーを持つキューに配信されます
        // 第3パラメータはメッセージの内容であり、シリアル化できる限り、どの型でもサポートされます
        template.convertAndSend(direct.getName(), "tizi365", message);
        System.out.println("メッセージ '" + message + "'を送信しました");
    }
}

ヒント: convertAndSendメソッドの第2パラメータに注意してください。これは重要なパラメータです。

4. メッセージの受信

4.1 キューの定義とエクスチェンジのバインド

メッセージをキューから消費するためには、まずキューを定義し、その後にそのキューをターゲットのエクスチェンジにバインドする必要があります。 次のコードは、2つのキューを定義し、それらを同じエクスチェンジにバインドします:

package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
    @Bean
    public DirectExchange direct() {
        // エクスチェンジを定義
        // パラメータはエクスチェンジ名で、ユニークである必要があります
        return new DirectExchange("tizi365.direct");
    }

    @Bean
    public Queue queue1() {
        // キュー1を定義
        return new Queue("tizi365.direct.queue1");
    }

    @Bean
    public Queue queue2() {
        // キュー2を定義
        return new Queue("tizi365.direct.queue2");
    }

    @Bean
    public Binding binding1(DirectExchange direct, Queue queue1) {
        // バインディングを定義し、ルーティングキーが "tizi365" の場合にキュー1をダイレクトエクスチェンジにバインドする
		// ルーティングキーが "tizi365" に一致すると、エクスチェンジはメッセージをキュー1に配信します
        return BindingBuilder.bind(queue1).to(direct).with("tizi365");
    }

    @Bean
    public Binding binding2(DirectExchange direct, Queue queue2) {
        // バインディングを定義し、ルーティングキーが "baidu" の場合にキュー2をダイレクトエクスチェンジにバインドする
		// ルーティングキーが "baidu" に一致すると、エクスチェンジはメッセージをキュー2に配信します
        return BindingBuilder.bind(queue2).to(direct).with("baidu");
    }
}

4.2 キューリスナーの定義

特定のキューからメッセージを消費するために、RabbitListenerアノテーションを使用してメッセージリスナーを定義します:

package com.tizi365.rabbitmq.listener;

import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

// Springに現在のクラスを管理させます
@Component
public class DemoListener {
    // キュー1からメッセージを消費するリスナーを定義
    @RabbitListener(queues = "tizi365.direct.queue1")
    public void receive1(String msg) {
        System.out.println("キュー1からメッセージを受信しました:" + msg);
    }

    // キュー2からメッセージを消費するリスナーを定義
    @RabbitListener(queues = "tizi365.direct.queue2")
    public void receive2(String msg) {
        System.out.println("キュー2からメッセージを受信しました:" + msg);
    }
}

ルーティングキーが "tizi365" に一致するため、エクスチェンジはキュー1だけにメッセージを配信します。ルーティングキーが一致しないため、キュー2はメッセージを受信しません。