Padrão de Publicação/Assinatura RabbitMQ em Java (Modo de Transmissão, Modo de Transmissão Multi)

O tipo de troca FanoutExchange é usado para o padrão de publicação/assinatura no RabbitMQ, onde uma mensagem enviada por um produtor será processada por múltiplas filas de consumidores. A arquitetura é mostrada no seguinte diagrama:

Modo de Trabalho do RabbitMQ

A troca Fanout pode encaminhar mensagens para todas as filas vinculadas.

Dica: Independentemente do modo de trabalho do RabbitMQ usado, a diferença está no tipo de troca e nos parâmetros de roteamento utilizados.

1. Tutoriais Prévios

Por favor, leia as seções a seguir primeiro para entender o conhecimento relevante:

2. Definindo a Troca Fanout

No Spring AMQP, a classe FanoutExchange corresponde à troca Fanout. Definimos a troca por meio de uma classe de configuração do Spring Boot.

package com.tizi365.rabbitmq.config;

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
	// Define a troca
    @Bean
    public FanoutExchange fanout() {
        // O parâmetro é o nome da troca, que deve ser único
        return new FanoutExchange("tizi365.fanout");
    }
}

Dica: Tanto os produtores quanto os consumidores de mensagens requerem uma troca.

3. Enviando Mensagens

Enviamos mensagens para a troca, que entregará as mensagens para as filas correspondentes com base nas regras de roteamento.

package com.tizi365.rabbitmq.service;

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class SendService {
    @Autowired
    private RabbitTemplate template;
    @Autowired
    private FanoutExchange fanout;

	// Para fins de demonstração, é usado uma tarefa agendada para enviar uma mensagem a cada segundo
    @Scheduled(fixedDelay = 1000, initialDelay = 1000)
    public void send() {
        // Conteúdo da mensagem
        String mensagem = "Olá Mundo!";
        // Enviar mensagem
        // O primeiro parâmetro é o nome da troca
        // O segundo parâmetro é a chave de roteamento; a troca fanout irá ignorar a chave de roteamento, portanto, não precisa ser definida
        // O terceiro parâmetro é o conteúdo da mensagem, que suporta qualquer tipo desde que possa ser serializado
        template.convertAndSend(fanout.getName(), "", mensagem);
        System.out.println("Mensagem enviada: '" + mensagem + "'");
    }
}

4. Recebendo Mensagens

4.1 Definir Filas & Vincular Troca

Para consumir mensagens da fila, é necessário primeiro definir uma fila e, em seguida, vincular a fila à troca de destino. Abaixo, definimos duas filas e as vinculamos à mesma troca.

package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
    @Bean
    public FanoutExchange fanout() {
        // Definir a troca
        // O parâmetro é o nome da troca, que deve ser único
        return new FanoutExchange("tizi365.fanout");
    }

    @Bean
    public Queue queue1() {
        // Definir fila 1
        return new Queue("tizi365.fanout.queue1");
    }

    @Bean
    public Queue queue2() {
        // Definir fila 2
        return new Queue("tizi365.fanout.queue2");
    }

    @Bean
    public Binding binding1(FanoutExchange fanout, Queue queue1) {
        // Definir um relacionamento de vinculação, para vincular a fila 1 à troca fanout
        return BindingBuilder.bind(queue1).to(fanout);
    }

    @Bean
    public Binding binding2(FanoutExchange fanout, Queue queue2) {
        // Definir um relacionamento de vinculação, para vincular a fila 2 à troca fanout
        return BindingBuilder.bind(queue2).to(fanout);
    }
}

4.2 Definir Ouvintes de Fila

Defina os ouvintes de mensagem através da anotação RabbitListener para consumir mensagens de filas específicas.

package com.tizi365.rabbitmq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// Faça com que a classe atual seja gerenciada pelo Spring
@Component
public class DemoListener {
    // Defina um ouvinte e especifique a fila a ser ouvida usando o parâmetro queues
    @RabbitListener(queues = "tizi365.fanout.queue1")
    public void receive1(String msg) {
        System.out.println("Mensagem recebida da fila 1 = " + msg);
    }

    // Defina um ouvinte e especifique a fila a ser ouvida usando o parâmetro queues
    @RabbitListener(queues = "tizi365.fanout.queue2")
    public void receive2(String msg) {
        System.out.println("Mensagem recebida da fila 2 = " + msg);
    }
}

Como o exchange anterior foi definido como tipo fanout, cada mensagem será distribuída para todas as filas vinculadas ao exchange atual, e as mensagens serão tratadas separadamente pelos métodos acima.

Observação: A anotação RabbitListener pode ser aplicada a uma classe ou método. Se a anotação RabbitListener for definida em uma classe, ela precisa ser combinada com a anotação RabbitHandler para marcar qual método da classe executará o tratamento da mensagem.

4.3 Definir Ouvintes de Fila com Anotação Completa

Você não precisa de uma classe de configuração spring boot para definir trocas, filas e relacionamentos de ligação. Você pode definir diretamente os relacionamentos de ligação, filas e trocas através do parâmetro bindings da anotação RabbitListener.

@RabbitListener(
            bindings = {
                    @QueueBinding(
                            value = @Queue(name = "tizi365.fanout.queue3", durable = "true"),
                            exchange = @Exchange(name = "tizi365.fanout", durable = "true", type = ExchangeTypes.FANOUT)
                    )
            }
    )
public void receive3(String msg) {
    System.out.println("Mensagem recebida da fila 3 = " + msg);
}

Explicação:

  • Anotação QueueBinding: Define o relacionamento de ligação entre a fila e o exchange. O parâmetro value é usado para definir a fila e o exchange é usado para definir o exchange.
  • Anotação Queue: Define uma fila. O parâmetro name define o nome da fila (que precisa ser único) e o parâmetro durable indica se ela precisa ser durável.
  • Anotação Exchange: Define um exchange. O parâmetro name define o nome do exchange e o parâmetro type indica o tipo de exchange.