Java RabbitMQ Publish/Subscribe Pattern (Broadcast Mode, Fanout Mode)

The FanoutExchange exchange type is used for the publish/subscribe pattern in RabbitMQ, where a message sent by a producer will be processed by multiple consumer queues. The architecture is as shown in the following diagram:

RabbitMQ Work Mode

The Fanout exchange can forward messages to all bound queues.

Tip: Regardless of the RabbitMQ work mode used, the difference lies in the exchange type and routing parameters used.

1. Prerequisite Tutorials

Please read the following sections first to understand the relevant knowledge:

2. Defining the Fanout Exchange

In Spring AMQP, the FanoutExchange class corresponds to the Fanout exchange. We define the exchange through a Spring Boot configuration class.

package com.tizi365.rabbitmq.config;

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
    // Define the exchange
    @Bean
    public FanoutExchange fanout() {
        // The parameter is the exchange name, which must be unique
        return new FanoutExchange("tizi365.fanout");
    }
}

Tip: Both message producers and consumers require an exchange.

3. Sending Messages

We send messages to the exchange, which will deliver the messages to the corresponding queues based on the routing rules.

package com.tizi365.rabbitmq.service;

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class SendService {
    @Autowired
    private RabbitTemplate template;
    @Autowired
    private FanoutExchange fanout;

    // For demonstration purposes, a scheduled task is used to send a message every second
    @Scheduled(fixedDelay = 1000, initialDelay = 1000)
    public void send() {
        // Message content
        String message = "Hello World!";
        // Send message
        // The first parameter is the exchange name
        // The second parameter is the routing key; fanout exchange will ignore the routing key, so it does not need to be set
        // The third parameter is the message content, which supports any type as long as it can be serialized
        template.convertAndSend(fanout.getName(), "", message);
        System.out.println("Message sent: '" + message + "'");
    }
}

4. Receiving Messages

4.1 Define Queues & Bind Exchange

To consume queue messages, you need to first define a queue and then bind the queue to the target exchange.
Below, we define two queues and bind them to the same exchange.

package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
    @Bean
    public FanoutExchange fanout() {
        // Define the exchange
        // The parameter is the exchange name, which must be unique
        return new FanoutExchange("tizi365.fanout");
    }

    @Bean
    public Queue queue1() {
        // Define queue 1
        return new Queue("tizi365.fanout.queue1");
    }

    @Bean
    public Queue queue2() {
        // Define queue 2
        return new Queue("tizi365.fanout.queue2");
    }

    @Bean
    public Binding binding1(FanoutExchange fanout, Queue queue1) {
        // Define a binding relationship, to bind queue 1 to the fanout exchange
        return BindingBuilder.bind(queue1).to(fanout);
    }

    @Bean
    public Binding binding2(FanoutExchange fanout, Queue queue2) {
        // Define a binding relationship, to bind queue 2 to the fanout exchange
        return BindingBuilder.bind(queue2).to(fanout);
    }
}

4.2 Define Queue Listeners

Define message listeners through the RabbitListener annotation to consume messages from specific queues.

package com.tizi365.rabbitmq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// Make the current class managed by Spring
@Component
public class DemoListener {
    // Define a listener, and specify which queue to listen to using the queues parameter
    @RabbitListener(queues = "tizi365.fanout.queue1")
    public void receive1(String msg) {
        System.out.println("Received message from queue 1 = " + msg);
    }

    // Define a listener, and specify which queue to listen to using the queues parameter
    @RabbitListener(queues = "tizi365.fanout.queue2")
    public void receive2(String msg) {
        System.out.println("Received message from queue 2 = " + msg);
    }
}

Because the earlier exchange was defined as fanout type, each message will be distributed to all queues bound to the current exchange, and the messages will be handled separately by the above two methods.

Note: The RabbitListener annotation can be applied to a class or method. If the RabbitListener annotation is defined on a class, it needs to be combined with the RabbitHandler annotation to mark which class method will execute the message handling.

4.3 Define Queue Listeners with Full Annotation

You don’t need the spring boot configuration class to define exchanges, queues, and binding relationships.
You can directly define the binding relationships, queues, and exchanges through the bindings parameter of the RabbitListener annotation.

@RabbitListener(
            bindings = {
                    @QueueBinding(
                            value = @Queue(name = "tizi365.fanout.queue3", durable = "true"),
                            exchange = @Exchange(name = "tizi365.fanout", durable = "true", type = ExchangeTypes.FANOUT)
                    )
            }
    )
public void receive3(String msg) {
    System.out.println("Received message from queue 3 = " + msg);
}

Explanation:

  • QueueBinding annotation: Defines the binding relationship between the queue and the exchange. The value parameter is used to define the queue, and the exchange is used to define the exchange.
  • Queue annotation: Defines a queue. The name parameter defines the queue name (which needs to be unique), and the durable parameter indicates whether it needs to be durable.
  • Exchange annotation: Defines an exchange. The name parameter defines the exchange name, and the type parameter indicates the exchange type.