Modello di pubblicazione/sottoscrizione PHP RabbitMQ (noto anche come modello di pubblicazione/multipiattaforma)
Il modello di diffusione (Fanout) in RabbitMQ è un modello in cui i messaggi inviati da un produttore vengono elaborati da più consumatori in code diverse, come mostrato nel diagramma dell'architettura di seguito.
L'exchange di diffusione (Fanout) può inoltrare messaggi a tutte le code vincolate.
Nota: Indipendentemente dalla modalità di lavoro di RabbitMQ utilizzata, la differenza risiede nel tipo di exchange e nei parametri di routing utilizzati.
1. Tutorial preliminare
Si prega di leggere i seguenti capitoli per comprendere le conoscenze pertinenti:
- Concetti di base di RabbitMQ
- Modello di pubblicazione/sottoscrizione RabbitMQ
- Avvio rapido di RabbitMQ con PHP (Obbligatorio, in quanto i capitoli successivi non ripeteranno il codice ma mostreranno solo frammenti di codice chiave)
2. Definizione di un exchange di diffusione (Fanout)
Definire lo scambio (exchange) attraverso la funzione exchange_declare del canale.
$channel->exchange_declare(
'tizi365.fanout', // Nome dell'exchange, deve essere unico e non può essere duplicato
'fanout', // Tipo di exchange
false,
false, // Se persistente
false
);
Nota: Sia i produttori che i consumatori di messaggi richiedono lo scambio.
3. Invio di messaggi
Inviamo messaggi allo scambio, e lo scambio distribuisce i messaggi alle code corrispondenti in base alle regole di routing.
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// Creare una connessione RabbitMQ
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Creare un canale
$channel = $connection->channel();
// Dichiarare lo scambio
$channel->exchange_declare(
'tizi365.fanout', // Nome dell'exchange, deve essere unico e non può essere duplicato
'fanout', // Tipo di exchange
false,
false, // Se persistente
false
);
// Oggetto messaggio, con il contenuto del messaggio come parametro
$msg = new AMQPMessage("ciao tizi365.com");
$channel->basic_publish(
$msg, // Oggetto messaggio
'tizi365.fanout' // Nome dell'exchange
);
echo ' [x] Sent ', $msg->getBody(), "\n";
// Rilasciare le risorse
$channel->close();
$connection->close();
4. Ricezione di messaggi
4.1. Definizione di una Coda e Associazione dell'Exchange
Per consumare i messaggi della coda, è necessario prima definire una coda e quindi associare la coda all'exchange di destinazione. Di seguito, viene definita una coda e associata a un exchange specifico.
// Dichiarare una coda, se il nome della coda è vuoto, verrà generato automaticamente un ID univoco e il nome della coda verrà restituito
list($queue_name, ,) = $channel->queue_declare(
"", // Nome della coda, non consentito duplicato, se vuoto, verrà generato automaticamente un ID univoco, rendendolo una coda anonima
false,
false, // Se persistere
true,
false
);
// Associare la coda all'exchange specificato
$channel->queue_bind(
$queue_name, // Nome della coda
'tizi365.fanout' // Nome dell'exchange
);
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
// Creare una connessione rabbitmq
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Creare un Canale
$channel = $connection->channel();
// Dichiarare un exchange
$channel->exchange_declare(
'tizi365.fanout', // Nome dell'exchange, deve essere unico e non ripetibile
'fanout', // Tipo di exchange
false,
false, // Se persistere
false
);
// Dichiarare una coda
list($queue_name, ,) = $channel->queue_declare(
"", // Nome della coda, non consentito duplicato, se vuoto viene generato un ID univoco, quindi è una coda anonima
false,
false, // Se persistere
true,
false
);
// Associare la coda all'exchange specificato
$channel->queue_bind(
$queue_name, // Nome della coda
'tizi365.fanout' // Nome dell'exchange
);
echo " [*] In attesa di un messaggio. Premi CTRL+C per uscire\n";
// Definire la funzione di elaborazione del messaggio (usando una funzione anonima qui)
$callback = function ($msg) {
// Logica di elaborazione del messaggio
echo ' [x] ', $msg->body, "\n";
};
// Creare un consumatore
$channel->basic_consume(
$queue_name, // Nome della coda da consumare
'', // Tag del consumatore, ignorato, quindi generare un ID univoco
false,
true, // Se riconoscere automaticamente il messaggio, cioè informare rabbitmq che il messaggio è stato elaborato correttamente.
false,
false,
$callback // Funzione di elaborazione del messaggio
);
// Se il canale non è chiuso, blocca il processo evitando che esca
while ($channel->is_open()) {
$channel->wait();
}
// Rilasciare risorse
$channel->close();
$connection->close();