Java RabbitMQ Wzorzec Publish/Subscribe (tryb nadawania szeroko rozpowszechniony, tryb rozsyłania, tryb rozgłaszania)

Typ wymiany FanoutExchange jest używany do wzorca publikacji/subskrypcji w RabbitMQ. W tym przypadku wiadomość wysłana przez producenta będzie przetwarzana przez wiele kolejek konsumentów. Architektura jest przedstawiona na poniższym diagramie:

Tryb pracy RabbitMQ

Wymiana Fanout może przekazywać wiadomości do wszystkich powiązanych kolejek.

Wskazówka: Bez względu na tryb pracy RabbitMQ, różnica polega na używanym typie wymiany i parametrach trasowania.

1. Samouczki wstępne

Proszę najpierw przeczytać następujące sekcje, aby zrozumieć związana wiedzę:

2. Definiowanie wymiany Fanout

W Spring AMQP, klasa FanoutExchange odpowiada za wymianę typu Fanout. Definiujemy wymianę za pomocą klasy konfiguracji Spring Boot.

package com.tizi365.rabbitmq.config;

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
	// Definiowanie wymiany
    @Bean
    public FanoutExchange fanout() {
        // Parametrem jest nazwa wymiany, która musi być unikalna
        return new FanoutExchange("tizi365.fanout");
    }
}

Wskazówka: Zarówno producenci wiadomości, jak i konsumenci wymagają wymiany.

3. Wysyłanie wiadomości

Wysyłamy wiadomości do wymiany, która przekazuje je do odpowiednich kolejek na podstawie zasad trasowania.

package com.tizi365.rabbitmq.service;

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class SendService {
    @Autowired
    private RabbitTemplate template;
    @Autowired
    private FanoutExchange fanout;

	// W celach demonstracyjnych używane jest zaplanowane zadanie do wysyłania wiadomości co sekundę
    @Scheduled(fixedDelay = 1000, initialDelay = 1000)
    public void send() {
        // Treść wiadomości
        String message = "Witaj Świecie!";
        // Wysłanie wiadomości
        // Pierwszym parametrem jest nazwa wymiany
        // Drugim parametrem jest klucz trasowania; wymiana typu fanout ignoruje klucz trasowania, więc nie trzeba go ustawiać
        // Trzecim parametrem jest treść wiadomości, która obsługuje dowolny typ, o ile może być zserializowany
        template.convertAndSend(fanout.getName(), "", message);
        System.out.println("Wiadomość wysłana: '" + message + "'");
    }
}

4. Odbieranie wiadomości

4.1 Definiowanie kolejek i powiązanie wymiany

Aby konsumować wiadomości z kolejki, najpierw musisz zdefiniować kolejkę, a następnie powiązać ją z docelową wymianą. Poniżej definiujemy dwie kolejki i powiązujemy je z tą samą wymianą.

package com.tizi365.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {
    @Bean
    public FanoutExchange fanout() {
        // Definiowanie wymiany
        // Parametrem jest nazwa wymiany, która musi być unikalna
        return new FanoutExchange("tizi365.fanout");
    }

    @Bean
    public Queue queue1() {
        // Definiowanie kolejki 1
        return new Queue("tizi365.fanout.queue1");
    }

    @Bean
    public Queue queue2() {
        // Definiowanie kolejki 2
        return new Queue("tizi365.fanout.queue2");
    }

    @Bean
    public Binding binding1(FanoutExchange fanout, Queue queue1) {
        // Definiowanie relacji powiązania, aby powiązać kolejkę 1 z wymianą fanout
        return BindingBuilder.bind(queue1).to(fanout);
    }

    @Bean
    public Binding binding2(FanoutExchange fanout, Queue queue2) {
        // Definiowanie relacji powiązania, aby powiązać kolejkę 2 z wymianą fanout
        return BindingBuilder.bind(queue2).to(fanout);
    }
}

4.2 Definiowanie Słuchaczy Kolejki

Zdefiniuj słuchaczy wiadomości za pomocą adnotacji RabbitListener, aby konsumować wiadomości z określonych kolejek.

package com.tizi365.rabbitmq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// Spraw, aby bieżąca klasa była zarządzana przez Spring
@Component
public class DemoListener {
    // Zdefiniuj słuchacza i określ, która kolejka ma nasłuchiwać, używając parametru queues
    @RabbitListener(queues = "tizi365.fanout.queue1")
    public void receive1(String msg) {
        System.out.println("Otrzymano wiadomość z kolejki 1 = " + msg);
    }

    // Zdefiniuj słuchacza i określ, która kolejka ma nasłuchiwać, używając parametru queues
    @RabbitListener(queues = "tizi365.fanout.queue2")
    public void receive2(String msg) {
        System.out.println("Otrzymano wiadomość z kolejki 2 = " + msg);
    }
}

Ponieważ wcześniejsza wymiana została zdefiniowana jako typ fanout, każda wiadomość będzie rozpowszechniana do wszystkich kolejek związanych z bieżącą wymianą, a wiadomości zostaną obsłużone oddzielnie przez powyższe dwie metody.

Uwaga: Adnotację RabbitListener można zastosować do klasy lub metody. Jeśli adnotacja RabbitListener jest zdefiniowana na klasie, należy ją połączyć z adnotacją RabbitHandler, aby oznaczyć, która metoda klasy będzie wykonywać obsługę wiadomości.

4.3 Definiowanie Słuchaczy Kolejki z Pełną Adnotacją

Nie potrzebujesz klasy konfiguracyjnej Spring Boot do definiowania wymian, kolejek i relacji wiązania. Możesz bezpośrednio zdefiniować relacje wiązania, kolejki i wymiany za pomocą parametru bindings adnotacji RabbitListener.

@RabbitListener(
            bindings = {
                    @QueueBinding(
                            value = @Queue(name = "tizi365.fanout.queue3", durable = "true"),
                            exchange = @Exchange(name = "tizi365.fanout", durable = "true", type = ExchangeTypes.FANOUT)
                    )
            }
    )
public void receive3(String msg) {
    System.out.println("Otrzymano wiadomość z kolejki 3 = " + msg);
}

Wyjaśnienie:

  • Adnotacja QueueBinding: Definiuje relację wiązania między kolejką a wymianą. Parametr value służy do zdefiniowania kolejki, a exchange służy do zdefiniowania wymiany.
  • Adnotacja Queue: Definiuje kolejkę. Parametr name określa nazwę kolejki (która musi być unikalna), a parametr durable wskazuje, czy musi być trwała.
  • Adnotacja Exchange: Definiuje wymianę. Parametr name określa nazwę wymiany, a parametr type wskazuje typ wymiany.