Le modèle de routage RabbitMQ (mode Direct) en Java utilise le type d'échange DirectExchange. La différence avec le modèle publish-subscribe est que l'échange direct livre les messages aux files d'attente avec des paramètres de routage correspondant entièrement. L'architecture est comme indiqué dans l'image ci-dessous :
Astuce : Quel que soit le mode de fonctionnement de RabbitMQ utilisé, la différence réside dans le type d'échange utilisé et les paramètres de routage.
1. Tutoriels préalables
Veuillez lire les sections suivantes pour comprendre les connaissances connexes :
- RabbitMQ - Concepts de base
- RabbitMQ - Principe du mode de routage
- Démarrage rapide pour Java avec RabbitMQ (à lire absolument, car les sections suivantes ne dupliqueront pas le code, mais afficheront uniquement le code clé)
- Modèle Publish-Subscribe pour Java avec RabbitMQ (à lire absolument, car la structure du code est presque la même, seuls le type d'échange et les paramètres de routage diffèrent)
2. Définir l'échange direct
Dans Spring AMQP, la classe correspondant à l'échange direct est DirectExchange. Nous définissons l'échange via une classe de configuration Spring Boot.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public DirectExchange direct() {
// Définir l'échange
// Le paramètre est le nom de l'échange, qui doit être unique
return new DirectExchange("tizi365.direct");
}
}
Astuce : Le producteur et le consommateur de messages nécessitent tous deux un échange.
3. Envoyer des messages
Nous envoyons des messages à l'échange, qui les livre ensuite aux files d'attente correspondantes en fonction des règles de routage.
package com.tizi365.rabbitmq.service;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class SendService {
@Autowired
private RabbitTemplate template;
@Autowired
private DirectExchange direct;
// Pour la démonstration, nous utilisons une tâche planifiée pour envoyer un message toutes les secondes
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void send() {
// Contenu du message
String message = "Bonjour le monde !";
// Envoyer le message
// Le premier paramètre est le nom de l'échange
// Le deuxième paramètre est la clé de routage. Avec un échange direct, le message est livré à la file d'attente dont la clé de routage correspond à "tizi365"
// Le troisième paramètre est le contenu du message, qui prend en charge tout type tant qu'il peut être sérialisé
template.convertAndSend(direct.getName(), "tizi365", message);
System.out.println("Message envoyé : '" + message + "'");
}
}
Astuce : Faites attention au deuxième paramètre dans la méthode convertAndSend, car il s'agit d'un paramètre critique.
4. Recevoir des messages
4.1 Définir les files d'attente et lier l'échange
Pour consommer des messages d'une file d'attente, vous devez d'abord définir une file d'attente, puis la lier à l'échange cible. Le code suivant définit deux files d'attente et les lie au même échange :
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public DirectExchange direct() {
// Définir l'échange
// Le paramètre est le nom de l'échange, qui doit être unique
return new DirectExchange("tizi365.direct");
}
@Bean
public Queue queue1() {
// Définir la file d'attente 1
return new Queue("tizi365.direct.queue1");
}
@Bean
public Queue queue2() {
// Définir la file d'attente 2
return new Queue("tizi365.direct.queue2");
}
@Bean
public Binding binding1(DirectExchange direct, Queue queue1) {
// Définir une liaison pour lier la file d'attente 1 à l'échange direct avec une clé de routage "tizi365"
// Lorsque la clé de routage correspond à "tizi365", l'échange livrera les messages à la file d'attente 1
return BindingBuilder.bind(queue1).to(direct).with("tizi365");
}
@Bean
public Binding binding2(DirectExchange direct, Queue queue2) {
// Définir une liaison pour lier la file d'attente 2 à l'échange direct avec une clé de routage "baidu"
// Lorsque la clé de routage correspond à "baidu", l'échange livrera les messages à la file d'attente 2
return BindingBuilder.bind(queue2).to(direct).with("baidu");
}
}
4.2 Définir les écouteurs de file d'attente
Définir les écouteurs de message en utilisant l'annotation RabbitListener pour consommer des messages à partir de files d'attente spécifiques :
package com.tizi365.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
// Laissez Spring gérer la classe actuelle
@Component
public class DemoListener {
// Définir un écouteur pour consommer les messages de la file d'attente 1
@RabbitListener(queues = "tizi365.direct.queue1")
public void receive1(String msg) {
System.out.println("Message reçu de la file d'attente 1 : " + msg);
}
// Définir un écouteur pour consommer les messages de la file d'attente 2
@RabbitListener(queues = "tizi365.direct.queue2")
public void receive2(String msg) {
System.out.println("Message reçu de la file d'attente 2 : " + msg);
}
}
Étant donné que seule la file d'attente 1 a une clé de routage "tizi365" lors de la liaison à l'échange, seule la file d'attente 1 peut recevoir des messages. La file d'attente 2, car la clé de routage ne correspond pas, ne recevra aucun message.