Wzorzec tematu RabbitMQ w języku Java, korzystający z typu wymiany TopicExchange, różni się od wzorca trasowania (Direct) tym, że parametry trasowania obsługują dopasowanie rozmyte. Ponieważ dopasowywanie trasowania jest bardziej elastyczne, jest to bardziej powszechnie używany wzorzec. Architektura jest przedstawiona na poniższym diagramie:

RabbitMQ Topic Pattern

Wskazówka: Bez względu na to, który wzorzec pracy RabbitMQ jest używany, różnica polega na rodzaju użytej wymiany i parametrach trasowania.

1. Samouczek wstępny

Proszę najpierw przeczytać następujące sekcje, aby zrozumieć odpowiednią wiedzę:

2. Definiowanie wymiany tematu

W Spring AMQP, klasa odpowiadająca za wymianę Direct to TopicExchange. Definiujemy wymianę za pomocą klasy konfiguracji Spring Boot.

package com.tizi365.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
    @Bean
    public TopicExchange topic() {
        // Zdefiniuj wymianę
        // Parametr to nazwa wymiany, która musi być unikalna
        return new TopicExchange("tizi365.topic");
    }
}

Wskazówka: Zarówno producenci, jak i konsumenci wiadomości potrzebują wymiany.

3. Wysyłanie wiadomości

Wysyłamy wiadomości do wymiany, a wymiana dostarcza wiadomości do odpowiednich kolejek na podstawie reguł trasowania.

package com.tizi365.rabbitmq.service;

import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class SendService {
    @Autowired
    private RabbitTemplate template;
    @Autowired
    private TopicExchange topic;

    // W celach demonstracyjnych tutaj używane jest zaplanowane zadanie do wysyłania wiadomości co sekundę
    @Scheduled(fixedDelay = 1000, initialDelay = 1000)
    public void send() {
        // Treść wiadomości
        String message = "Witaj świecie!";
        // Wyślij wiadomość
        // Pierwszy parametr to nazwa wymiany
        // Drugi parametr to klucz trasowania. Wymiana tematów dostarcza wiadomości do kolejek, które pasują do klucza trasowania
        // Trzeci parametr to treść wiadomości, obsługująca dowolny typ, o ile może być zserializowany
        template.convertAndSend(topic.getName(), "www.tizi365.com", message);
        System.out.println("Wiadomość wysłana: '" + message + "'");
    }
}

Wskazówka: Zwróć uwagę na drugi parametr "www.tizi365.com" w metodzie convertAndSend, ponieważ jest to kluczowy parametr.

4. Odbieranie wiadomości

4.1 Definicja Kolejek i Wiązanie Wymiany

Aby konsumować wiadomości z kolejek, najpierw musisz zdefiniować kolejkę, a następnie przypiąć kolejkę do docelowej wymiany. Poniżej zdefiniowano dwie kolejki i powiązano je z tą samą wymianą.

package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
    @Bean
    public TopicExchange topic() {
        // Definiowanie wymiany
        // Parametrem jest nazwa wymiany, która musi być unikalna
        return new TopicExchange("tizi365.topic");
    }

    @Bean
    public Queue queue1() {
        // Definicja kolejki 1
        return new Queue("tizi365.topic.queue1");
    }

    @Bean
    public Queue queue2() {
        // Definicja kolejki 2
        return new Queue("tizi365.topic.queue2");
    }

    @Bean
    public Binding binding1(TopicExchange topic, Queue queue1) {
        // Określenie relacji wiązania, przypinanie kolejki 1 do wymiany tematycznej za pomocą klucza routingu: *.tizi365.com
        return BindingBuilder.bind(queue1).to(topic).with("*.tizi365.com");
    }

    @Bean
    public Binding binding2(TopicExchange topic, Queue queue2) {
        // Określenie relacji wiązania, przypinanie kolejki 2 do wymiany tematycznej za pomocą klucza routingu: *.baidu.com
        return BindingBuilder.bind(queue2).to(topic).with("*.baidu.com");
    }
}

Wskazówka: Podczas przypinania kolejki 1 i kolejki 2 do wymiany, ustawione są klucze routingu używające znaku * (gwiazdki), który dopasowuje pojedyncze słowo. Jeśli zostanie zmieniony na # (hasz), dopasowałby wiele słów.

4.2 Definicja Słuchaczy Kolejki

Zdefiniuj słuchaczy wiadomości, korzystając z adnotacji RabbitListener do konsumowania wiadomości z określonych kolejek.

package com.tizi365.rabbitmq.listener;

import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

// Niech Spring zarządza bieżącą klasą
@Component
public class DemoListener {
    // Definiowanie słuchacza, określenie, której kolejki słuchać za pomocą parametru queues
    @RabbitListener(queues = "tizi365.topic.queue1")
    public void receive1(String msg) {
        System.out.println("Otrzymano wiadomość z kolejki 1: " + msg);
    }

    // Definiowanie słuchacza, określenie, której kolejki słuchać za pomocą parametru queues
    @RabbitListener(queues = "tizi365.topic.queue2")
    public void receive2(String msg) {
        System.out.println("Otrzymano wiadomość z kolejki 2: " + msg);
    }
}

Ponieważ tylko kolejka 1 ma klucz routingu *.tizi365.com podczas przypinania do wymiany, dopasuje ona wiadomości z kluczem routingu (www.tizi365.com). Dlatego tylko kolejka 1 będzie mogła otrzymywać wiadomości, podczas gdy kolejka 2 nie otrzyma żadnej z powodu niepasującego klucza routingu.