1. Chuẩn bị trước khi học

Vui lòng đọc các phần sau đây trước để hiểu kiến thức liên quan

2. Định nghĩa Trạng Thái Mẫu

// Khai báo trạng thái mẫu
$channel->exchange_declare(
    'tizi365.topic', // Tên trạng thái mẫu, cần phải duy nhất, không thể lặp lại
    'topic', // Loại trạng thái mẫu
    false,
    false, // Có bền vững không
    false
);

Lưu ý: Cả người sản xuất và người tiêu thụ thông báo cần trạng thái mẫu.

3. Gửi Thông Báo

Chúng ta gửi thông báo đến trạng thái mẫu, và trạng thái mẫu gửi thông báo đến hàng đợi tương ứng dựa trên quy tắc định tuyến.

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// Tạo kết nối rabbitmq
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Tạo kênh
$channel = $connection->channel();

// Khai báo trạng thái mẫu
$channel->exchange_declare(
    'tizi365.topic', // Tên trạng thái mẫu, cần phải duy nhất, không thể lặp lại
    'topic', // Loại trạng thái mẫu
    false,
    false, // Có bền vững không
    false
);

// Đối tượng thông báo, tham số là nội dung thông báo
$msg = new AMQPMessage("xin chào tizi365.com");

// Gửi thông báo
// Lưu ý tham số thứ ba, tham số định tuyến
$channel->basic_publish(
    $msg, // Đối tượng thông báo
    'tizi365.topic', // Tên trạng thái mẫu
    "www.tizi365.com" // Tham số định tuyến, có thể được xác định một cách tùy ý theo yêu cầu
);

echo ' [x] Đã gửi ', $msg->getBody(), "\n";

// Giải phóng tài nguyên
$channel->close();
$connection->close();

4. Nhận Thông Báo

4.1. Định nghĩa Queue & Bind Exchange

Để tiêu thụ các tin nhắn từ hàng đợi, bạn cần định nghĩa một hàng đợi trước, sau đó ràng buộc hàng đợi đó với sàn gửi đích.

// Khai báo một hàng đợi ẩn danh
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

// Ràng buộc hàng đợi với sàn gửi cụ thể
$channel->queue_bind(
    $queue_name, // Tên hàng đợi
    'tizi365.topic', // Tên sàn gửi
    "*.tizi365.com" // Tham số định tuyến ràng buộc, ở đây sử dụng ký tự đại diện * (asterisk), có thể phù hợp với một từ duy nhất

Chú ý: Tất cả cài đặt tham số định tuyến đều sử dụng dấu * (asterisk) đại diện, có thể phù hợp với một từ duy nhất. Nếu thay đổi thành # (hash), nó có thể phù hợp với nhiều từ.

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// Tạo kết nối với rabbitmq
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Tạo một kênh
$channel = $connection->channel();

// Khai báo một sàn gửi
$channel->exchange_declare(
    'tizi365.topic', // Tên sàn gửi, cần phải duy nhất và không thể lặp lại
    'topic', // Loại sàn gửi
    false,
    false, // Có bền vững không
    false
);

// Khai báo một hàng đợi ẩn danh
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

// Ràng buộc hàng đợi với sàn gửi cụ thể
$channel->queue_bind(
    $queue_name, // Tên hàng đợi
    'tizi365.topic', // Tên sàn gửi
    "*.tizi365.com" // Khóa định tuyến ràng buộc, sử dụng ký tự đại diện * ở đây, có thể phù hợp với một từ duy nhất
);

echo " [*] Đang chờ tin nhắn. Để thoát, nhấn CTRL+C\n";

// Định nghĩa hàm xử lý tin nhắn (ở đây sử dụng hàm ẩn danh)
$callback = function ($msg) {
    // Logic xử lý tin nhắn
    echo ' [x] ', $msg->body, "\n";
};

// Tạo một người tiêu dùng
$channel->basic_consume(
    $queue_name, // Tên hàng đợi tiêu thụ từ
    '', // Thẻ người tiêu dùng, nếu bỏ qua, một ID duy nhất sẽ được tạo tự động
    false,
    true, // Có tự động xác nhận tin nhắn không, tức là tự động thông báo với rabbitmq rằng tin nhắn đã được xử lý thành công
    false,
    false,
    $callback // Hàm xử lý tin nhắn
);

// Nếu kênh chưa đóng, tiếp tục chặn quá trình để tránh việc thoát quá trình
while ($channel->is_open()) {
    $channel->wait();
}

// Giải phóng tài nguyên
$channel->close();
$connection->close();

Bởi vì tham số định tuyến được thiết lập khi ràng buộc sàn gửi là *.tizi365.com, nó phù hợp với tham số định tuyến của tin nhắn (www.tizi365.com), vì vậy tin nhắn có thể được nhận.