Java RabbitMQ Publish/Subscribe Pattern (Broadcast Mode, Fanout Mode)
팬아웃 교환(FanoutExchange)은 RabbitMQ에서 발행/구독 패턴에 사용되며, 생산자가 보낸 메시지는 여러 개의 소비자 대기열에서 처리됩니다. 다음 다이어그램에 표시된 것처럼 아키텍처가 구성됩니다:
팬아웃 교환은 메시지를 모든 바인딩된 대기열로 전달할 수 있습니다.
팁: RabbitMQ 작업 모드에 관계없이 교환 유형과 라우팅 매개변수의 차이가 있습니다.
1. 사전 자습서
관련 지식을 이해하기 위해 먼저 다음 섹션들을 읽어주세요:
- RabbitMQ 기본 개념
- RabbitMQ 발행/구독 패턴
- RabbitMQ 자바 빠른 시작 가이드 (후속 섹션에서 코드를 복제하지 않고 키 코드 스니펫만 표시되므로 필수입니다)
2. 팬아웃 교환 정의
Spring AMQP에서 FanoutExchange 클래스는 팬아웃 교환에 해당합니다. 우리는 Spring Boot 구성 클래스를 통해 교환을 정의합니다.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
// 교환을 정의합니다.
@Bean
public FanoutExchange fanout() {
// 매개변수는 고유해야 하는 교환 이름입니다.
return new FanoutExchange("tizi365.fanout");
}
}
팁: 메시지 생산자와 소비자는 모두 교환을 필요로 합니다.
3. 메시지 전송
우리는 메시지를 교환에 보내며, 교환은 라우팅 규칙에 따라 해당 대기열에 메시지를 전달합니다.
package com.tizi365.rabbitmq.service;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class SendService {
@Autowired
private RabbitTemplate template;
@Autowired
private FanoutExchange fanout;
// 데모를 위해 예약된 작업을 사용하여 초마다 메시지를 보냅니다.
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void send() {
// 메시지 내용
String message = "안녕하세요, 세상아!";
// 메시지를 보냅니다.
// 첫 번째 매개변수는 교환 이름입니다.
// 두 번째 매개변수는 라우팅 키입니다. 팬아웃 교환은 라우팅 키를 무시하므로 설정할 필요가 없습니다.
// 세 번째 매개변수는 메시지 내용입니다. 직렬화될 수 있는 모든 유형을 지원합니다.
template.convertAndSend(fanout.getName(), "", message);
System.out.println("전송된 메시지: '" + message + "'");
}
}
4. 메시지 수신
4.1 대기열 정의 및 교환 바인딩
대기열 메시지를 소비하려면 먼저 대기열을 정의한 다음 대기열을 대상 교환에 바인딩해야 합니다. 아래에서는 두 개의 대기열을 정의하고 동일한 교환에 바인딩합니다.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public FanoutExchange fanout() {
// 교환을 정의합니다.
// 매개변수는 고유해야 하는 교환 이름입니다.
return new FanoutExchange("tizi365.fanout");
}
@Bean
public Queue queue1() {
// 대기열 1을 정의합니다.
return new Queue("tizi365.fanout.queue1");
}
@Bean
public Queue queue2() {
// 대기열 2를 정의합니다.
return new Queue("tizi365.fanout.queue2");
}
@Bean
public Binding binding1(FanoutExchange fanout, Queue queue1) {
// 바인딩 관계를 정의하여 대기열 1을 팬아웃 교환에 바인딩합니다.
return BindingBuilder.bind(queue1).to(fanout);
}
@Bean
public Binding binding2(FanoutExchange fanout, Queue queue2) {
// 바인딩 관계를 정의하여 대기열 2를 팬아웃 교환에 바인딩합니다.
return BindingBuilder.bind(queue2).to(fanout);
}
}
4.2 큐 리스너 정의
특정 큐에서 메시지를 소비하기 위해 RabbitListener 주석을 사용하여 메시지 리스너를 정의합니다.
package com.tizi365.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
// 스프링에 의해 현재 클래스를 관리 대상으로 만듭니다
@Component
public class DemoListener {
// 리스너를 정의하고, queues 매개변수를 사용하여 어떤 큐에서 메시지를 수신할지 지정합니다
@RabbitListener(queues = "tizi365.fanout.queue1")
public void receive1(String msg) {
System.out.println("큐 1에서 메시지 받음 = " + msg);
}
// 리스너를 정의하고, queues 매개변수를 사용하여 어떤 큐에서 메시지를 수신할지 지정합니다
@RabbitListener(queues = "tizi365.fanout.queue2")
public void receive2(String msg) {
System.out.println("큐 2에서 메시지 받음 = " + msg);
}
}
이전에 정의한 Exchange가 fanout 유형으로 정의되었기 때문에, 각 메시지는 현재 Exchange에 바인드된 모든 큐에 분배될 것이며, 위의 두 가지 메소드로 메시지가 별도로 처리될 것입니다.
참고: RabbitListener 주석을 클래스 또는 메서드에 적용할 수 있습니다. RabbitListener 주석이 클래스에 정의된 경우 RabbitHandler 주석과 결합하여 어떤 클래스 메서드가 메시지 처리를 실행할지 표시해야 합니다.
4.3 전체 주석을 사용하여 큐 리스너 정의
스프링 부트 구성 클래스를 사용하지 않고도 교환, 큐 및 바인딩 관계를 정의할 필요가 없습니다. RabbitListener 주석의 bindings 매개변수를 사용하여 직접적으로 바인딩 관계, 큐 및 교환을 정의할 수 있습니다.
@RabbitListener(
bindings = {
@QueueBinding(
value = @Queue(name = "tizi365.fanout.queue3", durable = "true"),
exchange = @Exchange(name = "tizi365.fanout", durable = "true", type = ExchangeTypes.FANOUT)
)
}
)
public void receive3(String msg) {
System.out.println("큐 3에서 메시지 받음 = " + msg);
}
설명:
- QueueBinding 주석: 큐와 교환 사이의 바인딩 관계를 정의합니다. 값 매개변수는 큐를 정의하고, 교환은 교환을 정의합니다.
- Queue 주석: 큐를 정의합니다. name 매개변수는 큐 이름을 정의하며(고유해야 함), durable 매개변수는 영속성이 필요한지 여부를 나타냅니다.
- Exchange 주석: 교환을 정의합니다. name 매개변수는 교환 이름을 정의하며, type 매개변수는 교환 유형을 나타냅니다.