Chế độ Routing Mode của RabbitMQ (Chế độ Trực tiếp)
Trong chế độ routing của RabbitMQ, loại sàn giao dịch được sử dụng là "trực tiếp". Sự khác biệt chính so với chế độ publish-subscribe là sàn giao dịch trực tiếp gửi các thông điệp đến các hàng đợi mà tham số định tuyến hoàn toàn trùng khớp. Kiến trúc được hiển thị trong đồ thị sau:
Lưu ý: Bất kể chế độ làm việc nào được sử dụng trong RabbitMQ, sự khác biệt nằm ở loại sàn giao dịch và các tham số định tuyến.
1. Hướng dẫn Chuẩn bị
Vui lòng đọc các phần sau đây trước để hiểu về kiến thức liên quan:
- Khái niệm cơ bản về RabbitMQ
- Nguyên lý Chế độ định tuyến của RabbitMQ
- Bắt đầu nhanh với PHP và RabbitMQ (bắt buộc, vì các phần sau sẽ không lặp lại mã, chỉ hiển thị mã chính)
- Phần Mẫu Publish-Subscribe cho PHP của RabbitMQ (bắt buộc, vì cấu trúc mã gần như giống nhau, ngoại trừ loại sàn giao dịch và các tham số định tuyến)
2. Xác định Sàn giao dịch Trực tiếp
// Khai báo sàn giao dịch
$channel->exchange_declare(
'tizi365.direct', // Tên sàn giao dịch duy nhất
'direct', // Loại sàn giao dịch
false,
false, // Có lưu trữ hay không
false
);
Lưu ý: Cả người sản xuất thông điệp lẫn người tiêu thụ đều cần một sàn giao dịch.
3. Gửi Thông Điệp
Chúng ta gửi các thông điệp đến sàn giao dịch, và sàn giao dịch gửi các thông điệp đến hàng đợi tương ứng dựa trên các quy tắc định tuyến.
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// Tạo kết nối RabbitMQ
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Tạo một kênh
$channel = $connection->channel();
// Khai báo sàn giao dịch
$channel->exchange_declare(
'tizi365.direct', // Tên sàn giao dịch duy nhất
'direct', // Loại sàn giao dịch
false,
false, // Có lưu trữ hay không
false
);
// Đối tượng thông điệp, nội dung thông điệp là tham số
$msg = new AMQPMessage("xin chào tizi365.com");
// Chú ý đến tham số thứ ba, tham số định tuyến
$channel->basic_publish(
$msg, // Đối tượng thông điệp
'tizi365.direct', // Tên sàn giao dịch
"tizi365" // Tham số định tuyến, có thể được định nghĩa một cách tùy ý nếu cần
);
echo ' [x] Đã gửi ', $msg->getBody(), "\n";
// Giải phóng tài nguyên
$channel->close();
$connection->close();
Lưu ý: Tham số thứ ba trong phương thức
basic_publish
là một tham số quan trọng.
4. Nhận Thông Điệp
4.1. Định nghĩa Hàng đợi & Ràng buộc Exchange
Để tiêu thụ các tin nhắn hàng đợi, bạn cần trước tiên định nghĩa một hàng đợi và sau đó ràng buộc hàng đợi đó với sàn giao dịch mục tiêu.
// Khai báo một hàng đợi ẩn danh
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// Ràng buộc hàng đợi với sàn giao dịch cụ thể
$channel->queue_bind(
$queue_name, // Tên hàng đợi
'tizi365.fanout', // Tên sàn giao dịch
"tizi365" // Tham số định tuyến ràng buộc, ràng buộc 'tizi365' ở đây
);
Lưu ý: Theo quy tắc sàn giao dịch trực tiếp, nếu tham số định tuyến đi kèm khi gửi một tin nhắn khớp với tham số định tuyến được thiết lập khi ràng buộc hàng đợi với sàn giao dịch, tin nhắn sẽ được gửi đến hàng đợi này.
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
// Tạo kết nối rabbitmq
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Tạo một Kênh
$channel = $connection->channel();
// Khai báo một sàn giao dịch
$channel->exchange_declare(
'tizi365.direct', // Tên sàn giao dịch, cần phải là duy nhất và không thể lặp lại
'direct', // Loại sàn giao dịch
false,
false, // Durable hay không
false
);
// Khai báo một hàng đợi ẩn danh
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// Ràng buộc hàng đợi với sàn giao dịch cụ thể
$channel->queue_bind(
$queue_name, // Tên hàng đợi
'tizi365.fanout', // Tên sàn giao dịch
"tizi365" // Tham số định tuyến ràng buộc, ràng buộc 'tizi365' ở đây
);
echo " [*] Đang chờ tin nhắn. Để thoát, nhấn CTRL+C\n";
// Định nghĩa hàm xử lý tin nhắn (sử dụng hàm vô danh ở đây)
$callback = function ($msg) {
// Logic xử lý tin nhắn
echo ' [x] ', $msg->body, "\n";
};
// Tạo một người tiêu dùng
$channel->basic_consume(
$queue_name, // Tên hàng đợi, tên của hàng đợi cần tiêu thụ
'', // Tag người tiêu dùng, bị bỏ qua, một ID duy nhất được tạo tự động
false,
true, // Có tự động chấp nhận tin nhắn không, tức là tự động thông báo cho rabbitmq rằng tin nhắn đã được xử lý thành công
false,
false,
$callback // Hàm xử lý tin nhắn
);
// Chặn quá trình cho đến khi kênh không đóng, để ngăn quá trình thoát
while ($channel->is_open()) {
$channel->wait();
}
// Giải phóng tài nguyên
$channel->close();
$connection->close();