Modèle de diffusion / abonnement RabbitMQ (mode de diffusion, mode fanout)
Le type d'échange FanoutExchange est utilisé pour le modèle de diffusion / abonnement dans RabbitMQ, où un message envoyé par un producteur sera traité par plusieurs files de consommateurs. L'architecture est illustrée dans le diagramme suivant :
L'échange Fanout peut transférer des messages à toutes les files liées.
Astuce : Peu importe le type de mode de fonctionnement RabbitMQ utilisé, la différence réside dans le type d'échange et les paramètres de routage utilisés.
1. Tutoriels préalables
Veuillez lire d'abord les sections suivantes pour comprendre les connaissances pertinentes :
- Concepts de base de RabbitMQ
- Modèle de diffusion / abonnement RabbitMQ
- Guide de démarrage rapide de RabbitMQ en Java (obligatoire, car les sections suivantes ne dupliqueront pas le code, mais montreront uniquement des extraits de code clés)
2. Définition de l'échange Fanout
En Spring AMQP, la classe FanoutExchange correspond à l'échange Fanout. Nous définissons l'échange à travers une classe de configuration Spring Boot.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
// Définir l'échange
@Bean
public FanoutExchange fanout() {
// Le paramètre est le nom de l'échange, qui doit être unique
return new FanoutExchange("tizi365.fanout");
}
}
Astuce : Les producteurs et les consommateurs de messages nécessitent tous deux un échange.
3. Envoi de messages
Nous envoyons des messages à l'échange, qui livrera les messages aux files correspondantes en fonction des règles de routage.
package com.tizi365.rabbitmq.service;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class SendService {
@Autowired
private RabbitTemplate template;
@Autowired
private FanoutExchange fanout;
// Dans un but de démonstration, une tâche planifiée est utilisée pour envoyer un message toutes les secondes
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void send() {
// Contenu du message
String message = "Bonjour le monde !";
// Envoyer le message
// Le premier paramètre est le nom de l'échange
// Le deuxième paramètre est la clé de routage ; l'échange fanout ignorera la clé de routage, donc elle n'a pas besoin d'être définie
// Le troisième paramètre est le contenu du message, qui prend en charge tout type tant qu'il peut être sérialisé
template.convertAndSend(fanout.getName(), "", message);
System.out.println("Message envoyé : '" + message + "'");
}
}
4. Réception des messages
4.1 Définir les files d'attente et lier l'échange
Pour consommer des messages de file, vous devez d'abord définir une file, puis lier la file à l'échange cible. Ci-dessous, nous définissons deux files et les lions au même échange.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public FanoutExchange fanout() {
// Définir l'échange
// Le paramètre est le nom de l'échange, qui doit être unique
return new FanoutExchange("tizi365.fanout");
}
@Bean
public Queue queue1() {
// Définir la file 1
return new Queue("tizi365.fanout.queue1");
}
@Bean
public Queue queue2() {
// Définir la file 2
return new Queue("tizi365.fanout.queue2");
}
@Bean
public Binding binding1(FanoutExchange fanout, Queue queue1) {
// Définir une relation de liaison, pour lier la file 1 à l'échange fanout
return BindingBuilder.bind(queue1).to(fanout);
}
@Bean
public Binding binding2(FanoutExchange fanout, Queue queue2) {
// Définir une relation de liaison, pour lier la file 2 à l'échange fanout
return BindingBuilder.bind(queue2).to(fanout);
}
}
4.2 Définir les écouteurs de file d'attente
Définissez les écouteurs de message à l'aide de l'annotation RabbitListener pour consommer des messages à partir de files d'attente spécifiques.
package com.tizi365.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
// Rendre la classe actuelle gérée par Spring
@Component
public class DemoListener {
// Définir un écouteur et spécifier la file d'attente à écouter en utilisant le paramètre queues
@RabbitListener(queues = "tizi365.fanout.queue1")
public void receive1(String msg) {
System.out.println("Message reçu de la file d'attente 1 = " + msg);
}
// Définir un écouteur et spécifier la file d'attente à écouter en utilisant le paramètre queues
@RabbitListener(queues = "tizi365.fanout.queue2")
public void receive2(String msg) {
System.out.println("Message reçu de la file d'attente 2 = " + msg);
}
}
Étant donné que l'échange précédent a été défini comme de type fanout, chaque message sera distribué à toutes les files d'attente liées à l'échange actuel, et les messages seront traités séparément par les méthodes ci-dessus.
Remarque : L'annotation RabbitListener peut être appliquée à une classe ou une méthode. Si l'annotation RabbitListener est définie sur une classe, elle doit être combinée avec l'annotation RabbitHandler pour indiquer quelle méthode de classe exécutera le traitement du message.
4.3 Définir les écouteurs de file d'attente avec une annotation complète
Vous n'avez pas besoin de la classe de configuration spring boot pour définir les échanges, les files d'attente et les relations de liaison. Vous pouvez directement définir les relations de liaison, les files d'attente et les échanges via le paramètre de liaisons de l'annotation RabbitListener.
@RabbitListener(
bindings = {
@QueueBinding(
value = @Queue(name = "tizi365.fanout.queue3", durable = "true"),
exchange = @Exchange(name = "tizi365.fanout", durable = "true", type = ExchangeTypes.FANOUT)
)
}
)
public void receive3(String msg) {
System.out.println("Message reçu de la file d'attente 3 = " + msg);
}
Explication :
- Annotation QueueBinding : Définit la relation de liaison entre la file d'attente et l'échange. Le paramètre value est utilisé pour définir la file d'attente, et l'échange est utilisé pour définir l'échange.
- Annotation Queue : Définit une file d'attente. Le paramètre name définit le nom de la file d'attente (qui doit être unique), et le paramètre durable indique s'il doit être durable.
- Annotation Exchange : Définit un échange. Le paramètre name définit le nom de l'échange, et le paramètre type indique le type d'échange.