Mô hình Publish/Subscribe (còn được gọi là Mô hình Fanout) trong RabbitMQ sử dụng PHP

Mô hình Fanout trong RabbitMQ là một mô hình mà các tin nhắn được gửi bởi một producer sẽ được xử lý bởi nhiều consumers khác nhau trong các hàng đợi khác nhau, như thể hiện trong sơ đồ kiến trúc dưới đây.

RabbitMQ Work Mode

Trong mô hình Fanout, sự truyền tải tin nhắn có thể được thực hiện tới tất cả các hàng đợi được ràng buộc.

Lưu ý: Bất kể chế độ làm việc RabbitMQ nào được sử dụng, sự khác biệt nằm ở loại trao đổi và các tham số định tuyến được sử dụng.

1. Hướng dẫn cơ bản

Vui lòng đọc các chương sau để hiểu về kiến thức liên quan:

2. Định nghĩa trao đổi Fanout

Định nghĩa trao đổi thông qua chức năng exchange_declare của kênh.

$channel->exchange_declare(
    'tizi365.fanout', // Tên trao đổi, phải là duy nhất và không thể trùng lặp
    'fanout', // Loại trao đổi
    false,
    false, // Có bền vững hay không
    false
);

Lưu ý: Cả người sản xuất tin nhắn và người tiêu dùng đều cần trao đổi.

3. Gửi tin nhắn

Chúng ta gửi tin nhắn tới trao đổi, và trao đổi gửi tin nhắn tới các hàng đợi tương ứng dựa trên các quy tắc định tuyến.

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// Tạo kết nối RabbitMQ
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Tạo một Kênh
$channel = $connection->channel();

// Khai báo trao đổi
$channel->exchange_declare(
    'tizi365.fanout', // Tên trao đổi, phải là duy nhất và không thể trùng lặp
    'fanout', // Loại trao đổi
    false,
    false, // Có bền vững hay không
    false
);
// Đối tượng tin nhắn, với nội dung tin nhắn là tham số
$msg = new AMQPMessage("xin chào từ tizi365.com");

$channel->basic_publish(
    $msg, // Đối tượng tin nhắn
    'tizi365.fanout' // Tên trao đổi
);

echo ' [x] Đã gửi ', $msg->getBody(), "\n";

// Giải phóng tài nguyên
$channel->close();
$connection->close();

4. Nhận tin nhắn

4.1. Định nghĩa Queue & Liên kết với Exchange

Để tiêu thụ các thông điệp từ hàng đợi, bạn cần đầu tiên định nghĩa một hàng đợi và sau đó liên kết hàng đợi đó với sàn gửi đi. Dưới đây, một hàng đợi được định nghĩa và liên kết với một sàn cụ thể.

// Khai báo một hàng đợi, nếu tên hàng đợi là trống, một ID duy nhất sẽ được tạo tự động và tên hàng đợi sẽ được trả về
list($queue_name, ,) = $channel->queue_declare(
    "", // Tên hàng đợi, không được phép trùng lặp, nếu trống, một ID duy nhất sẽ được tạo tự động, tạo thành một hàng đợi ẩn danh
    false,
    false, // Có lưu trữ không
    true,
    false
);

// Liên kết hàng đợi với sàn được chỉ định
$channel->queue_bind(
    $queue_name, // Tên hàng đợi
    'tizi365.fanout' // Tên sàn gửi đi
);
<?php 
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// Tạo kết nối rabbitmq
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Tạo một Kênh
$channel = $connection->channel();

// Khai báo một sàn gửi đi
$channel->exchange_declare(
    'tizi365.fanout', // Tên sàn gửi đi, cần phải là duy nhất và không thể lặp lại
    'fanout', // Loại sàn gửi đi
    false,
    false, // Có lưu trữ không
    false
);

// Khai báo một hàng đợi
list($queue_name, ,) = $channel->queue_declare(
    "", // Tên hàng đợi, không được phép lặp lại, nếu trống thì tạo một ID duy nhất, sau đó nó là một hàng đợi ẩn danh
    false,
    false, // Có lưu trữ không
    true,
    false
);

// Liên kết hàng đợi với sàn gửi đi được chỉ định
$channel->queue_bind(
    $queue_name, // Tên hàng đợi
    'tizi365.fanout' // Tên sàn gửi đi
);

echo " [*] Đang chờ tin nhắn. Nhấn CTRL+C để thoát\n";

// Định nghĩa hàm xử lý tin nhắn (sử dụng hàm ẩn danh ở đây)
$callback = function ($msg) {
    // Logic xử lý tin nhắn
    echo ' [x] ', $msg->body, "\n";
};

// Tạo một người tiêu thụ
$channel->basic_consume(
    $queue_name, // Tên hàng đợi, hàng đợi để tiêu thụ
    '', // Thẻ người tiêu thụ, bỏ qua, sau đó tạo một ID duy nhất
    false,
    true, // Có tự động nhận biết tin nhắn không, tức là cho rabbitmq biết rằng tin nhắn đã được xử lý thành công
    false,
    false,
    $callback // Hàm xử lý tin nhắn
);

// Nếu kênh không bị đóng, tiếp tục chặn quá trình để ngăn nó thoát
while ($channel->is_open()) {
    $channel->wait();
}

// Giải phóng tài nguyên
$channel->close();
$connection->close();