Java RabbitMQ Wzorzec Publish/Subscribe (tryb nadawania szeroko rozpowszechniony, tryb rozsyłania, tryb rozgłaszania)
Typ wymiany FanoutExchange jest używany do wzorca publikacji/subskrypcji w RabbitMQ. W tym przypadku wiadomość wysłana przez producenta będzie przetwarzana przez wiele kolejek konsumentów. Architektura jest przedstawiona na poniższym diagramie:
Wymiana Fanout może przekazywać wiadomości do wszystkich powiązanych kolejek.
Wskazówka: Bez względu na tryb pracy RabbitMQ, różnica polega na używanym typie wymiany i parametrach trasowania.
1. Samouczki wstępne
Proszę najpierw przeczytać następujące sekcje, aby zrozumieć związana wiedzę:
- Podstawowe koncepcje RabbitMQ
- Wzorzec publikacji/subskrypcji w RabbitMQ
- Szybki start z RabbitMQ dla Javy (obowiązkowy, jako że kolejne sekcje nie będą powielać kodu, a jedynie pokazują kluczowe fragmenty kodu)
2. Definiowanie wymiany Fanout
W Spring AMQP, klasa FanoutExchange odpowiada za wymianę typu Fanout. Definiujemy wymianę za pomocą klasy konfiguracji Spring Boot.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
// Definiowanie wymiany
@Bean
public FanoutExchange fanout() {
// Parametrem jest nazwa wymiany, która musi być unikalna
return new FanoutExchange("tizi365.fanout");
}
}
Wskazówka: Zarówno producenci wiadomości, jak i konsumenci wymagają wymiany.
3. Wysyłanie wiadomości
Wysyłamy wiadomości do wymiany, która przekazuje je do odpowiednich kolejek na podstawie zasad trasowania.
package com.tizi365.rabbitmq.service;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class SendService {
@Autowired
private RabbitTemplate template;
@Autowired
private FanoutExchange fanout;
// W celach demonstracyjnych używane jest zaplanowane zadanie do wysyłania wiadomości co sekundę
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void send() {
// Treść wiadomości
String message = "Witaj Świecie!";
// Wysłanie wiadomości
// Pierwszym parametrem jest nazwa wymiany
// Drugim parametrem jest klucz trasowania; wymiana typu fanout ignoruje klucz trasowania, więc nie trzeba go ustawiać
// Trzecim parametrem jest treść wiadomości, która obsługuje dowolny typ, o ile może być zserializowany
template.convertAndSend(fanout.getName(), "", message);
System.out.println("Wiadomość wysłana: '" + message + "'");
}
}
4. Odbieranie wiadomości
4.1 Definiowanie kolejek i powiązanie wymiany
Aby konsumować wiadomości z kolejki, najpierw musisz zdefiniować kolejkę, a następnie powiązać ją z docelową wymianą. Poniżej definiujemy dwie kolejki i powiązujemy je z tą samą wymianą.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public FanoutExchange fanout() {
// Definiowanie wymiany
// Parametrem jest nazwa wymiany, która musi być unikalna
return new FanoutExchange("tizi365.fanout");
}
@Bean
public Queue queue1() {
// Definiowanie kolejki 1
return new Queue("tizi365.fanout.queue1");
}
@Bean
public Queue queue2() {
// Definiowanie kolejki 2
return new Queue("tizi365.fanout.queue2");
}
@Bean
public Binding binding1(FanoutExchange fanout, Queue queue1) {
// Definiowanie relacji powiązania, aby powiązać kolejkę 1 z wymianą fanout
return BindingBuilder.bind(queue1).to(fanout);
}
@Bean
public Binding binding2(FanoutExchange fanout, Queue queue2) {
// Definiowanie relacji powiązania, aby powiązać kolejkę 2 z wymianą fanout
return BindingBuilder.bind(queue2).to(fanout);
}
}
4.2 Definiowanie Słuchaczy Kolejki
Zdefiniuj słuchaczy wiadomości za pomocą adnotacji RabbitListener, aby konsumować wiadomości z określonych kolejek.
package com.tizi365.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
// Spraw, aby bieżąca klasa była zarządzana przez Spring
@Component
public class DemoListener {
// Zdefiniuj słuchacza i określ, która kolejka ma nasłuchiwać, używając parametru queues
@RabbitListener(queues = "tizi365.fanout.queue1")
public void receive1(String msg) {
System.out.println("Otrzymano wiadomość z kolejki 1 = " + msg);
}
// Zdefiniuj słuchacza i określ, która kolejka ma nasłuchiwać, używając parametru queues
@RabbitListener(queues = "tizi365.fanout.queue2")
public void receive2(String msg) {
System.out.println("Otrzymano wiadomość z kolejki 2 = " + msg);
}
}
Ponieważ wcześniejsza wymiana została zdefiniowana jako typ fanout, każda wiadomość będzie rozpowszechniana do wszystkich kolejek związanych z bieżącą wymianą, a wiadomości zostaną obsłużone oddzielnie przez powyższe dwie metody.
Uwaga: Adnotację RabbitListener można zastosować do klasy lub metody. Jeśli adnotacja RabbitListener jest zdefiniowana na klasie, należy ją połączyć z adnotacją RabbitHandler, aby oznaczyć, która metoda klasy będzie wykonywać obsługę wiadomości.
4.3 Definiowanie Słuchaczy Kolejki z Pełną Adnotacją
Nie potrzebujesz klasy konfiguracyjnej Spring Boot do definiowania wymian, kolejek i relacji wiązania. Możesz bezpośrednio zdefiniować relacje wiązania, kolejki i wymiany za pomocą parametru bindings adnotacji RabbitListener.
@RabbitListener(
bindings = {
@QueueBinding(
value = @Queue(name = "tizi365.fanout.queue3", durable = "true"),
exchange = @Exchange(name = "tizi365.fanout", durable = "true", type = ExchangeTypes.FANOUT)
)
}
)
public void receive3(String msg) {
System.out.println("Otrzymano wiadomość z kolejki 3 = " + msg);
}
Wyjaśnienie:
- Adnotacja QueueBinding: Definiuje relację wiązania między kolejką a wymianą. Parametr value służy do zdefiniowania kolejki, a exchange służy do zdefiniowania wymiany.
- Adnotacja Queue: Definiuje kolejkę. Parametr name określa nazwę kolejki (która musi być unikalna), a parametr durable wskazuje, czy musi być trwała.
- Adnotacja Exchange: Definiuje wymianę. Parametr name określa nazwę wymiany, a parametr type wskazuje typ wymiany.