JavaのRabbitMQにおける最も簡単なキューモードは、プロデューサーとコンシューマーで構成されています。

RabbitMQ Simple Queue

現在、RabbitMQのJava操作では主にSpring Bootのspring-boot-starter-amqpパッケージを使用しており、基本的にはSpring AMQPを使用してキューを操作しています。

1. 前提チュートリアル

関連する知識を理解するために、以下のチャプターを読んでください:

2. 依存パッケージ

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3. RabbitMQの設定

application.ymlの設定を変更してください:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

4. キューの宣言

Spring Bootの設定クラスでキューを設定してください:

package com.tizi365.rabbitmq.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
    @Bean
    public Queue helloQueue() {
        // キューを宣言し、キュー名はユニークである必要があります
        return new Queue("hello");
    }
}

ヒント: ビジネスのニーズに応じて複数のキューを定義することができます。キュー名とQueueのbean IDは異なる必要があります。ここでは、メソッド名がbean IDです。

5. メッセージの送信

メッセージの送信には、Spring Bootがすでに初期化しているRabbitTemplateクラスが必要です。単純にインスタンスをインジェクトしてください:

package com.tizi365.rabbitmq.service;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class SendService {
    // RabbitTemplateインスタンスをインジェクト
    @Autowired
    private RabbitTemplate template;
    
    // 以前に定義したキューをインジェクト
    @Autowired
    @Qualifier("helloQueue")
    private Queue helloQueue;
    
    // デモのために、Springの組み込みスケジュールタスクを使用して定期的にメッセージを送信します(1秒ごとに1つのメッセージ)
    @Scheduled(fixedDelay = 1000, initialDelay = 1000)
    public void send() {
        // メッセージの内容
        String message = "Hello World!";
        // メッセージを送信
        // 最初のパラメータはルーティングキーで、ここではキュー名をルーティングキーとして使用しています
        // 2番目のパラメータはメッセージの内容で、シリアル化をサポートしている限り、任意の型がサポートされます
        template.convertAndSend(helloQueue.getName(), message);
        System.out.println("メッセージを送信中: '" + message + "'");
    }
}

ヒント: ここでは、直接交換を使用していません。基礎となるデフォルトの交換(ダイレクト交換)が使用されます。

6. メッセージの受信

コンシューマーがメッセージを受信するのも簡単です:

package com.tizi365.rabbitmq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
// メッセージリスナーを宣言し、`queues`パラメータを使用してリスンするキューを指定します。これは以前のキュー名と一致している必要があります
@RabbitListener(queues = "hello")
public class HelloListener {
	// メッセージハンドラとしてのマークにRabbitHandlerを使用し、メッセージ処理ロジックを実行するために使用されます
    @RabbitHandler
    public void receive(String msg) {
        System.out.println("コンシューマー - 受信したメッセージ: '" + msg + "'");
    }
}

7. カスタムメッセージのタイプ

以前は文字列型のメッセージを送信しましたが、実際のビジネスシナリオでは、さまざまなカスタムJavaオブジェクトタイプのデータを直接送信することを好むでしょう。

エンティティオブジェクトの定義

package com.tizi365.rabbitmq.domain;

import java.io.Serializable;
import lombok.Data;

// ブログの内容
@Data
public class Blog implements Serializable {
    // ID
    private Integer id;
    // タイトル
    private String title;
}

カスタムタイプのメッセージを送信

Blog blog = new Blog();
blog.setId(100);
blog.setTitle("Tizi365 RabbitMQのチュートリアル");

// メッセージを送信
template.convertAndSend(helloQueue.getName(), blog);

カスタムタイプのメッセージを受信する

@RabbitHandler
// メソッドのパラメーターをカスタムメッセージタイプに単純に変更する
public void receive(Blog msg) {
    System.out.println("Consumer - メッセージを受信しました '" + msg.getTitle() + "'");
}

メッセージ内容のJSONシリアリゼーションの使用

RabbitMQがJavaエンティティオブジェクトデータを送信する際、デフォルトでJDKのオブジェクトシリアリゼーションツールを使用します。これをデータシリアリゼーションのためにJSON形式を使用するよう変更することができ、これにより他の言語でJavaから送信されたメッセージを消費することができ、メッセージ形式がより読みやすくなります。

以前の構成クラスを修正し、次の構成を追加して、メッセージデータのシリアリゼーションとデシリアリゼーションにJackson JSONパーサーを使用します。

@Bean
public Jackson2JsonMessageConverter messageConverter() {
    // デフォルトのメッセージコンバータを設定
    return new Jackson2JsonMessageConverter();
}