Modello di pubblicazione/sottoscrizione RabbitMQ in modalità broadcast (modalità fanout)
Il tipo di scambio FanoutExchange è utilizzato per il modello di pubblicazione/sottoscrizione in RabbitMQ, in cui un messaggio inviato da un produttore sarà elaborato da code di consumatori multiple. L'architettura è come mostrato nel seguente diagramma:
L'exchange Fanout può inoltrare messaggi a tutte le code vincolate.
Suggerimento: Indipendentemente dal tipo di modalità di funzionamento di RabbitMQ utilizzato, la differenza risiede nel tipo di scambio e nei parametri di instradamento utilizzati.
1. Tutorial preliminari
Per favore, leggere prima le seguenti sezioni per comprendere le conoscenze pertinenti:
- Concetti di base di RabbitMQ
- Modello di pubblicazione/sottoscrizione di RabbitMQ
- Guida rapida a RabbitMQ Java (obbligatorio, in quanto le sezioni successive non duplicheranno il codice, ma mostreranno solo frammenti di codice chiave)
2. Definizione dell'exchange Fanout
In Spring AMQP, la classe FanoutExchange corrisponde all'exchange Fanout. Definiamo lo scambio attraverso una classe di configurazione Spring Boot.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
// Definisci lo scambio
@Bean
public FanoutExchange fanout() {
// Il parametro è il nome dello scambio, che deve essere univoco
return new FanoutExchange("tizi365.fanout");
}
}
Suggerimento: Sia i produttori che i consumatori di messaggi richiedono uno scambio.
3. Invio dei messaggi
Inviare messaggi allo scambio, che invierà i messaggi alle code corrispondenti in base alle regole di instradamento.
package com.tizi365.rabbitmq.service;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class SendService {
@Autowired
private RabbitTemplate template;
@Autowired
private FanoutExchange fanout;
// Per scopi dimostrativi, viene utilizzato un compito pianificato per inviare un messaggio ogni secondo
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void send() {
// Contenuto del messaggio
String message = "Ciao mondo!";
// Invia il messaggio
// Il primo parametro è il nome dello scambio
// Il secondo parametro è la chiave di instradamento; lo scambio fanout ignorerà la chiave di instradamento, quindi non è necessario impostarla
// Il terzo parametro è il contenuto del messaggio, che supporta qualsiasi tipo purché possa essere serializzato
template.convertAndSend(fanout.getName(), "", message);
System.out.println("Messaggio inviato: '" + message + "'");
}
}
4. Ricezione dei messaggi
4.1 Definizione delle code e vincolo dello scambio
Per consumare i messaggi della coda, è necessario definire prima una coda e quindi vincolare la coda allo scambio di destinazione. Di seguito, definiamo due code e le vincoliamo allo stesso scambio.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public FanoutExchange fanout() {
// Definisci lo scambio
// Il parametro è il nome dello scambio, che deve essere univoco
return new FanoutExchange("tizi365.fanout");
}
@Bean
public Queue queue1() {
// Definisci la coda 1
return new Queue("tizi365.fanout.queue1");
}
@Bean
public Queue queue2() {
// Definisci la coda 2
return new Queue("tizi365.fanout.queue2");
}
@Bean
public Binding binding1(FanoutExchange fanout, Queue queue1) {
// Definisci una relazione di vincolo, per vincolare la coda 1 allo scambio fanout
return BindingBuilder.bind(queue1).to(fanout);
}
@Bean
public Binding binding2(FanoutExchange fanout, Queue queue2) {
// Definisci una relazione di vincolo, per vincolare la coda 2 allo scambio fanout
return BindingBuilder.bind(queue2).to(fanout);
}
}
4.2 Definire gli ascoltatori della coda
Definire i listeners dei messaggi attraverso l'annotazione RabbitListener per consumare i messaggi da code specifiche.
package com.tizi365.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
// Rendere la classe corrente gestita da Spring
@Component
public class DemoListener {
// Definire un listener e specificare a quale coda ascoltare utilizzando il parametro queues
@RabbitListener(queues = "tizi365.fanout.queue1")
public void receive1(String msg) {
System.out.println("Messaggio ricevuto dalla coda 1 = " + msg);
}
// Definire un listener e specificare a quale coda ascoltare utilizzando il parametro queues
@RabbitListener(queues = "tizi365.fanout.queue2")
public void receive2(String msg) {
System.out.println("Messaggio ricevuto dalla coda 2 = " + msg);
}
}
Poiché lo scambio precedente è stato definito come di tipo fanout, ogni messaggio verrà distribuito a tutte le code vincolate allo scambio corrente e i messaggi verranno gestiti separatamente dai due metodi sopra.
Nota: L'annotazione RabbitListener può essere applicata a una classe o a un metodo. Se l'annotazione RabbitListener è definita su una classe, deve essere combinata con l'annotazione RabbitHandler per segnare quale metodo della classe eseguirà la gestione del messaggio.
4.3 Definire gli ascoltatori della coda con l'annotazione completa
Non è necessaria la classe di configurazione di Spring Boot per definire gli scambi, le code e le relazioni di vincolo. È possibile definire direttamente le relazioni di vincolo, le code e gli scambi attraverso il parametro bindings dell'annotazione RabbitListener.
@RabbitListener(
bindings = {
@QueueBinding(
value = @Queue(name = "tizi365.fanout.queue3", durable = "true"),
exchange = @Exchange(name = "tizi365.fanout", durable = "true", type = ExchangeTypes.FANOUT)
)
}
)
public void receive3(String msg) {
System.out.println("Messaggio ricevuto dalla coda 3 = " + msg);
}
Spiegazione:
- Annotazione QueueBinding: Definisce la relazione di vincolo tra la coda e lo scambio. Il parametro value viene utilizzato per definire la coda e l'exchange viene utilizzato per definire lo scambio.
- Annotazione Queue: Definisce una coda. Il parametro name definisce il nome della coda (che deve essere univoco) e il parametro durable indica se deve essere durevole.
- Annotazione Exchange: Definisce uno scambio. Il parametro name definisce il nome dello scambio e il parametro type indica il tipo di scambio.