Mode Penerusan PHP RabbitMQ (Mode Langsung)

Pada mode penerusan RabbitMQ, jenis pertukaran yang digunakan adalah "langsung" (direct). Perbedaan utamanya dari mode terbit-langganan adalah bahwa pertukaran langsung mengirimkan pesan ke antrian yang parameter penerusannya cocok sepenuhnya. Arsitekturnya seperti yang ditunjukkan dalam grafik berikut:

RabbitMQ Mode Langsung

Catatan: Terlepas dari mode kerja yang digunakan dalam RabbitMQ, perbedaannya terletak pada jenis pertukaran yang digunakan dan parameter penerusan.

1. Panduan Persiapan

Silakan baca bagian-bagian berikut terlebih dahulu untuk memahami pengetahuan terkait:

2. Mendefinisikan Pertukaran Langsung

// Mendeklarasikan pertukaran
$channel->exchange_declare(
    'tizi365.direct', // Nama pertukaran unik
    'direct', // Jenis pertukaran
    false,
    false, // Apakah persisten
    false
);

Catatan: Baik produsen pesan maupun konsumen memerlukan sebuah pertukaran.

3. Mengirim Pesan

Kami mengirim pesan ke pertukaran, dan pertukaran mengirimkan pesan ke antrian yang sesuai berdasarkan aturan penerusan.

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// Membuat koneksi RabbitMQ
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Membuat saluran (channel)
$channel = $connection->channel();

// Mendeklarasikan pertukaran
$channel->exchange_declare(
    'tizi365.direct', // Nama pertukaran unik
    'direct', // Jenis pertukaran
    false,
    false, // Apakah persisten
    false
);

// Objek pesan, isi pesannya adalah parameter
$msg = new AMQPMessage("hello tizi365.com");

// Perhatikan parameter ketiga, parameter penerus
$channel->basic_publish(
    $msg, // Objek pesan
    'tizi365.direct', // Nama pertukaran
    "tizi365" // Parameter penerusan, dapat ditentukan sesuai kebutuhan
);

echo ' [x] Sent ', $msg->getBody(), "\n";

// Membebaskan sumber daya
$channel->close();
$connection->close();

Catatan: Parameter ketiga dalam metode basic_publish adalah parameter kunci.

4. Menerima Pesan

4.1. Mendefinisikan Antrian & Melakukan Bind Exchange

Untuk mengonsumsi pesan dari antrian, Anda perlu terlebih dahulu mendefinisikan sebuah antrian, kemudian melakukan bind antrian tersebut ke exchange tujuan.

// Mendeklarasikan sebuah antrian anonim
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

// Melakukan bind antrian ke exchange yang ditentukan
$channel->queue_bind(
    $queue_name, // Nama antrian
    'tizi365.fanout', // Nama exchange
    "tizi365" // Parameter routing bind, di sini melakukan binding 'tizi365'
);

Catatan: Menurut aturan direct exchange, jika parameter routing yang dibawa saat mengirim pesan cocok dengan parameter routing yang diatur saat melakukan binding antrian ke exchange, pesan akan dikirim ke antrian ini.

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// Membuat koneksi rabbitmq
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Membuat Channel
$channel = $connection->channel();

// Mendeklarasikan sebuah exchange
$channel->exchange_declare(
    'tizi365.direct', // Nama exchange, harus unik dan tidak boleh diulang
    'direct', // Tipe exchange
    false,
    false, // Apakah tahan lama (durable) atau tidak
    false
);

// Mendeklarasikan sebuah antrian anonim
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

// Melakukan bind antrian ke exchange tertentu
$channel->queue_bind(
    $queue_name, // Nama antrian
    'tizi365.fanout', // Nama exchange
    "tizi365" // Parameter routing binding, melakukan binding 'tizi365' di sini
);

echo " [*] Menunggu pesan. Tekan CTRL+C untuk keluar\n";

// Mendefinisikan fungsi penanganan pesan (menggunakan fungsi anonim di sini)
$callback = function ($msg) {
    // Logika penanganan pesan
    echo ' [x] ', $msg->body, "\n";
};

// Membuat konsumer
$channel->basic_consume(
    $queue_name, // Nama antrian, nama antrian yang akan dikonsumsi
    '', // Tag konsumer, diabaikan, ID unik dihasilkan secara otomatis
    false,
    true, // Apakah untuk otomatis mengakui pesan, yaitu memberitahu rabbitmq secara otomatis bahwa pesan telah berhasil diproses.
    false,
    false,
    $callback // Fungsi penanganan pesan
);

// Blokir proses sampai saluran tidak ditutup, untuk mencegah proses keluar
while ($channel->is_open()) {
    $channel->wait();
}

// Membebaskan sumber daya
$channel->close();
$connection->close();