Das Routingmuster (direkter Modus) von RabbitMQ in Java verwendet den Exchange-Typ DirectExchange. Der Unterschied zum Publish-Subscribe-Muster besteht darin, dass der Direct Exchange Nachrichten an Warteschlangen mit vollständig passenden Routing-Parametern liefert. Die Architektur ist wie im folgenden Bild dargestellt:
Hinweis: Unabhängig vom verwendeten Arbeitsmodus von RabbitMQ liegt der Unterschied im verwendeten Exchange-Typ und den Routing-Parametern.
1. Voraussetzungstutorials
Bitte lesen Sie die folgenden Abschnitte, um das damit verbundene Wissen zu verstehen:
- Grundkonzepte von RabbitMQ
- Arbeitsmodus-Prinzip des Routing-Modus von RabbitMQ
- Schnellstart für Java mit RabbitMQ (ein Muss, da nachfolgende Abschnitte keinen Code duplizieren, sondern nur den Schlüsselcode anzeigen)
- Publish-Subscribe-Muster für Java mit RabbitMQ (ein Muss, da die Code-Struktur fast identisch ist, nur der Exchange-Typ und die Routing-Parameter unterscheiden sich)
2. Definition des Direct Exchange
Im Spring AMQP ist die Klasse, die dem Direct Exchange entspricht, DirectExchange. Wir definieren den Exchange über eine Spring Boot-Konfigurationsklasse.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public DirectExchange direct() {
// Definiere den Exchange
// Der Parameter ist der Exchange-Name, der eindeutig sein muss
return new DirectExchange("tizi365.direct");
}
}
Hinweis: Sowohl der Message-Produzent als auch der -Konsument benötigen einen Exchange.
3. Senden von Nachrichten
Wir senden Nachrichten an den Exchange, der die Nachrichten dann basierend auf den Routing-Regeln an die entsprechenden Warteschlangen liefert.
package com.tizi365.rabbitmq.service;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class SendService {
@Autowired
private RabbitTemplate template;
@Autowired
private DirectExchange direct;
// Zur Demonstration verwenden wir eine geplante Aufgabe, um jede Sekunde eine Nachricht zu senden
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void send() {
// Nachrichteninhalt
String message = "Hallo Welt!";
// Sende die Nachricht
// Der erste Parameter ist der Exchange-Name
// Der zweite Parameter ist der Routing-Key. Mit einem direkten Exchange wird die Nachricht an die Warteschlange geliefert, deren Routing-Key "tizi365" übereinstimmt
// Der dritte Parameter ist der Nachrichteninhalt, der jeden Typ unterstützt, solange er serialisierbar ist
template.convertAndSend(direct.getName(), "tizi365", message);
System.out.println("Nachricht gesendet '" + message + "'");
}
}
Hinweis: Beachten Sie den zweiten Parameter in der convertAndSend-Methode, da dieser ein wichtiger Parameter ist.
4. Nachrichten empfangen
4.1 Definition von Warteschlangen und Binden von Exchanges
Um Nachrichten von einer Warteschlange zu konsumieren, müssen Sie zunächst eine Warteschlange definieren und diese dann an den Ziel-Exchange binden. Der folgende Code definiert zwei Warteschlangen und bindet sie an denselben Exchange:
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public DirectExchange direct() {
// Definiere den Exchange
// Der Parameter ist der Exchange-Name, der eindeutig sein muss
return new DirectExchange("tizi365.direct");
}
@Bean
public Queue queue1() {
// Definiere Warteschlange 1
return new Queue("tizi365.direct.queue1");
}
@Bean
public Queue queue2() {
// Definiere Warteschlange 2
return new Queue("tizi365.direct.queue2");
}
@Bean
public Binding binding1(DirectExchange direct, Queue queue1) {
// Definiere eine Bindung, um die Warteschlange 1 mit dem Direct-Exchange mit einem Routing-Key von "tizi365" zu verbinden
// Wenn der Routing-Key "tizi365" übereinstimmt, sendet der Exchange Nachrichten an Warteschlange 1
return BindingBuilder.bind(queue1).to(direct).with("tizi365");
}
@Bean
public Binding binding2(DirectExchange direct, Queue queue2) {
// Definiere eine Bindung, um die Warteschlange 2 mit dem Direct-Exchange mit einem Routing-Key von "baidu" zu verbinden
// Wenn der Routing-Key "baidu" übereinstimmt, sendet der Exchange Nachrichten an Warteschlange 2
return BindingBuilder.bind(queue2).to(direct).with("baidu");
}
}
4.2 Definition von Warteschlangen-Listenern
Definieren Sie Nachrichtenlistener mithilfe der RabbitListener-Annotation, um Nachrichten von bestimmten Warteschlangen zu konsumieren:
package com.tizi365.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
// Lassen Sie Spring die aktuelle Klasse verwalten
@Component
public class DemoListener {
// Definiere einen Listener zum Konsumieren von Nachrichten von Warteschlange 1
@RabbitListener(queues = "tizi365.direct.queue1")
public void receive1(String msg) {
System.out.println("Nachricht von Warteschlange 1 erhalten: " + msg);
}
// Definiere einen Listener zum Konsumieren von Nachrichten von Warteschlange 2
@RabbitListener(queues = "tizi365.direct.queue2")
public void receive2(String msg) {
System.out.println("Nachricht von Warteschlange 2 erhalten: " + msg);
}
}
Da nur Warteschlange 1 einen Routing-Key von "tizi365" beim Binden an den Exchange hat, kann nur Warteschlange 1 Nachrichten empfangen. Warteschlange 2 empfängt keine Nachrichten, da der Routing-Key nicht übereinstimmt.