Pola Publish/Subscribe RabbitMQ PHP (dikenal juga sebagai Pola Fanout)
Mode Fanout dalam RabbitMQ adalah pola di mana pesan yang dikirim oleh produsen diproses oleh beberapa konsumen di antrian yang berbeda, seperti yang ditunjukkan dalam diagram arsitektur di bawah.
Penukaran Fanout dapat meneruskan pesan ke semua antrian terikat.
Catatan: Terlepas dari mode kerja RabbitMQ yang digunakan, perbedaannya terletak pada jenis penukaran dan parameter routing yang digunakan.
1. Tutorial Prasyarat
Harap baca bab-bab berikut untuk memahami pengetahuan terkait:
- Dasar-dasar RabbitMQ
- Pola Publish/Subscribe RabbitMQ
- Panduan Cepat RabbitMQ PHP (Wajib, karena bab-bab selanjutnya tidak akan mengulangi kode namun hanya menampilkan potongan kode kunci)
2. Mendefinisikan Penukaran Fanout
Tentukan penukaran melalui fungsi exchange_declare pada saluran.
$channel->exchange_declare(
'tizi365.fanout', // Nama Penukaran, harus unik dan tidak dapat diduplikasi
'fanout', // Jenis Penukaran
false,
false, // Apakah harus bertahan
false
);
Catatan: Produsen dan konsumen pesan memerlukan penukaran.
3. Mengirimkan Pesan
Kami mengirimkan pesan ke penukaran, dan penukaran mengirimkan pesan ke antrian yang sesuai berdasarkan aturan routing.
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// Membuat koneksi RabbitMQ
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Membuat Saluran
$channel = $connection->channel();
// Mendeklarasikan penukaran
$channel->exchange_declare(
'tizi365.fanout', // Nama Penukaran, harus unik dan tidak dapat diduplikasi
'fanout', // Jenis Penukaran
false,
false, // Apakah harus bertahan
false
);
// Objek Pesan, dengan konten pesan sebagai parameter
$msg = new AMQPMessage("halo tizi365.com");
$channel->basic_publish(
$msg, // Objek Pesan
'tizi365.fanout' // Nama Penukaran
);
echo ' [x] Terkirim ', $msg->getBody(), "\n";
// Melepaskan sumber daya
$channel->close();
$connection->close();
4. Menerima Pesan
4.1. Mendefinisikan Antrian & Mengikat Exchange
Untuk mengonsumsi pesan dari antrian, Anda perlu terlebih dahulu mendefinisikan sebuah antrian dan kemudian mengikatnya ke exchange target. Di bawah ini, sebuah antrian didefinisikan dan diikatkan ke sebuah exchange tertentu.
// Mendeklarasikan sebuah antrian, jika nama antriannya kosong, sebuah ID unik secara otomatis akan dibuat, dan nama antriannya akan dikembalikan
list($queue_name, ,) = $channel->queue_declare(
"", // Nama antrian, tidak diizinkan untuk diulang, jika kosong, sebuah ID unik secara otomatis akan dibuat, menjadikannya antrian anonim
false,
false, // Apakah persisten
true,
false
);
// Mengikat antrian ke exchange yang ditentukan
$channel->queue_bind(
$queue_name, // Nama antrian
'tizi365.fanout' // Nama exchange
);
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
// Membuat koneksi rabbitmq
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Membuat Channel
$channel = $connection->channel();
// Mendeklarasikan sebuah exchange
$channel->exchange_declare(
'tizi365.fanout', // Nama exchange, harus unik dan tidak dapat diulang
'fanout', // Tipe exchange
false,
false, // Apakah persisten
false
);
// Mendeklarasikan sebuah antrian
list($queue_name, ,) = $channel->queue_declare(
"", // Nama antrian, tidak diizinkan untuk diulang, jika kosong maka sebuah ID unik akan dihasilkan, maka antriannya akan menjadi anonim
false,
false, // Apakah persisten
true,
false
);
// Mengikat antrian ke exchange yang ditentukan
$channel->queue_bind(
$queue_name, // Nama antrian
'tizi365.fanout' // Nama exchange
);
echo " [*] Menunggu pesan. Tekan CTRL+C untuk keluar\n";
// Mendefinisikan fungsi pemrosesan pesan (menggunakan fungsi anonim di sini)
$callback = function ($msg) {
// Logika pemrosesan pesan
echo ' [x] ', $msg->body, "\n";
};
// Membuat konsumen
$channel->basic_consume(
$queue_name, // Nama antrian, antrian untuk dikonsumsi
'', // Tag konsumen, abaikan, maka sebuah ID unik akan dihasilkan
false,
true, // Apakah secara otomatis mengakui pesan, yaitu memberitahu rabbitmq bahwa pesan telah berhasil diproses
false,
false,
$callback // Fungsi pemrosesan pesan
);
// Jika channel tidak ditutup, tetap memblokir proses untuk mencegah keluar
while ($channel->is_open()) {
$channel->wait();
}
// Melepaskan sumber daya
$channel->close();
$connection->close();