Wzorzec routingu RabbitMQ (tryb bezpośredni) w języku Java wykorzystuje typ wymiany DirectExchange. Różnica w porównaniu z wzorcem publikuj-prenumeruj polega na tym, że wymiana bezpośrednia wysyła wiadomości do kolejek z parametrami routingu, które są w pełni zgodne. Architektura jest przedstawiona na poniższym obrazku:

RabbitMQ Direct Mode

Wskazówka: Niezależnie od używanego trybu pracy RabbitMQ, różnica polega na używanym typie wymiany i parametrach routingu.

1. Samouczki wstępne

Proszę przeczytać poniższe sekcje, aby zrozumieć związane z nimi wiadomości:

2. Zdefiniuj wymianę bezpośrednią

W Spring AMQP, klasą odpowiadającą wymianie bezpośredniej jest DirectExchange. W definicji wymiany używamy klasy konfiguracji Spring Boot.

package com.tizi365.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
	@Bean
    public DirectExchange direct() {
        // Zdefiniuj wymianę
        // Parametrem jest nazwa wymiany, która musi być unikalna
        return new DirectExchange("tizi365.direct");
    }
}

Wskazówka: Zarówno producent wiadomości, jak i konsument wymagają wymiany.

3. Wysyłanie wiadomości

Wysyłamy wiadomości do wymiany, która następnie dostarcza je do odpowiednich kolejek na podstawie reguł routingu.

package com.tizi365.rabbitmq.service;

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class SendService {
    @Autowired
    private RabbitTemplate template;
    @Autowired
    private DirectExchange direct;

    // Dla celów demonstracyjnych używamy zaplanowanego zadania do wysyłania wiadomości co sekundę
    @Scheduled(fixedDelay = 1000, initialDelay = 1000)
    public void send() {
        // Treść wiadomości
        String message = "Witaj, świecie!";
        // Wyślij wiadomość
        // Pierwszym parametrem jest nazwa wymiany
        // Drugim parametrem jest klucz routingu. W przypadku wymiany bezpośredniej wiadomość jest dostarczana do kolejki, której klucz routingu pasuje do "tizi365"
        // Trzecim parametrem jest treść wiadomości, która obsługuje dowolny typ, pod warunkiem że może być zserializowany
        template.convertAndSend(direct.getName(), "tizi365", message);
        System.out.println("Wysłano wiadomość '" + message + "'");
    }
}

Wskazówka: Zwróć uwagę na drugi parametr w metodzie convertAndSend, ponieważ jest to istotny parametr.

4. Odbieranie wiadomości

4.1 Zdefiniuj Kolejkę i Powiąż Wymianę

Aby konsumować wiadomości z kolejki, najpierw musisz zdefiniować kolejkę, a następnie powiązać ją z docelową wymianą. Poniższy kod definiuje dwie kolejki i powiązuje je z tą samą wymianą:

package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
    @Bean
    public DirectExchange direct() {
        // Definiuj wymianę
        // Parametr to nazwa wymiany, która musi być unikalna
        return new DirectExchange("tizi365.direct");
    }

    @Bean
    public Queue queue1() {
        // Definiuj kolejkę 1
        return new Queue("tizi365.direct.queue1");
    }

    @Bean
    public Queue queue2() {
        // Definiuj kolejkę 2
        return new Queue("tizi365.direct.queue2");
    }

    @Bean
    public Binding binding1(DirectExchange direct, Queue queue1) {
        // Definiuj powiązanie kolejki 1 z wymianą bezpośrednią za pomocą klucza routingu "tizi365"
        // Kiedy klucz routingu pasuje do "tizi365", wymiana przekazuje wiadomości do kolejki 1
        return BindingBuilder.bind(queue1).to(direct).with("tizi365");
    }

    @Bean
    public Binding binding2(DirectExchange direct, Queue queue2) {
        // Definiuj powiązanie kolejki 2 z wymianą bezpośrednią za pomocą klucza routingu "baidu"
        // Kiedy klucz routingu pasuje do "baidu", wymiana przekazuje wiadomości do kolejki 2
        return BindingBuilder.bind(queue2).to(direct).with("baidu");
    }
}

4.2 Zdefiniuj Słuchaczy Kolejek

Zdefiniuj słuchaczy wiadomości za pomocą adnotacji RabbitListener, aby konsumować wiadomości z określonych kolejek:

package com.tizi365.rabbitmq.listener;

import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component; 

// Pozwól Springowi zarządzać bieżącą klasą
@Component
public class DemoListener {
    // Zdefiniuj słuchacza do konsumowania wiadomości z kolejki1
    @RabbitListener(queues = "tizi365.direct.queue1")
    public void receive1(String msg) {
        System.out.println("Otrzymano wiadomość z kolejki1: " + msg);
    }

    // Zdefiniuj słuchacza do konsumowania wiadomości z kolejki2
    @RabbitListener(queues = "tizi365.direct.queue2")
    public void receive2(String msg) {
        System.out.println("Otrzymano wiadomość z kolejki2: " + msg);
    }
}

Ponieważ tylko kolejka 1 ma klucz routingu "tizi365" podczas powiązania z wymianą, tylko kolejka 1 może otrzymywać wiadomości. Kolejka 2, ponieważ klucz routingu nie pasuje, nie będzie odbierać żadnych wiadomości.