Das Java RabbitMQ-Thema Muster, das den Typ TopicExchange verwendet, unterscheidet sich vom Routing-Muster (Direct) darin, dass die Routing-Parameter eine ungenaue Übereinstimmung unterstützen. Da die Routing-Übereinstimmung flexibler ist, ist dies ein häufiger verwendetes Muster. Die Architektur ist wie im folgenden Diagramm dargestellt:
Hinweis: Unabhängig davon, welches RabbitMQ-Arbeitsmuster verwendet wird, liegt der Unterschied im Typ des verwendeten Austauschs und den Routing-Parametern.
1. Voraussetzung Tutorial
Bitte lesen Sie zuerst die folgenden Abschnitte, um das relevante Wissen zu verstehen:
- Grundkonzepte von RabbitMQ
- RabbitMQ Thema-Muster Prinzipien
- Schnellstart für Java mit RabbitMQ (Ein Muss, da die nachfolgenden Abschnitte den Code nicht wiederholen, sondern nur den Schlüsselcode anzeigen)
- Java RabbitMQ Publish-Subscribe Muster Abschnitt (Ein Muss, da die Codesyntax fast identisch ist, nur der Austauschtyp und die Routing-Parameter unterschiedlich sind)
2. Definition des Topic Exchange
In Spring AMQP ist die Klasse, die dem Direktaustausch entspricht, der TopicExchange. Wir definieren den Austausch durch eine Spring Boot-Konfigurationsklasse.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public TopicExchange topic() {
// Austausch definieren
// Der Parameter ist der Austauschname, der eindeutig sein muss
return new TopicExchange("tizi365.topic");
}
}
Hinweis: Sowohl Nachrichtenproduzenten als auch -verbraucher benötigen den Austausch.
3. Senden von Nachrichten
Wir senden Nachrichten an den Austausch, und der Austausch liefert die Nachrichten basierend auf den Routingregeln an die entsprechenden Warteschlangen.
package com.tizi365.rabbitmq.service;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class SendService {
@Autowired
private RabbitTemplate template;
@Autowired
private TopicExchange topic;
// Zu Demonstrationszwecken wird hier eine geplante Aufgabe verwendet, um jede Sekunde eine Nachricht zu senden
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void send() {
// Nachrichteninhalt
String message = "Hallo Welt!";
// Nachricht senden
// Der erste Parameter ist der Austauschname
// Der zweite Parameter ist der Routing-Key. Der Topic-Austausch liefert Nachrichten an Warteschlangen, die dem Routing-Key entsprechen
// Der dritte Parameter ist der Nachrichteninhalt und unterstützt jeden Typ, solange er serialisierbar ist
template.convertAndSend(topic.getName(), "www.tizi365.com", message);
System.out.println("Nachricht gesendet: '" + message + "'");
}
}
Hinweis: Beachten Sie den zweiten Parameter "www.tizi365.com" in der convertAndSend-Methode, da dies ein entscheidender Parameter ist.
4. Empfangen von Nachrichten
4.1 Definition von Queues & Binden von Exchanges
Um Nachrichten aus Warteschlangen zu konsumieren, müssen Sie zunächst eine Warteschlange definieren und diese dann an die Ziel-Exchange binden. Im Folgenden werden zwei Warteschlangen definiert und an dieselbe Exchange gebunden.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public TopicExchange topic() {
// Definiere die Exchange
// Der Parameter ist der Exchange-Name, der eindeutig sein muss
return new TopicExchange("tizi365.topic");
}
@Bean
public Queue queue1() {
// Definiere Warteschlange 1
return new Queue("tizi365.topic.queue1");
}
@Bean
public Queue queue2() {
// Definiere Warteschlange 2
return new Queue("tizi365.topic.queue2");
}
@Bean
public Binding binding1(TopicExchange topic, Queue queue1) {
// Definiere eine Bindungsbeziehung, binde Warteschlange 1 an die Topic-Exchange mit einem Routing-Key von: *.tizi365.com
return BindingBuilder.bind(queue1).to(topic).with("*.tizi365.com");
}
@Bean
public Binding binding2(TopicExchange topic, Queue queue2) {
// Definiere eine Bindungsbeziehung, binde Warteschlange 2 an die Topic-Exchange mit einem Routing-Key von: *.baidu.com
return BindingBuilder.bind(queue2).to(topic).with("*.baidu.com");
}
}
Tipp: Beim Binden von Warteschlange 1 und Warteschlange 2 an die Exchange werden beide Routing-Keys mit dem * (Stern) Wildcard verwendet, der einem einzelnen Wort entspricht. Wenn dieser zu # (Raute) geändert wird, würde er mehrere Wörter entsprechen.
4.2 Definition von Queue-Listenern
Definieren Sie Nachrichtenlistener mit der RabbitListener-Annotation, um Nachrichten aus bestimmten Warteschlangen zu konsumieren.
package com.tizi365.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
// Lassen Sie Spring die aktuelle Klasse verwalten
@Component
public class DemoListener {
// Definiere einen Listener und gib an, welche Warteschlange über den queues-Parameter zugehört werden soll
@RabbitListener(queues = "tizi365.topic.queue1")
public void receive1(String msg) {
System.out.println("Nachricht von Warteschlange 1 empfangen: " + msg);
}
// Definiere einen Listener und gib an, welche Warteschlange über den queues-Parameter zugehört werden soll
@RabbitListener(queues = "tizi365.topic.queue2")
public void receive2(String msg) {
System.out.println("Nachricht von Warteschlange 2 empfangen: " + msg);
}
}
Da nur Warteschlange 1 mit dem Routing-Key *.tizi365.com beim Binden an die Exchange übereinstimmt, werden nur Nachrichten mit dem Routing-Key (www.tizi365.com) von Warteschlange 1 empfangen. Daher wird Warteschlange 2 aufgrund des nicht übereinstimmenden Routing-Keys keine Nachrichten empfangen.