حالت مسیریابی (Routing Mode) PHP RabbitMQ (حالت مستقیم)
در حالت مسیریابی RabbitMQ، نوع انتقالی از تبادل از نوع "direct" استفاده میشود. تفاوت اصلی با حالت انتشار-اشتراک این است که تبادل مستقیم پیامها را به صفهایی ارسال میکند که پارامترهای مسیریابی آنها کاملاً مطابقت دارد. معماری به شکل گراف زیر نمایش داده شده است:
توجه: بدون توجه به حالت کاری که در RabbitMQ استفاده میشود، اختلاف در نوع تبادل استفاده شده و پارامترهای مسیریابی وجود دارد.
1. آمادهسازی آموزشی
لطفاً ابتدا بخشهای زیر را بخوانید تا دانش مربوطه را متوجه شوید:
- مفاهیم پایه RabbitMQ
- اصول حالت مسیریابی RabbitMQ
- شروع سریع PHP با RabbitMQ (الزامی، زیرا بخشهای بعدی کد را تکرار نمیکنند و تنها کد کلیدی را نمایش میدهند)
- بخش الگوی انتشار-اشتراک برای PHP RabbitMQ (الزامی، زیرا ساختار کد تقریباً یکسان است، به جز نوع تبادل و پارامترهای مسیریابی)
2. تعریف تبادل مستقیم
// اعلام تبادل
$channel->exchange_declare(
'tizi365.direct', // نام یکتا تبادل
'direct', // نوع تبادل
false,
false, // آیا باید پایدار باشد؟
false
);
توجه: هر دو تولیدکننده و مصرفکننده پیام نیاز به یک تبادل دارند.
3. ارسال پیامها
ما پیامها را به تبادل ارسال میکنیم و تبادل پیامها را براساس قوانین مسیردهی به صف مربوطه ارسال میکند.
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// ایجاد اتصال RabbitMQ
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// ایجاد یک کانال
$channel = $connection->channel();
// اعلام تبادل
$channel->exchange_declare(
'tizi365.direct', // نام یکتا تبادل
'direct', // نوع تبادل
false,
false, // آیا باید پایدار باشد؟
false
);
// شیء پیام، محتوای پیام پارامتر است
$msg = new AMQPMessage("hello tizi365.com");
// توجه: به پارامتر سوم تابع basic_publish توجه کنید، پارامتر مسیردهی
$channel->basic_publish(
$msg, // شیء پیام
'tizi365.direct', // نام تبادل
"tizi365" // پارامتر مسیردهی، میتواند به دلخواه تعریف شود
);
echo ' [x] Sent ', $msg->getBody(), "\n";
// آزادکردن منابع
$channel->close();
$connection->close();
توجه: پارامتر سوم در متد
basic_publish
یک پارامتر کلیدی است.
4. دریافت پیامها
4.1. تعریف صف و متصل کردن تبادل
برای مصرف پیامهای صف، ابتدا باید یک صف را تعریف کرده و سپس آن را به تبادل مورد نظر متصل کنید.
// تعریف یک صف ناشناس
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// متصل کردن صف به تبادل مشخص
$channel->queue_bind(
$queue_name, // نام صف
'tizi365.fanout', // نام تبادل
"tizi365" // پارامتر مسیریابی متصل، در اینجا 'tizi365' را میبندد
);
نکته: طبق قوانین تبادل مستقیم، اگر پارامتر مسیریابی حمل شده هنگام ارسال یک پیام با پارامتر مسیریابی تعیین شده هنگام متصل شدن صف به تبادل مطابقت داشته باشد، پیام به این صف تحویل داده میشود.
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
// ایجاد اتصال rabbitmq
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// ایجاد یک کانال
$channel = $connection->channel();
// تعریف یک تبادل
$channel->exchange_declare(
'tizi365.direct', // نام تبادل، باید یکتا باشد و نباید تکراری باشد
'direct', // نوع تبادل
false,
false, // آیا دوامدار است
false
);
// تعریف یک صف ناشناس
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// متصل کردن صف به یک تبادل خاص
$channel->queue_bind(
$queue_name, // نام صف
'tizi365.fanout', // نام تبادل
"tizi365" // پارامتر مسیریابی متصل، در اینجا 'tizi365' را میبندد
);
echo " [*] منتظر پیام هستم. برای خروج، CTRL+C را فشار دهید\n";
// تعریف تابع پردازش پیام (در اینجا از یک تابع ناشناس استفاده شده است)
$callback = function ($msg) {
// منطق پردازش پیام
echo ' [x] ', $msg->body, "\n";
};
// ایجاد یک مصرفکننده
$channel->basic_consume(
$queue_name, // نام صف، نام صفی که باید مصرف شود
'', // تگ مصرفکننده، نادیده گرفته میشود، یک شناسه یکتا به صورت خودکار تولید میشود
false,
true, // آیا به طور خودکار پیام را تأیید میکند، یعنی به طور خودکار به rabbitmq اطلاع دهید که پیام با موفقیت پردازش شده است.
false,
false,
$callback // تابع پردازش پیام
);
// مسدود کردن فرآیند تا زمانی که کانال بسته نشده است، برای جلوگیری از خروج فرآیند
while ($channel->is_open()) {
$channel->wait();
}
// آزاد سازی منابع
$channel->close();
$connection->close();