Java RabbitMQ Publish/Subscribe Pattern (Broadcast Mode, Fanout Mode)

팬아웃 교환(FanoutExchange)은 RabbitMQ에서 발행/구독 패턴에 사용되며, 생산자가 보낸 메시지는 여러 개의 소비자 대기열에서 처리됩니다. 다음 다이어그램에 표시된 것처럼 아키텍처가 구성됩니다:

RabbitMQ 작업 모드

팬아웃 교환은 메시지를 모든 바인딩된 대기열로 전달할 수 있습니다.

팁: RabbitMQ 작업 모드에 관계없이 교환 유형과 라우팅 매개변수의 차이가 있습니다.

1. 사전 자습서

관련 지식을 이해하기 위해 먼저 다음 섹션들을 읽어주세요:

2. 팬아웃 교환 정의

Spring AMQP에서 FanoutExchange 클래스는 팬아웃 교환에 해당합니다. 우리는 Spring Boot 구성 클래스를 통해 교환을 정의합니다.

package com.tizi365.rabbitmq.config;

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
	// 교환을 정의합니다.
    @Bean
    public FanoutExchange fanout() {
        // 매개변수는 고유해야 하는 교환 이름입니다.
        return new FanoutExchange("tizi365.fanout");
    }
}

팁: 메시지 생산자와 소비자는 모두 교환을 필요로 합니다.

3. 메시지 전송

우리는 메시지를 교환에 보내며, 교환은 라우팅 규칙에 따라 해당 대기열에 메시지를 전달합니다.

package com.tizi365.rabbitmq.service;

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class SendService {
    @Autowired
    private RabbitTemplate template;
    @Autowired
    private FanoutExchange fanout;

	// 데모를 위해 예약된 작업을 사용하여 초마다 메시지를 보냅니다.
    @Scheduled(fixedDelay = 1000, initialDelay = 1000)
    public void send() {
        // 메시지 내용
        String message = "안녕하세요, 세상아!";
        // 메시지를 보냅니다.
        // 첫 번째 매개변수는 교환 이름입니다.
        // 두 번째 매개변수는 라우팅 키입니다. 팬아웃 교환은 라우팅 키를 무시하므로 설정할 필요가 없습니다.
        // 세 번째 매개변수는 메시지 내용입니다. 직렬화될 수 있는 모든 유형을 지원합니다.
        template.convertAndSend(fanout.getName(), "", message);
        System.out.println("전송된 메시지: '" + message + "'");
    }
}

4. 메시지 수신

4.1 대기열 정의 및 교환 바인딩

대기열 메시지를 소비하려면 먼저 대기열을 정의한 다음 대기열을 대상 교환에 바인딩해야 합니다. 아래에서는 두 개의 대기열을 정의하고 동일한 교환에 바인딩합니다.

package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
    @Bean
    public FanoutExchange fanout() {
        // 교환을 정의합니다.
        // 매개변수는 고유해야 하는 교환 이름입니다.
        return new FanoutExchange("tizi365.fanout");
    }

    @Bean
    public Queue queue1() {
        // 대기열 1을 정의합니다.
        return new Queue("tizi365.fanout.queue1");
    }

    @Bean
    public Queue queue2() {
        // 대기열 2를 정의합니다.
        return new Queue("tizi365.fanout.queue2");
    }

    @Bean
    public Binding binding1(FanoutExchange fanout, Queue queue1) {
        // 바인딩 관계를 정의하여 대기열 1을 팬아웃 교환에 바인딩합니다.
        return BindingBuilder.bind(queue1).to(fanout);
    }

    @Bean
    public Binding binding2(FanoutExchange fanout, Queue queue2) {
        // 바인딩 관계를 정의하여 대기열 2를 팬아웃 교환에 바인딩합니다.
        return BindingBuilder.bind(queue2).to(fanout);
    }
}

4.2 큐 리스너 정의

특정 큐에서 메시지를 소비하기 위해 RabbitListener 주석을 사용하여 메시지 리스너를 정의합니다.

package com.tizi365.rabbitmq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// 스프링에 의해 현재 클래스를 관리 대상으로 만듭니다
@Component
public class DemoListener {
    // 리스너를 정의하고, queues 매개변수를 사용하여 어떤 큐에서 메시지를 수신할지 지정합니다
    @RabbitListener(queues = "tizi365.fanout.queue1")
    public void receive1(String msg) {
        System.out.println("큐 1에서 메시지 받음 = " + msg);
    }

    // 리스너를 정의하고, queues 매개변수를 사용하여 어떤 큐에서 메시지를 수신할지 지정합니다
    @RabbitListener(queues = "tizi365.fanout.queue2")
    public void receive2(String msg) {
        System.out.println("큐 2에서 메시지 받음 = " + msg);
    }
}

이전에 정의한 Exchange가 fanout 유형으로 정의되었기 때문에, 각 메시지는 현재 Exchange에 바인드된 모든 큐에 분배될 것이며, 위의 두 가지 메소드로 메시지가 별도로 처리될 것입니다.

참고: RabbitListener 주석을 클래스 또는 메서드에 적용할 수 있습니다. RabbitListener 주석이 클래스에 정의된 경우 RabbitHandler 주석과 결합하여 어떤 클래스 메서드가 메시지 처리를 실행할지 표시해야 합니다.

4.3 전체 주석을 사용하여 큐 리스너 정의

스프링 부트 구성 클래스를 사용하지 않고도 교환, 큐 및 바인딩 관계를 정의할 필요가 없습니다. RabbitListener 주석의 bindings 매개변수를 사용하여 직접적으로 바인딩 관계, 큐 및 교환을 정의할 수 있습니다.

@RabbitListener(
            bindings = {
                    @QueueBinding(
                            value = @Queue(name = "tizi365.fanout.queue3", durable = "true"),
                            exchange = @Exchange(name = "tizi365.fanout", durable = "true", type = ExchangeTypes.FANOUT)
                    )
            }
    )
public void receive3(String msg) {
    System.out.println("큐 3에서 메시지 받음 = " + msg);
}

설명:

  • QueueBinding 주석: 큐와 교환 사이의 바인딩 관계를 정의합니다. 값 매개변수는 큐를 정의하고, 교환은 교환을 정의합니다.
  • Queue 주석: 큐를 정의합니다. name 매개변수는 큐 이름을 정의하며(고유해야 함), durable 매개변수는 영속성이 필요한지 여부를 나타냅니다.
  • Exchange 주석: 교환을 정의합니다. name 매개변수는 교환 이름을 정의하며, type 매개변수는 교환 유형을 나타냅니다.