Wzorzec routingu RabbitMQ (tryb bezpośredni) w języku Java wykorzystuje typ wymiany DirectExchange. Różnica w porównaniu z wzorcem publikuj-prenumeruj polega na tym, że wymiana bezpośrednia wysyła wiadomości do kolejek z parametrami routingu, które są w pełni zgodne. Architektura jest przedstawiona na poniższym obrazku:
Wskazówka: Niezależnie od używanego trybu pracy RabbitMQ, różnica polega na używanym typie wymiany i parametrach routingu.
1. Samouczki wstępne
Proszę przeczytać poniższe sekcje, aby zrozumieć związane z nimi wiadomości:
- Podstawowe koncepcje RabbitMQ
- Zasada trybu routingu RabbitMQ
- Szybki start z RabbitMQ w języku Java (Konieczne do przeczytania, ponieważ kolejne sekcje nie będą powielać kodu, a jedynie wyświetlać kluczowy kod)
- Wzorzec publikuj-prenumeruj w języku Java z RabbitMQ (Konieczne do przeczytania, ponieważ struktura kodu jest prawie taka sama, zmienia się tylko typ wymiany i parametry routingu)
2. Zdefiniuj wymianę bezpośrednią
W Spring AMQP, klasą odpowiadającą wymianie bezpośredniej jest DirectExchange. W definicji wymiany używamy klasy konfiguracji Spring Boot.
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public DirectExchange direct() {
// Zdefiniuj wymianę
// Parametrem jest nazwa wymiany, która musi być unikalna
return new DirectExchange("tizi365.direct");
}
}
Wskazówka: Zarówno producent wiadomości, jak i konsument wymagają wymiany.
3. Wysyłanie wiadomości
Wysyłamy wiadomości do wymiany, która następnie dostarcza je do odpowiednich kolejek na podstawie reguł routingu.
package com.tizi365.rabbitmq.service;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class SendService {
@Autowired
private RabbitTemplate template;
@Autowired
private DirectExchange direct;
// Dla celów demonstracyjnych używamy zaplanowanego zadania do wysyłania wiadomości co sekundę
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void send() {
// Treść wiadomości
String message = "Witaj, świecie!";
// Wyślij wiadomość
// Pierwszym parametrem jest nazwa wymiany
// Drugim parametrem jest klucz routingu. W przypadku wymiany bezpośredniej wiadomość jest dostarczana do kolejki, której klucz routingu pasuje do "tizi365"
// Trzecim parametrem jest treść wiadomości, która obsługuje dowolny typ, pod warunkiem że może być zserializowany
template.convertAndSend(direct.getName(), "tizi365", message);
System.out.println("Wysłano wiadomość '" + message + "'");
}
}
Wskazówka: Zwróć uwagę na drugi parametr w metodzie convertAndSend, ponieważ jest to istotny parametr.
4. Odbieranie wiadomości
4.1 Zdefiniuj Kolejkę i Powiąż Wymianę
Aby konsumować wiadomości z kolejki, najpierw musisz zdefiniować kolejkę, a następnie powiązać ją z docelową wymianą. Poniższy kod definiuje dwie kolejki i powiązuje je z tą samą wymianą:
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public DirectExchange direct() {
// Definiuj wymianę
// Parametr to nazwa wymiany, która musi być unikalna
return new DirectExchange("tizi365.direct");
}
@Bean
public Queue queue1() {
// Definiuj kolejkę 1
return new Queue("tizi365.direct.queue1");
}
@Bean
public Queue queue2() {
// Definiuj kolejkę 2
return new Queue("tizi365.direct.queue2");
}
@Bean
public Binding binding1(DirectExchange direct, Queue queue1) {
// Definiuj powiązanie kolejki 1 z wymianą bezpośrednią za pomocą klucza routingu "tizi365"
// Kiedy klucz routingu pasuje do "tizi365", wymiana przekazuje wiadomości do kolejki 1
return BindingBuilder.bind(queue1).to(direct).with("tizi365");
}
@Bean
public Binding binding2(DirectExchange direct, Queue queue2) {
// Definiuj powiązanie kolejki 2 z wymianą bezpośrednią za pomocą klucza routingu "baidu"
// Kiedy klucz routingu pasuje do "baidu", wymiana przekazuje wiadomości do kolejki 2
return BindingBuilder.bind(queue2).to(direct).with("baidu");
}
}
4.2 Zdefiniuj Słuchaczy Kolejek
Zdefiniuj słuchaczy wiadomości za pomocą adnotacji RabbitListener, aby konsumować wiadomości z określonych kolejek:
package com.tizi365.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
// Pozwól Springowi zarządzać bieżącą klasą
@Component
public class DemoListener {
// Zdefiniuj słuchacza do konsumowania wiadomości z kolejki1
@RabbitListener(queues = "tizi365.direct.queue1")
public void receive1(String msg) {
System.out.println("Otrzymano wiadomość z kolejki1: " + msg);
}
// Zdefiniuj słuchacza do konsumowania wiadomości z kolejki2
@RabbitListener(queues = "tizi365.direct.queue2")
public void receive2(String msg) {
System.out.println("Otrzymano wiadomość z kolejki2: " + msg);
}
}
Ponieważ tylko kolejka 1 ma klucz routingu "tizi365" podczas powiązania z wymianą, tylko kolejka 1 może otrzymywać wiadomości. Kolejka 2, ponieważ klucz routingu nie pasuje, nie będzie odbierać żadnych wiadomości.