Modello di pubblicazione/sottoscrizione PHP RabbitMQ (noto anche come modello di pubblicazione/multipiattaforma)

Il modello di diffusione (Fanout) in RabbitMQ è un modello in cui i messaggi inviati da un produttore vengono elaborati da più consumatori in code diverse, come mostrato nel diagramma dell'architettura di seguito.

Modalità di lavoro di RabbitMQ

L'exchange di diffusione (Fanout) può inoltrare messaggi a tutte le code vincolate.

Nota: Indipendentemente dalla modalità di lavoro di RabbitMQ utilizzata, la differenza risiede nel tipo di exchange e nei parametri di routing utilizzati.

1. Tutorial preliminare

Si prega di leggere i seguenti capitoli per comprendere le conoscenze pertinenti:

2. Definizione di un exchange di diffusione (Fanout)

Definire lo scambio (exchange) attraverso la funzione exchange_declare del canale.

$channel->exchange_declare(
    'tizi365.fanout', // Nome dell'exchange, deve essere unico e non può essere duplicato
    'fanout', // Tipo di exchange
    false,
    false, // Se persistente
    false
);

Nota: Sia i produttori che i consumatori di messaggi richiedono lo scambio.

3. Invio di messaggi

Inviamo messaggi allo scambio, e lo scambio distribuisce i messaggi alle code corrispondenti in base alle regole di routing.

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// Creare una connessione RabbitMQ
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Creare un canale
$channel = $connection->channel();

// Dichiarare lo scambio
$channel->exchange_declare(
    'tizi365.fanout', // Nome dell'exchange, deve essere unico e non può essere duplicato
    'fanout', // Tipo di exchange
    false,
    false, // Se persistente
    false
);
// Oggetto messaggio, con il contenuto del messaggio come parametro
$msg = new AMQPMessage("ciao tizi365.com");

$channel->basic_publish(
    $msg, // Oggetto messaggio
    'tizi365.fanout' // Nome dell'exchange
);

echo ' [x] Sent ', $msg->getBody(), "\n";

// Rilasciare le risorse
$channel->close();
$connection->close();

4. Ricezione di messaggi

4.1. Definizione di una Coda e Associazione dell'Exchange

Per consumare i messaggi della coda, è necessario prima definire una coda e quindi associare la coda all'exchange di destinazione. Di seguito, viene definita una coda e associata a un exchange specifico.

// Dichiarare una coda, se il nome della coda è vuoto, verrà generato automaticamente un ID univoco e il nome della coda verrà restituito
list($queue_name, ,) = $channel->queue_declare(
    "", // Nome della coda, non consentito duplicato, se vuoto, verrà generato automaticamente un ID univoco, rendendolo una coda anonima
    false,
    false, // Se persistere
    true,
    false
);

// Associare la coda all'exchange specificato
$channel->queue_bind(
    $queue_name, // Nome della coda
    'tizi365.fanout' // Nome dell'exchange
);
<?php 
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// Creare una connessione rabbitmq
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Creare un Canale
$channel = $connection->channel();

// Dichiarare un exchange
$channel->exchange_declare(
    'tizi365.fanout', // Nome dell'exchange, deve essere unico e non ripetibile
    'fanout', // Tipo di exchange
    false,
    false, // Se persistere
    false
);

// Dichiarare una coda
list($queue_name, ,) = $channel->queue_declare(
    "", // Nome della coda, non consentito duplicato, se vuoto viene generato un ID univoco, quindi è una coda anonima
    false,
    false, // Se persistere
    true,
    false
);

// Associare la coda all'exchange specificato
$channel->queue_bind(
    $queue_name, // Nome della coda
    'tizi365.fanout' // Nome dell'exchange
);

echo " [*] In attesa di un messaggio. Premi CTRL+C per uscire\n";

// Definire la funzione di elaborazione del messaggio (usando una funzione anonima qui)
$callback = function ($msg) {
    // Logica di elaborazione del messaggio
    echo ' [x] ', $msg->body, "\n";
};

// Creare un consumatore
$channel->basic_consume(
    $queue_name, // Nome della coda da consumare
    '', // Tag del consumatore, ignorato, quindi generare un ID univoco
    false,
    true, // Se riconoscere automaticamente il messaggio, cioè informare rabbitmq che il messaggio è stato elaborato correttamente.
    false,
    false,
    $callback // Funzione di elaborazione del messaggio
);

// Se il canale non è chiuso, blocca il processo evitando che esca
while ($channel->is_open()) {
    $channel->wait();
}

// Rilasciare risorse
$channel->close();
$connection->close();