Modèle de diffusion / abonnement RabbitMQ (mode de diffusion, mode fanout)

Le type d'échange FanoutExchange est utilisé pour le modèle de diffusion / abonnement dans RabbitMQ, où un message envoyé par un producteur sera traité par plusieurs files de consommateurs. L'architecture est illustrée dans le diagramme suivant :

Mode de fonctionnement de RabbitMQ

L'échange Fanout peut transférer des messages à toutes les files liées.

Astuce : Peu importe le type de mode de fonctionnement RabbitMQ utilisé, la différence réside dans le type d'échange et les paramètres de routage utilisés.

1. Tutoriels préalables

Veuillez lire d'abord les sections suivantes pour comprendre les connaissances pertinentes :

2. Définition de l'échange Fanout

En Spring AMQP, la classe FanoutExchange correspond à l'échange Fanout. Nous définissons l'échange à travers une classe de configuration Spring Boot.

package com.tizi365.rabbitmq.config;

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
	// Définir l'échange
    @Bean
    public FanoutExchange fanout() {
        // Le paramètre est le nom de l'échange, qui doit être unique
        return new FanoutExchange("tizi365.fanout");
    }
}

Astuce : Les producteurs et les consommateurs de messages nécessitent tous deux un échange.

3. Envoi de messages

Nous envoyons des messages à l'échange, qui livrera les messages aux files correspondantes en fonction des règles de routage.

package com.tizi365.rabbitmq.service;

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class SendService {
    @Autowired
    private RabbitTemplate template;
    @Autowired
    private FanoutExchange fanout;

	// Dans un but de démonstration, une tâche planifiée est utilisée pour envoyer un message toutes les secondes
    @Scheduled(fixedDelay = 1000, initialDelay = 1000)
    public void send() {
        // Contenu du message
        String message = "Bonjour le monde !";
        // Envoyer le message
        // Le premier paramètre est le nom de l'échange
        // Le deuxième paramètre est la clé de routage ; l'échange fanout ignorera la clé de routage, donc elle n'a pas besoin d'être définie
        // Le troisième paramètre est le contenu du message, qui prend en charge tout type tant qu'il peut être sérialisé
        template.convertAndSend(fanout.getName(), "", message);
        System.out.println("Message envoyé : '" + message + "'");
    }
}

4. Réception des messages

4.1 Définir les files d'attente et lier l'échange

Pour consommer des messages de file, vous devez d'abord définir une file, puis lier la file à l'échange cible. Ci-dessous, nous définissons deux files et les lions au même échange.

package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
    @Bean
    public FanoutExchange fanout() {
        // Définir l'échange
        // Le paramètre est le nom de l'échange, qui doit être unique
        return new FanoutExchange("tizi365.fanout");
    }

    @Bean
    public Queue queue1() {
        // Définir la file 1
        return new Queue("tizi365.fanout.queue1");
    }

    @Bean
    public Queue queue2() {
        // Définir la file 2
        return new Queue("tizi365.fanout.queue2");
    }

    @Bean
    public Binding binding1(FanoutExchange fanout, Queue queue1) {
        // Définir une relation de liaison, pour lier la file 1 à l'échange fanout
        return BindingBuilder.bind(queue1).to(fanout);
    }

    @Bean
    public Binding binding2(FanoutExchange fanout, Queue queue2) {
        // Définir une relation de liaison, pour lier la file 2 à l'échange fanout
        return BindingBuilder.bind(queue2).to(fanout);
    }
}

4.2 Définir les écouteurs de file d'attente

Définissez les écouteurs de message à l'aide de l'annotation RabbitListener pour consommer des messages à partir de files d'attente spécifiques.

package com.tizi365.rabbitmq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// Rendre la classe actuelle gérée par Spring
@Component
public class DemoListener {
    // Définir un écouteur et spécifier la file d'attente à écouter en utilisant le paramètre queues
    @RabbitListener(queues = "tizi365.fanout.queue1")
    public void receive1(String msg) {
        System.out.println("Message reçu de la file d'attente 1 = " + msg);
    }

    // Définir un écouteur et spécifier la file d'attente à écouter en utilisant le paramètre queues
    @RabbitListener(queues = "tizi365.fanout.queue2")
    public void receive2(String msg) {
        System.out.println("Message reçu de la file d'attente 2 = " + msg);
    }
}

Étant donné que l'échange précédent a été défini comme de type fanout, chaque message sera distribué à toutes les files d'attente liées à l'échange actuel, et les messages seront traités séparément par les méthodes ci-dessus.

Remarque : L'annotation RabbitListener peut être appliquée à une classe ou une méthode. Si l'annotation RabbitListener est définie sur une classe, elle doit être combinée avec l'annotation RabbitHandler pour indiquer quelle méthode de classe exécutera le traitement du message.

4.3 Définir les écouteurs de file d'attente avec une annotation complète

Vous n'avez pas besoin de la classe de configuration spring boot pour définir les échanges, les files d'attente et les relations de liaison. Vous pouvez directement définir les relations de liaison, les files d'attente et les échanges via le paramètre de liaisons de l'annotation RabbitListener.

@RabbitListener(
            bindings = {
                    @QueueBinding(
                            value = @Queue(name = "tizi365.fanout.queue3", durable = "true"),
                            exchange = @Exchange(name = "tizi365.fanout", durable = "true", type = ExchangeTypes.FANOUT)
                    )
            }
    )
public void receive3(String msg) {
    System.out.println("Message reçu de la file d'attente 3 = " + msg);
}

Explication :

  • Annotation QueueBinding : Définit la relation de liaison entre la file d'attente et l'échange. Le paramètre value est utilisé pour définir la file d'attente, et l'échange est utilisé pour définir l'échange.
  • Annotation Queue : Définit une file d'attente. Le paramètre name définit le nom de la file d'attente (qui doit être unique), et le paramètre durable indique s'il doit être durable.
  • Annotation Exchange : Définit un échange. Le paramètre name définit le nom de l'échange, et le paramètre type indique le type d'échange.