الگوی انتشار/اشتراک (همچنین به عنوان الگوی Fanout شناخته میشود) در RabbitMQ یک الگو است که در آن پیامهای ارسال شده توسط یک تولیدکننده توسط چند مصرفکننده در صفهای مختلف پردازش میشوند، همانطور که در نمودار معماری زیر نشان داده شده است.
تبادل Fanout میتواند پیامها را به همهی صفهای متصل ارسال کند.
توجه: بدون توجه به حالت کاری RabbitMQ استفاده شده، تفاوت در نوع تبادل و پارامترهای مسیریابی استفاده شده است.
1. آموزش پیشنیاز
لطفاً فصلهای زیر را بخوانید تا دانش مرتبط را درک کنید:
- مفاهیم اصلی RabbitMQ
- الگوی انتشار/اشتراک RabbitMQ
- شروع سریع PHP RabbitMQ (الزامی، زیرا فصلهای بعدی کد را تکرار نمیکنند و تنها قطعات کلیدی کد را نشان میدهند)
2. تعریف تبادل Fanout
تعریف تبادل از طریق تابع exchange_declare کانال انجام میشود.
$channel->exchange_declare(
'tizi365.fanout', // نام تبادل، باید یکتا و نمیتواند تکراری باشد
'fanout', // نوع تبادل
false,
false, // آیا باید پایدار باشد
false
);
توجه: هم تولیدکنندهها و هم مصرفکنندههای پیام نیازمند تبادل هستند.
3. ارسال پیامها
ما پیامها را به تبادل میفرستیم و تبادل پیامها را بر اساس اصول مسیریابی به صفهای مربوطه ارسال میکند.
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// ایجاد اتصال RabbitMQ
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// ایجاد یک کانال
$channel = $connection->channel();
// اعلام تبادل
$channel->exchange_declare(
'tizi365.fanout', // نام تبادل، باید یکتا و نمیتواند تکراری باشد
'fanout', // نوع تبادل
false,
false, // آیا باید پایدار باشد
false
);
// شیء پیام، با محتوای پیام به عنوان پارامتر
$msg = new AMQPMessage("سلام tizi365.com");
$channel->basic_publish(
$msg, // شیء پیام
'tizi365.fanout' // نام تبادل
);
echo ' [x] ارسال شد ', $msg->getBody(), "\n";
// آزاد کردن منابع
$channel->close();
$connection->close();
4. دریافت پیامها
4.1. تعریف یک صف و متصلسازی تبادل
برای مصرف پیامهای صف، ابتدا باید یک صف تعریف کرده و سپس آن را به تبادل مورد نظر متصل کنید. در زیر، یک صف تعریف شده و به یک تبادل خاص متصل شده است.
// تعریف یک صف، اگر نام صف خالی باشد، یک شناسه یکتا به صورت خودکار تولید میشود و نام صف برگردانده میشود
list($queue_name, ,) = $channel->queue_declare(
"", // نام صف، اجازه تکرار ندارد، اگر خالی باشد، یک شناسه یکتا به صورت خودکار تولید میشود، اینجور صف را صف ناشناس میکند
false,
false, // آیا باید پایدار باشد؟
true,
false
);
// متصلسازی صف به تبادل مشخص
$channel->queue_bind(
$queue_name, // نام صف
'tizi365.fanout' // نام تبادل
);
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
// ایجاد اتصال rabbitmq
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// ایجاد یک کانال
$channel = $connection->channel();
// تعریف یک تبادل
$channel->exchange_declare(
'tizi365.fanout', // نام تبادل، باید یکتا باشد و نمیتواند تکراری باشد
'fanout', // نوع تبادل
false,
false, // آیا باید پایدار باشد؟
false
);
// تعریف یک صف
list($queue_name, ,) = $channel->queue_declare(
"", // نام صف، اجازه تکرار ندارد، اگر خالی باشد تولید یک شناسه یکتا، سپس یکصف ناشناس است
false,
false, // آیا باید پایدار باشد؟
true,
false
);
// متصلسازی صف به تبادل مشخص
$channel->queue_bind(
$queue_name, // نام صف
'tizi365.fanout' // نام تبادل
);
echo " [*] DWaiting for message. To exit press CTRL+C\n";
// تعریف تابع پردازش پیام (اینجا از یک تابع ناشناس استفاده میکنیم)
$callback = function ($msg) {
// منطق پردازش پیام
echo ' [x] ', $msg->body, "\n";
};
// ایجاد یک مصرفکننده
$channel->basic_consume(
$queue_name, // نام صف، صفی برای مصرف
'', // برچسب مصرفکننده، نادیده گرفته میشود، سپس یک شناسه یکتا تولید میشود
false,
true, // آیا باید پیام را به صورت خودکار تأیید کند؟ یعنی به rabbitmq بگوید که پیام با موفقیت پردازش شده است.
false,
false,
$callback // تابع پردازش پیام
);
// اگر کانال بسته نشده باشد، فرآیند را مسدود نگهدارید تا از خروج جلوگیری شود
while ($channel->is_open()) {
$channel->wait();
}
// آزاد کردن منابع
$channel->close();
$connection->close();