Java RabbitMQ Publish/Subscribe Pattern (Broadcast-Modus, Fanout-Modus)

Der FanoutExchange-Austauschtyp wird im RabbitMQ für das Publish/Subscribe-Muster verwendet, bei dem eine vom Produzenten gesendete Nachricht von mehreren Verbraucherwarteschlangen verarbeitet wird. Die Architektur ist in folgendem Diagramm dargestellt:

RabbitMQ Arbeitsmodus

Der Fanout-Austausch kann Nachrichten an alle gebundenen Warteschlangen weiterleiten.

Tipp: Unabhängig vom verwendeten RabbitMQ-Arbeitsmodus liegt der Unterschied im Austauschtyp und den verwendeten Routing-Parametern.

1. Voraussetzungstutorials

Bitte lesen Sie zuerst die folgenden Abschnitte, um das relevante Wissen zu verstehen:

2. Definition des Fanout-Austauschs

In Spring AMQP entspricht die Klasse FanoutExchange dem Fanout-Austausch. Wir definieren den Austausch über eine Spring Boot-Konfigurationsklasse.

package com.tizi365.rabbitmq.config;

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
	// Austausch definieren
    @Bean
    public FanoutExchange fanout() {
        // Der Parameter ist der Austauschname, der eindeutig sein muss
        return new FanoutExchange("tizi365.fanout");
    }
}

Tipp: Sowohl Nachrichtenproduzenten als auch Verbraucher benötigen einen Austausch.

3. Senden von Nachrichten

Wir senden Nachrichten an den Austausch, der die Nachrichten basierend auf den Routingregeln an die entsprechenden Warteschlangen liefert.

package com.tizi365.rabbitmq.service;

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class SendService {
    @Autowired
    private RabbitTemplate template;
    @Autowired
    private FanoutExchange fanout;

	// Für Demonstrationszwecke wird eine geplante Aufgabe verwendet, um alle Sekunde eine Nachricht zu senden
    @Scheduled(fixedDelay = 1000, initialDelay = 1000)
    public void send() {
        // Nachrichteninhalt
        String nachricht = "Hallo Welt!";
        // Nachricht senden
        // Der erste Parameter ist der Austauschname
        // Der zweite Parameter ist der Routing-Schlüssel; Fanout-Austausch ignoriert den Routing-Schlüssel, daher muss er nicht festgelegt werden
        // Der dritte Parameter ist der Nachrichteninhalt, der jeden Typ unterstützt, solange er serialisierbar ist
        template.convertAndSend(fanout.getName(), "", nachricht);
        System.out.println("Nachricht gesendet: '" + nachricht + "'");
    }
}

4. Empfangen von Nachrichten

4.1 Definition von Warteschlangen & Bindung des Austauschs

Um Warteschlangennachrichten zu konsumieren, müssen Sie zuerst eine Warteschlange definieren und dann die Warteschlange an den Ziel-Austausch binden. Im Folgenden definieren wir zwei Warteschlangen und binden sie an denselben Austausch.

package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
    @Bean
    public FanoutExchange fanout() {
        // Austausch definieren
        // Der Parameter ist der Austauschname, der eindeutig sein muss
        return new FanoutExchange("tizi365.fanout");
    }

    @Bean
    public Queue queue1() {
        // Warteschlange 1 definieren
        return new Queue("tizi365.fanout.queue1");
    }

    @Bean
    public Queue queue2() {
        // Warteschlange 2 definieren
        return new Queue("tizi365.fanout.queue2");
    }

    @Bean
    public Binding binding1(FanoutExchange fanout, Queue queue1) {
        // Bindungsbeziehung definieren, um die Warteschlange 1 an den Fanout-Austausch zu binden
        return BindingBuilder.bind(queue1).to(fanout);
    }

    @Bean
    public Binding binding2(FanoutExchange fanout, Queue queue2) {
        // Bindungsbeziehung definieren, um die Warteschlange 2 an den Fanout-Austausch zu binden
        return BindingBuilder.bind(queue2).to(fanout);
    }
}

4.2 Queue-Listener definieren

Definieren Sie Nachrichtenempfänger mithilfe der RabbitListener-Annotation, um Nachrichten von bestimmten Warteschlangen zu verarbeiten.

package com.tizi365.rabbitmq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// Lassen Sie die aktuelle Klasse von Spring verwalten
@Component
public class DemoListener {
    // Definieren eines Empfängers und geben Sie mit dem queues-Parameter an, auf welche Warteschlange zugehört werden soll
    @RabbitListener(queues = "tizi365.fanout.queue1")
    public void receive1(String msg) {
        System.out.println("Nachricht von Warteschlange 1 empfangen = " + msg);
    }

    // Definieren eines Empfänger und geben Sie mit dem queues-Parameter an, auf welche Warteschlange zugehört werden soll
    @RabbitListener(queues = "tizi365.fanout.queue2")
    public void receive2(String msg) {
        System.out.println("Nachricht von Warteschlange 2 empfangen = " + msg);
    }
}

Da der vorherige Austausch als Fanout-Typ definiert wurde, wird jede Nachricht an alle an den aktuellen Austausch gebundenen Warteschlangen verteilt, und die Nachrichten werden separat von den obigen beiden Methoden verarbeitet.

Hinweis: Die RabbitListener-Annotation kann auf eine Klasse oder Methode angewendet werden. Wenn die RabbitListener-Annotation auf eine Klasse angewendet wird, muss sie mit der RabbitHandler-Annotation kombiniert werden, um zu kennzeichnen, welche Klassenmethode die Nachrichtenverarbeitung ausführt.

4.3 Queue-Listener mit vollständiger Annotation definieren

Sie benötigen keine Spring Boot Konfigurationsklasse, um Austausche, Warteschlangen und Bindungsbeziehungen zu definieren. Sie können die Bindungsbeziehungen, Warteschlangen und Austausche direkt über den bindings-Parameter der RabbitListener-Annotation definieren.

@RabbitListener(
            bindings = {
                    @QueueBinding(
                            value = @Queue(name = "tizi365.fanout.queue3", durable = "true"),
                            exchange = @Exchange(name = "tizi365.fanout", durable = "true", type = ExchangeTypes.FANOUT)
                    )
            }
    )
public void receive3(String msg) {
    System.out.println("Nachricht von Warteschlange 3 empfangen = " + msg);
}

Erklärung:

  • QueueBinding-Annotation: Definiert die Bindungsbeziehung zwischen der Warteschlange und dem Austausch. Der Wertparameter wird verwendet, um die Warteschlange zu definieren, und der Austausch wird definiert.
  • Queue-Annotation: Definiert eine Warteschlange. Der Name-Parameter definiert den Warteschlangennamen (der eindeutig sein muss), und der durable-Parameter gibt an, ob er dauerhaft sein muss.
  • Exchange-Annotation: Definiert einen Austausch. Der Name-Parameter definiert den Austauschnamen, und der Typ-Parameter gibt den Austauschtyp an.