1. Pratutorial
Silakan baca bagian-bagian berikut terlebih dahulu untuk memahami pengetahuan terkait:
- Konsep Dasar RabbitMQ
- Prinsip Pola Topik RabbitMQ
- Panduan Cepat PHP RabbitMQ Bab (diperlukan, karena bab-bab selanjutnya tidak akan mengulangi kode, hanya menunjukkan kode kunci)
- Bab Pola Terbit/Langganan PHP RabbitMQ (diperlukan, karena kode hampir sama, hanya jenis pertukaran dan parameter routing yang berbeda)
2. Mendefinisikan Bursa Topik
// Mendeklarasikan pertukaran
$channel->exchange_declare(
'tizi365.topic', // Nama pertukaran, harus unik, tidak boleh diulang
'topic', // Jenis pertukaran
false,
false, // Apakah bertahan
false
);
Catatan: Baik produsen pesan maupun konsumen memerlukan pertukaran.
3. Mengirim Pesan
Kami mengirim pesan ke pertukaran, dan pertukaran mengirim pesan ke antrian yang sesuai berdasarkan aturan routing.
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// Membuat koneksi rabbitmq
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Membuat Channel
$channel = $connection->channel();
// Mendeklarasikan pertukaran
$channel->exchange_declare(
'tizi365.topic', // Nama pertukaran, harus unik, tidak boleh diulang
'topic', // Jenis pertukaran
false,
false, // Apakah bertahan
false
);
// Objek pesan, parameter adalah konten pesan
$msg = new AMQPMessage("hello tizi365.com");
// Mengirim pesan
// Perhatikan parameter ketiga, parameter routing
$channel->basic_publish(
$msg, // Objek Pesan
'tizi365.topic', // Nama pertukaran
"www.tizi365.com" // Parameter routing, bisa didefinisikan sesuai kebutuhan
);
echo ' [x] Sent ', $msg->getBody(), "\n";
// Melepaskan sumber daya
$channel->close();
$connection->close();
4.1. Tentukan Antrian & Ikatan Exchange
Untuk mengonsumsi pesan dari antrian, Anda perlu menentukan antrian terlebih dahulu, kemudian mengikat antrian ke exchange tujuan.
// Mendeklarasikan sebuah antrian anonim
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// Mengikat antrian ke exchange yang ditentukan
$channel->queue_bind(
$queue_name, // Nama antrian
'tizi365.topic', // Nama exchange
"*.tizi365.com" // Parameter routing ikatan, di sini menggunakan wildcard * (asterisk), yang dapat cocok dengan satu kata
Catatan: Semua parameter routing yang diatur menggunakan wildcard * (asterisk), yang dapat cocok dengan satu kata. Jika diubah menjadi # (tagar), maka dapat cocok dengan beberapa kata.
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
// Membuat koneksi rabbitmq
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Membuat Channel
$channel = $connection->channel();
// Mendeklarasikan sebuah exchange
$channel->exchange_declare(
'tizi365.topic', // Nama exchange, harus unik dan tidak dapat diulang
'topic', // Tipe exchange
false,
false, // Apakah tahan lama
false
);
// Mendeklarasikan antrian anonim
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// Mengikat antrian ke exchange yang ditentukan
$channel->queue_bind(
$queue_name, // Nama antrian
'tizi365.topic', // Nama exchange
"*.tizi365.com" // Kunci routing ikatan, menggunakan wildcard * di sini, yang dapat cocok dengan satu kata
);
echo " [*] Menunggu pesan. Tekan CTRL+C untuk keluar\n";
// Mendefinisikan fungsi penanganan pesan (menggunakan fungsi anonim di sini)
$callback = function ($msg) {
// Logika pemrosesan pesan
echo ' [x] ', $msg->body, "\n";
};
// Membuat konsumen
$channel->basic_consume(
$queue_name, // Nama antrian untuk dikonsumsi
'', // Tag konsumen, jika diabaikan, ID unik akan dibuat secara otomatis
false,
true, // Apakah secara otomatis mengakui pesan, yaitu memberitahukan rabbitmq secara otomatis bahwa pesan telah berhasil diproses
false,
false,
$callback // Fungsi penanganan pesan
);
// Jika channel tidak ditutup, terus blokir proses untuk menghindari keluarnya proses
while ($channel->is_open()) {
$channel->wait();
}
// Membebaskan sumber daya
$channel->close();
$connection->close();
Karena parameter routing yang diatur saat mengikat exchange adalah *.tizi365.com, hal itu cocok dengan parameter routing pesan (www.tizi365.com), sehingga pesan dapat diterima.