Java RabbitMQ Publish/Subscribe Pattern (Broadcast-Modus, Fanout-Modus)
Der FanoutExchange-Austauschtyp wird im RabbitMQ für das Publish/Subscribe-Muster verwendet, bei dem eine vom Produzenten gesendete Nachricht von mehreren Verbraucherwarteschlangen verarbeitet wird. Die Architektur ist in folgendem Diagramm dargestellt:
Der Fanout-Austausch kann Nachrichten an alle gebundenen Warteschlangen weiterleiten.
Tipp: Unabhängig vom verwendeten RabbitMQ-Arbeitsmodus liegt der Unterschied im Austauschtyp und den verwendeten Routing-Parametern.
1. Voraussetzungstutorials
Bitte lesen Sie zuerst die folgenden Abschnitte, um das relevante Wissen zu verstehen:
- Grundkonzepte von RabbitMQ
- RabbitMQ Publish/Subscribe Muster
- RabbitMQ Java Schnellstartanleitung (obligatorisch, da nachfolgende Abschnitte keinen Code duplizieren, sondern nur wichtige Code-Schnipsel zeigen)
2. Definition des Fanout-Austauschs
In Spring AMQP entspricht die Klasse FanoutExchange dem Fanout-Austausch. Wir definieren den Austausch über eine Spring Boot-Konfigurationsklasse.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
// Austausch definieren
@Bean
public FanoutExchange fanout() {
// Der Parameter ist der Austauschname, der eindeutig sein muss
return new FanoutExchange("tizi365.fanout");
}
}
Tipp: Sowohl Nachrichtenproduzenten als auch Verbraucher benötigen einen Austausch.
3. Senden von Nachrichten
Wir senden Nachrichten an den Austausch, der die Nachrichten basierend auf den Routingregeln an die entsprechenden Warteschlangen liefert.
package com.tizi365.rabbitmq.service;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class SendService {
@Autowired
private RabbitTemplate template;
@Autowired
private FanoutExchange fanout;
// Für Demonstrationszwecke wird eine geplante Aufgabe verwendet, um alle Sekunde eine Nachricht zu senden
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void send() {
// Nachrichteninhalt
String nachricht = "Hallo Welt!";
// Nachricht senden
// Der erste Parameter ist der Austauschname
// Der zweite Parameter ist der Routing-Schlüssel; Fanout-Austausch ignoriert den Routing-Schlüssel, daher muss er nicht festgelegt werden
// Der dritte Parameter ist der Nachrichteninhalt, der jeden Typ unterstützt, solange er serialisierbar ist
template.convertAndSend(fanout.getName(), "", nachricht);
System.out.println("Nachricht gesendet: '" + nachricht + "'");
}
}
4. Empfangen von Nachrichten
4.1 Definition von Warteschlangen & Bindung des Austauschs
Um Warteschlangennachrichten zu konsumieren, müssen Sie zuerst eine Warteschlange definieren und dann die Warteschlange an den Ziel-Austausch binden. Im Folgenden definieren wir zwei Warteschlangen und binden sie an denselben Austausch.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public FanoutExchange fanout() {
// Austausch definieren
// Der Parameter ist der Austauschname, der eindeutig sein muss
return new FanoutExchange("tizi365.fanout");
}
@Bean
public Queue queue1() {
// Warteschlange 1 definieren
return new Queue("tizi365.fanout.queue1");
}
@Bean
public Queue queue2() {
// Warteschlange 2 definieren
return new Queue("tizi365.fanout.queue2");
}
@Bean
public Binding binding1(FanoutExchange fanout, Queue queue1) {
// Bindungsbeziehung definieren, um die Warteschlange 1 an den Fanout-Austausch zu binden
return BindingBuilder.bind(queue1).to(fanout);
}
@Bean
public Binding binding2(FanoutExchange fanout, Queue queue2) {
// Bindungsbeziehung definieren, um die Warteschlange 2 an den Fanout-Austausch zu binden
return BindingBuilder.bind(queue2).to(fanout);
}
}
4.2 Queue-Listener definieren
Definieren Sie Nachrichtenempfänger mithilfe der RabbitListener-Annotation, um Nachrichten von bestimmten Warteschlangen zu verarbeiten.
package com.tizi365.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
// Lassen Sie die aktuelle Klasse von Spring verwalten
@Component
public class DemoListener {
// Definieren eines Empfängers und geben Sie mit dem queues-Parameter an, auf welche Warteschlange zugehört werden soll
@RabbitListener(queues = "tizi365.fanout.queue1")
public void receive1(String msg) {
System.out.println("Nachricht von Warteschlange 1 empfangen = " + msg);
}
// Definieren eines Empfänger und geben Sie mit dem queues-Parameter an, auf welche Warteschlange zugehört werden soll
@RabbitListener(queues = "tizi365.fanout.queue2")
public void receive2(String msg) {
System.out.println("Nachricht von Warteschlange 2 empfangen = " + msg);
}
}
Da der vorherige Austausch als Fanout-Typ definiert wurde, wird jede Nachricht an alle an den aktuellen Austausch gebundenen Warteschlangen verteilt, und die Nachrichten werden separat von den obigen beiden Methoden verarbeitet.
Hinweis: Die RabbitListener-Annotation kann auf eine Klasse oder Methode angewendet werden. Wenn die RabbitListener-Annotation auf eine Klasse angewendet wird, muss sie mit der RabbitHandler-Annotation kombiniert werden, um zu kennzeichnen, welche Klassenmethode die Nachrichtenverarbeitung ausführt.
4.3 Queue-Listener mit vollständiger Annotation definieren
Sie benötigen keine Spring Boot Konfigurationsklasse, um Austausche, Warteschlangen und Bindungsbeziehungen zu definieren. Sie können die Bindungsbeziehungen, Warteschlangen und Austausche direkt über den bindings-Parameter der RabbitListener-Annotation definieren.
@RabbitListener(
bindings = {
@QueueBinding(
value = @Queue(name = "tizi365.fanout.queue3", durable = "true"),
exchange = @Exchange(name = "tizi365.fanout", durable = "true", type = ExchangeTypes.FANOUT)
)
}
)
public void receive3(String msg) {
System.out.println("Nachricht von Warteschlange 3 empfangen = " + msg);
}
Erklärung:
- QueueBinding-Annotation: Definiert die Bindungsbeziehung zwischen der Warteschlange und dem Austausch. Der Wertparameter wird verwendet, um die Warteschlange zu definieren, und der Austausch wird definiert.
- Queue-Annotation: Definiert eine Warteschlange. Der Name-Parameter definiert den Warteschlangennamen (der eindeutig sein muss), und der durable-Parameter gibt an, ob er dauerhaft sein muss.
- Exchange-Annotation: Definiert einen Austausch. Der Name-Parameter definiert den Austauschnamen, und der Typ-Parameter gibt den Austauschtyp an.