Mode Penerusan PHP RabbitMQ (Mode Langsung)
Pada mode penerusan RabbitMQ, jenis pertukaran yang digunakan adalah "langsung" (direct). Perbedaan utamanya dari mode terbit-langganan adalah bahwa pertukaran langsung mengirimkan pesan ke antrian yang parameter penerusannya cocok sepenuhnya. Arsitekturnya seperti yang ditunjukkan dalam grafik berikut:
Catatan: Terlepas dari mode kerja yang digunakan dalam RabbitMQ, perbedaannya terletak pada jenis pertukaran yang digunakan dan parameter penerusan.
1. Panduan Persiapan
Silakan baca bagian-bagian berikut terlebih dahulu untuk memahami pengetahuan terkait:
- Konsep Dasar RabbitMQ
- Prinsip Mode Penerusan RabbitMQ
- Memulai Cepat untuk PHP dengan RabbitMQ (diperlukan, karena bagian-bagian selanjutnya tidak akan mengulangi kode, hanya menampilkan kode kunci)
- Bagian Pola Terbit-Langganan RabbitMQ untuk PHP (diperlukan, karena struktur kode hampir sama, kecuali jenis pertukaran dan parameter penerusannya)
2. Mendefinisikan Pertukaran Langsung
// Mendeklarasikan pertukaran
$channel->exchange_declare(
'tizi365.direct', // Nama pertukaran unik
'direct', // Jenis pertukaran
false,
false, // Apakah persisten
false
);
Catatan: Baik produsen pesan maupun konsumen memerlukan sebuah pertukaran.
3. Mengirim Pesan
Kami mengirim pesan ke pertukaran, dan pertukaran mengirimkan pesan ke antrian yang sesuai berdasarkan aturan penerusan.
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// Membuat koneksi RabbitMQ
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Membuat saluran (channel)
$channel = $connection->channel();
// Mendeklarasikan pertukaran
$channel->exchange_declare(
'tizi365.direct', // Nama pertukaran unik
'direct', // Jenis pertukaran
false,
false, // Apakah persisten
false
);
// Objek pesan, isi pesannya adalah parameter
$msg = new AMQPMessage("hello tizi365.com");
// Perhatikan parameter ketiga, parameter penerus
$channel->basic_publish(
$msg, // Objek pesan
'tizi365.direct', // Nama pertukaran
"tizi365" // Parameter penerusan, dapat ditentukan sesuai kebutuhan
);
echo ' [x] Sent ', $msg->getBody(), "\n";
// Membebaskan sumber daya
$channel->close();
$connection->close();
Catatan: Parameter ketiga dalam metode
basic_publish
adalah parameter kunci.
4. Menerima Pesan
4.1. Mendefinisikan Antrian & Melakukan Bind Exchange
Untuk mengonsumsi pesan dari antrian, Anda perlu terlebih dahulu mendefinisikan sebuah antrian, kemudian melakukan bind antrian tersebut ke exchange tujuan.
// Mendeklarasikan sebuah antrian anonim
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// Melakukan bind antrian ke exchange yang ditentukan
$channel->queue_bind(
$queue_name, // Nama antrian
'tizi365.fanout', // Nama exchange
"tizi365" // Parameter routing bind, di sini melakukan binding 'tizi365'
);
Catatan: Menurut aturan direct exchange, jika parameter routing yang dibawa saat mengirim pesan cocok dengan parameter routing yang diatur saat melakukan binding antrian ke exchange, pesan akan dikirim ke antrian ini.
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
// Membuat koneksi rabbitmq
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Membuat Channel
$channel = $connection->channel();
// Mendeklarasikan sebuah exchange
$channel->exchange_declare(
'tizi365.direct', // Nama exchange, harus unik dan tidak boleh diulang
'direct', // Tipe exchange
false,
false, // Apakah tahan lama (durable) atau tidak
false
);
// Mendeklarasikan sebuah antrian anonim
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// Melakukan bind antrian ke exchange tertentu
$channel->queue_bind(
$queue_name, // Nama antrian
'tizi365.fanout', // Nama exchange
"tizi365" // Parameter routing binding, melakukan binding 'tizi365' di sini
);
echo " [*] Menunggu pesan. Tekan CTRL+C untuk keluar\n";
// Mendefinisikan fungsi penanganan pesan (menggunakan fungsi anonim di sini)
$callback = function ($msg) {
// Logika penanganan pesan
echo ' [x] ', $msg->body, "\n";
};
// Membuat konsumer
$channel->basic_consume(
$queue_name, // Nama antrian, nama antrian yang akan dikonsumsi
'', // Tag konsumer, diabaikan, ID unik dihasilkan secara otomatis
false,
true, // Apakah untuk otomatis mengakui pesan, yaitu memberitahu rabbitmq secara otomatis bahwa pesan telah berhasil diproses.
false,
false,
$callback // Fungsi penanganan pesan
);
// Blokir proses sampai saluran tidak ditutup, untuk mencegah proses keluar
while ($channel->is_open()) {
$channel->wait();
}
// Membebaskan sumber daya
$channel->close();
$connection->close();