Wzorzec tematu RabbitMQ w języku Java, korzystający z typu wymiany TopicExchange, różni się od wzorca trasowania (Direct) tym, że parametry trasowania obsługują dopasowanie rozmyte. Ponieważ dopasowywanie trasowania jest bardziej elastyczne, jest to bardziej powszechnie używany wzorzec. Architektura jest przedstawiona na poniższym diagramie:
Wskazówka: Bez względu na to, który wzorzec pracy RabbitMQ jest używany, różnica polega na rodzaju użytej wymiany i parametrach trasowania.
1. Samouczek wstępny
Proszę najpierw przeczytać następujące sekcje, aby zrozumieć odpowiednią wiedzę:
- Podstawowe koncepcje RabbitMQ
- Zasady wzorca tematu RabbitMQ
- Szybki start z Javą i RabbitMQ (Konieczne do przeczytania, ponieważ kolejne sekcje nie będą powtarzać kodu, tylko pokażą kluczowy kod)
- Sekcja Podejmowania Decyzji dla Javy i wzorca Publikuj-Subskrybuj (Konieczne do przeczytania, ponieważ składnia kodu jest prawie taka sama, różni się tylko rodzajem wymiany i parametrami trasowania)
2. Definiowanie wymiany tematu
W Spring AMQP, klasa odpowiadająca za wymianę Direct to TopicExchange. Definiujemy wymianę za pomocą klasy konfiguracji Spring Boot.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public TopicExchange topic() {
// Zdefiniuj wymianę
// Parametr to nazwa wymiany, która musi być unikalna
return new TopicExchange("tizi365.topic");
}
}
Wskazówka: Zarówno producenci, jak i konsumenci wiadomości potrzebują wymiany.
3. Wysyłanie wiadomości
Wysyłamy wiadomości do wymiany, a wymiana dostarcza wiadomości do odpowiednich kolejek na podstawie reguł trasowania.
package com.tizi365.rabbitmq.service;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class SendService {
@Autowired
private RabbitTemplate template;
@Autowired
private TopicExchange topic;
// W celach demonstracyjnych tutaj używane jest zaplanowane zadanie do wysyłania wiadomości co sekundę
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void send() {
// Treść wiadomości
String message = "Witaj świecie!";
// Wyślij wiadomość
// Pierwszy parametr to nazwa wymiany
// Drugi parametr to klucz trasowania. Wymiana tematów dostarcza wiadomości do kolejek, które pasują do klucza trasowania
// Trzeci parametr to treść wiadomości, obsługująca dowolny typ, o ile może być zserializowany
template.convertAndSend(topic.getName(), "www.tizi365.com", message);
System.out.println("Wiadomość wysłana: '" + message + "'");
}
}
Wskazówka: Zwróć uwagę na drugi parametr "www.tizi365.com" w metodzie convertAndSend, ponieważ jest to kluczowy parametr.
4. Odbieranie wiadomości
4.1 Definicja Kolejek i Wiązanie Wymiany
Aby konsumować wiadomości z kolejek, najpierw musisz zdefiniować kolejkę, a następnie przypiąć kolejkę do docelowej wymiany. Poniżej zdefiniowano dwie kolejki i powiązano je z tą samą wymianą.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public TopicExchange topic() {
// Definiowanie wymiany
// Parametrem jest nazwa wymiany, która musi być unikalna
return new TopicExchange("tizi365.topic");
}
@Bean
public Queue queue1() {
// Definicja kolejki 1
return new Queue("tizi365.topic.queue1");
}
@Bean
public Queue queue2() {
// Definicja kolejki 2
return new Queue("tizi365.topic.queue2");
}
@Bean
public Binding binding1(TopicExchange topic, Queue queue1) {
// Określenie relacji wiązania, przypinanie kolejki 1 do wymiany tematycznej za pomocą klucza routingu: *.tizi365.com
return BindingBuilder.bind(queue1).to(topic).with("*.tizi365.com");
}
@Bean
public Binding binding2(TopicExchange topic, Queue queue2) {
// Określenie relacji wiązania, przypinanie kolejki 2 do wymiany tematycznej za pomocą klucza routingu: *.baidu.com
return BindingBuilder.bind(queue2).to(topic).with("*.baidu.com");
}
}
Wskazówka: Podczas przypinania kolejki 1 i kolejki 2 do wymiany, ustawione są klucze routingu używające znaku * (gwiazdki), który dopasowuje pojedyncze słowo. Jeśli zostanie zmieniony na # (hasz), dopasowałby wiele słów.
4.2 Definicja Słuchaczy Kolejki
Zdefiniuj słuchaczy wiadomości, korzystając z adnotacji RabbitListener do konsumowania wiadomości z określonych kolejek.
package com.tizi365.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
// Niech Spring zarządza bieżącą klasą
@Component
public class DemoListener {
// Definiowanie słuchacza, określenie, której kolejki słuchać za pomocą parametru queues
@RabbitListener(queues = "tizi365.topic.queue1")
public void receive1(String msg) {
System.out.println("Otrzymano wiadomość z kolejki 1: " + msg);
}
// Definiowanie słuchacza, określenie, której kolejki słuchać za pomocą parametru queues
@RabbitListener(queues = "tizi365.topic.queue2")
public void receive2(String msg) {
System.out.println("Otrzymano wiadomość z kolejki 2: " + msg);
}
}
Ponieważ tylko kolejka 1 ma klucz routingu *.tizi365.com podczas przypinania do wymiany, dopasuje ona wiadomości z kluczem routingu (www.tizi365.com). Dlatego tylko kolejka 1 będzie mogła otrzymywać wiadomości, podczas gdy kolejka 2 nie otrzyma żadnej z powodu niepasującego klucza routingu.