Pola Publish/Subscribe RabbitMQ PHP (dikenal juga sebagai Pola Fanout)

Mode Fanout dalam RabbitMQ adalah pola di mana pesan yang dikirim oleh produsen diproses oleh beberapa konsumen di antrian yang berbeda, seperti yang ditunjukkan dalam diagram arsitektur di bawah.

RabbitMQ Mode Kerja

Penukaran Fanout dapat meneruskan pesan ke semua antrian terikat.

Catatan: Terlepas dari mode kerja RabbitMQ yang digunakan, perbedaannya terletak pada jenis penukaran dan parameter routing yang digunakan.

1. Tutorial Prasyarat

Harap baca bab-bab berikut untuk memahami pengetahuan terkait:

2. Mendefinisikan Penukaran Fanout

Tentukan penukaran melalui fungsi exchange_declare pada saluran.

$channel->exchange_declare(
    'tizi365.fanout', // Nama Penukaran, harus unik dan tidak dapat diduplikasi
    'fanout', // Jenis Penukaran
    false,
    false, // Apakah harus bertahan
    false
);

Catatan: Produsen dan konsumen pesan memerlukan penukaran.

3. Mengirimkan Pesan

Kami mengirimkan pesan ke penukaran, dan penukaran mengirimkan pesan ke antrian yang sesuai berdasarkan aturan routing.

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// Membuat koneksi RabbitMQ
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Membuat Saluran
$channel = $connection->channel();

// Mendeklarasikan penukaran
$channel->exchange_declare(
    'tizi365.fanout', // Nama Penukaran, harus unik dan tidak dapat diduplikasi
    'fanout', // Jenis Penukaran
    false,
    false, // Apakah harus bertahan
    false
);
// Objek Pesan, dengan konten pesan sebagai parameter
$msg = new AMQPMessage("halo tizi365.com");

$channel->basic_publish(
    $msg, // Objek Pesan
    'tizi365.fanout' // Nama Penukaran
);

echo ' [x] Terkirim ', $msg->getBody(), "\n";

// Melepaskan sumber daya
$channel->close();
$connection->close();

4. Menerima Pesan

4.1. Mendefinisikan Antrian & Mengikat Exchange

Untuk mengonsumsi pesan dari antrian, Anda perlu terlebih dahulu mendefinisikan sebuah antrian dan kemudian mengikatnya ke exchange target. Di bawah ini, sebuah antrian didefinisikan dan diikatkan ke sebuah exchange tertentu.

// Mendeklarasikan sebuah antrian, jika nama antriannya kosong, sebuah ID unik secara otomatis akan dibuat, dan nama antriannya akan dikembalikan
list($queue_name, ,) = $channel->queue_declare(
    "", // Nama antrian, tidak diizinkan untuk diulang, jika kosong, sebuah ID unik secara otomatis akan dibuat, menjadikannya antrian anonim
    false,
    false, // Apakah persisten
    true,
    false
);

// Mengikat antrian ke exchange yang ditentukan
$channel->queue_bind(
    $queue_name, // Nama antrian
    'tizi365.fanout' // Nama exchange
);
<?php 
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// Membuat koneksi rabbitmq
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Membuat Channel
$channel = $connection->channel();

// Mendeklarasikan sebuah exchange
$channel->exchange_declare(
    'tizi365.fanout', // Nama exchange, harus unik dan tidak dapat diulang
    'fanout', // Tipe exchange
    false,
    false, // Apakah persisten
    false
);

// Mendeklarasikan sebuah antrian
list($queue_name, ,) = $channel->queue_declare(
    "", // Nama antrian, tidak diizinkan untuk diulang, jika kosong maka sebuah ID unik akan dihasilkan, maka antriannya akan menjadi anonim
    false,
    false, // Apakah persisten
    true,
    false
);

// Mengikat antrian ke exchange yang ditentukan
$channel->queue_bind(
    $queue_name, // Nama antrian
    'tizi365.fanout' // Nama exchange
);

echo " [*] Menunggu pesan. Tekan CTRL+C untuk keluar\n";

// Mendefinisikan fungsi pemrosesan pesan (menggunakan fungsi anonim di sini)
$callback = function ($msg) {
    // Logika pemrosesan pesan
    echo ' [x] ', $msg->body, "\n";
};

// Membuat konsumen
$channel->basic_consume(
    $queue_name, // Nama antrian, antrian untuk dikonsumsi
    '', // Tag konsumen, abaikan, maka sebuah ID unik akan dihasilkan
    false,
    true, // Apakah secara otomatis mengakui pesan, yaitu memberitahu rabbitmq bahwa pesan telah berhasil diproses
    false,
    false,
    $callback // Fungsi pemrosesan pesan
);

// Jika channel tidak ditutup, tetap memblokir proses untuk mencegah keluar
while ($channel->is_open()) {
    $channel->wait();
}

// Melepaskan sumber daya
$channel->close();
$connection->close();