JavaのRabbitMQにおける最も簡単なキューモードは、プロデューサーとコンシューマーで構成されています。
現在、RabbitMQのJava操作では主にSpring Bootのspring-boot-starter-amqp
パッケージを使用しており、基本的にはSpring AMQPを使用してキューを操作しています。
1. 前提チュートリアル
関連する知識を理解するために、以下のチャプターを読んでください:
2. 依存パッケージ
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3. RabbitMQの設定
application.yml
の設定を変更してください:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
4. キューの宣言
Spring Bootの設定クラスでキューを設定してください:
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public Queue helloQueue() {
// キューを宣言し、キュー名はユニークである必要があります
return new Queue("hello");
}
}
ヒント: ビジネスのニーズに応じて複数のキューを定義することができます。キュー名とQueueのbean IDは異なる必要があります。ここでは、メソッド名がbean IDです。
5. メッセージの送信
メッセージの送信には、Spring Bootがすでに初期化しているRabbitTemplateクラスが必要です。単純にインスタンスをインジェクトしてください:
package com.tizi365.rabbitmq.service;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class SendService {
// RabbitTemplateインスタンスをインジェクト
@Autowired
private RabbitTemplate template;
// 以前に定義したキューをインジェクト
@Autowired
@Qualifier("helloQueue")
private Queue helloQueue;
// デモのために、Springの組み込みスケジュールタスクを使用して定期的にメッセージを送信します(1秒ごとに1つのメッセージ)
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void send() {
// メッセージの内容
String message = "Hello World!";
// メッセージを送信
// 最初のパラメータはルーティングキーで、ここではキュー名をルーティングキーとして使用しています
// 2番目のパラメータはメッセージの内容で、シリアル化をサポートしている限り、任意の型がサポートされます
template.convertAndSend(helloQueue.getName(), message);
System.out.println("メッセージを送信中: '" + message + "'");
}
}
ヒント: ここでは、直接交換を使用していません。基礎となるデフォルトの交換(ダイレクト交換)が使用されます。
6. メッセージの受信
コンシューマーがメッセージを受信するのも簡単です:
package com.tizi365.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
// メッセージリスナーを宣言し、`queues`パラメータを使用してリスンするキューを指定します。これは以前のキュー名と一致している必要があります
@RabbitListener(queues = "hello")
public class HelloListener {
// メッセージハンドラとしてのマークにRabbitHandlerを使用し、メッセージ処理ロジックを実行するために使用されます
@RabbitHandler
public void receive(String msg) {
System.out.println("コンシューマー - 受信したメッセージ: '" + msg + "'");
}
}
7. カスタムメッセージのタイプ
以前は文字列型のメッセージを送信しましたが、実際のビジネスシナリオでは、さまざまなカスタムJavaオブジェクトタイプのデータを直接送信することを好むでしょう。
エンティティオブジェクトの定義
package com.tizi365.rabbitmq.domain;
import java.io.Serializable;
import lombok.Data;
// ブログの内容
@Data
public class Blog implements Serializable {
// ID
private Integer id;
// タイトル
private String title;
}
カスタムタイプのメッセージを送信
Blog blog = new Blog();
blog.setId(100);
blog.setTitle("Tizi365 RabbitMQのチュートリアル");
// メッセージを送信
template.convertAndSend(helloQueue.getName(), blog);
カスタムタイプのメッセージを受信する
@RabbitHandler
// メソッドのパラメーターをカスタムメッセージタイプに単純に変更する
public void receive(Blog msg) {
System.out.println("Consumer - メッセージを受信しました '" + msg.getTitle() + "'");
}
メッセージ内容のJSONシリアリゼーションの使用
RabbitMQがJavaエンティティオブジェクトデータを送信する際、デフォルトでJDKのオブジェクトシリアリゼーションツールを使用します。これをデータシリアリゼーションのためにJSON形式を使用するよう変更することができ、これにより他の言語でJavaから送信されたメッセージを消費することができ、メッセージ形式がより読みやすくなります。
以前の構成クラスを修正し、次の構成を追加して、メッセージデータのシリアリゼーションとデシリアリゼーションにJackson JSONパーサーを使用します。
@Bean
public Jackson2JsonMessageConverter messageConverter() {
// デフォルトのメッセージコンバータを設定
return new Jackson2JsonMessageConverter();
}