Das Java RabbitMQ-Thema Muster, das den Typ TopicExchange verwendet, unterscheidet sich vom Routing-Muster (Direct) darin, dass die Routing-Parameter eine ungenaue Übereinstimmung unterstützen. Da die Routing-Übereinstimmung flexibler ist, ist dies ein häufiger verwendetes Muster. Die Architektur ist wie im folgenden Diagramm dargestellt:

RabbitMQ Topic Pattern

Hinweis: Unabhängig davon, welches RabbitMQ-Arbeitsmuster verwendet wird, liegt der Unterschied im Typ des verwendeten Austauschs und den Routing-Parametern.

1. Voraussetzung Tutorial

Bitte lesen Sie zuerst die folgenden Abschnitte, um das relevante Wissen zu verstehen:

2. Definition des Topic Exchange

In Spring AMQP ist die Klasse, die dem Direktaustausch entspricht, der TopicExchange. Wir definieren den Austausch durch eine Spring Boot-Konfigurationsklasse.

package com.tizi365.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
    @Bean
    public TopicExchange topic() {
        // Austausch definieren
        // Der Parameter ist der Austauschname, der eindeutig sein muss
        return new TopicExchange("tizi365.topic");
    }
}

Hinweis: Sowohl Nachrichtenproduzenten als auch -verbraucher benötigen den Austausch.

3. Senden von Nachrichten

Wir senden Nachrichten an den Austausch, und der Austausch liefert die Nachrichten basierend auf den Routingregeln an die entsprechenden Warteschlangen.

package com.tizi365.rabbitmq.service;

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class SendService {
    @Autowired
    private RabbitTemplate template;
    @Autowired
    private TopicExchange topic;

    // Zu Demonstrationszwecken wird hier eine geplante Aufgabe verwendet, um jede Sekunde eine Nachricht zu senden
    @Scheduled(fixedDelay = 1000, initialDelay = 1000)
    public void send() {
        // Nachrichteninhalt
        String message = "Hallo Welt!";
        // Nachricht senden
        // Der erste Parameter ist der Austauschname
        // Der zweite Parameter ist der Routing-Key. Der Topic-Austausch liefert Nachrichten an Warteschlangen, die dem Routing-Key entsprechen
        // Der dritte Parameter ist der Nachrichteninhalt und unterstützt jeden Typ, solange er serialisierbar ist
        template.convertAndSend(topic.getName(), "www.tizi365.com", message);
        System.out.println("Nachricht gesendet: '" + message + "'");
    }
}

Hinweis: Beachten Sie den zweiten Parameter "www.tizi365.com" in der convertAndSend-Methode, da dies ein entscheidender Parameter ist.

4. Empfangen von Nachrichten

4.1 Definition von Queues & Binden von Exchanges

Um Nachrichten aus Warteschlangen zu konsumieren, müssen Sie zunächst eine Warteschlange definieren und diese dann an die Ziel-Exchange binden. Im Folgenden werden zwei Warteschlangen definiert und an dieselbe Exchange gebunden.

package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
    @Bean
    public TopicExchange topic() {
        // Definiere die Exchange
        // Der Parameter ist der Exchange-Name, der eindeutig sein muss
        return new TopicExchange("tizi365.topic");
    }

    @Bean
    public Queue queue1() {
        // Definiere Warteschlange 1
        return new Queue("tizi365.topic.queue1");
    }

    @Bean
    public Queue queue2() {
        // Definiere Warteschlange 2
        return new Queue("tizi365.topic.queue2");
    }

    @Bean
    public Binding binding1(TopicExchange topic, Queue queue1) {
        // Definiere eine Bindungsbeziehung, binde Warteschlange 1 an die Topic-Exchange mit einem Routing-Key von: *.tizi365.com
        return BindingBuilder.bind(queue1).to(topic).with("*.tizi365.com");
    }

    @Bean
    public Binding binding2(TopicExchange topic, Queue queue2) {
        // Definiere eine Bindungsbeziehung, binde Warteschlange 2 an die Topic-Exchange mit einem Routing-Key von: *.baidu.com
        return BindingBuilder.bind(queue2).to(topic).with("*.baidu.com");
    }
}

Tipp: Beim Binden von Warteschlange 1 und Warteschlange 2 an die Exchange werden beide Routing-Keys mit dem * (Stern) Wildcard verwendet, der einem einzelnen Wort entspricht. Wenn dieser zu # (Raute) geändert wird, würde er mehrere Wörter entsprechen.

4.2 Definition von Queue-Listenern

Definieren Sie Nachrichtenlistener mit der RabbitListener-Annotation, um Nachrichten aus bestimmten Warteschlangen zu konsumieren.

package com.tizi365.rabbitmq.listener;

import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

// Lassen Sie Spring die aktuelle Klasse verwalten
@Component
public class DemoListener {
    // Definiere einen Listener und gib an, welche Warteschlange über den queues-Parameter zugehört werden soll
    @RabbitListener(queues = "tizi365.topic.queue1")
    public void receive1(String msg) {
        System.out.println("Nachricht von Warteschlange 1 empfangen: " + msg);
    }

    // Definiere einen Listener und gib an, welche Warteschlange über den queues-Parameter zugehört werden soll
    @RabbitListener(queues = "tizi365.topic.queue2")
    public void receive2(String msg) {
        System.out.println("Nachricht von Warteschlange 2 empfangen: " + msg);
    }
}

Da nur Warteschlange 1 mit dem Routing-Key *.tizi365.com beim Binden an die Exchange übereinstimmt, werden nur Nachrichten mit dem Routing-Key (www.tizi365.com) von Warteschlange 1 empfangen. Daher wird Warteschlange 2 aufgrund des nicht übereinstimmenden Routing-Keys keine Nachrichten empfangen.