نمط النشر/الاشتراك في RabbitMQ باستخدام PHP (المعروف أيضًا بنمط النشر/الاشتراك بنظام المراوح)

نمط النشر/الاشتراك بنظام المراوح في RabbitMQ هو نمط حيث يتم معالجة الرسائل المرسلة بواسطة منتج من قبل عدة مستهلكين في صفوف مختلفة، كما هو موضح في رسم البياني المعماري أدناه.

وضع عمل RabbitMQ

يمكن لصرف المراوح إعادة توجيه الرسائل إلى جميع الصفوف المرتبطة بها.

ملاحظة: بغض النظر عن وضع العمل الذي يتم استخدامه في RabbitMQ، الفارق يكمن في نوع الصرف ومعلمات التوجيه المستخدمة.

1. البرنامج التعليمي الأولي

الرجاء قراءة الفصول التالية لفهم المعرفة ذات الصلة:

2. تعريف صرف المراوح

تعريف الصرف من خلال وظيفة exchange_declare في القناة.

$channel->exchange_declare(
    'tizi365.fanout', // اسم الصرف، يجب أن يكون فريدًا ولا يمكن تكراره
    'fanout', // نوع الصرف
    false,
    false, // ما إذا كان يجب الاحتفاظ بها
    false
);

ملاحظة: كل من منتجات الرسائل والمستهلكين تحتاج إلى الصرف.

3. إرسال الرسائل

نقوم بإرسال الرسائل إلى الصرف، ويقوم الصرف بتوصيل الرسائل إلى الصفوف المقابلة استنادًا إلى قواعد التوجيه.

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// إنشاء اتصال RabbitMQ
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// إنشاء قناة
$channel = $connection->channel();

// تعريف الصرف
$channel->exchange_declare(
    'tizi365.fanout', // اسم الصرف، يجب أن يكون فريدًا ولا يمكن تكراره
    'fanout', // نوع الصرف
    false,
    false, // ما إذا كان يجب الاحتفاظ بها
    false
);
// كائن الرسالة، مع محتوى الرسالة كمعلمة
$msg = new AMQPMessage("مرحبًا tizi365.com");

$channel->basic_publish(
    $msg, // كائن الرسالة
    'tizi365.fanout' // اسم الصرف
);

echo ' [x] تم الإرسال ', $msg->getBody(), "\n";

// قم بإطلاق الموارد
$channel->close();
$connection->close();

4. استقبال الرسائل

4.1. تعريف طابور وربط التبادل

لاستهلاك رسائل الطابور، عليك أولاً تعريف طابور ومن ثم ربطه بالتبادل المستهدف. أدناه، يتم تعريف طابور وربطه بتبادل محدد.

// قم بتعريف طابور، إذا كان اسم الطابور فارغًا، يتم إنشاء معرف فريد تلقائيًا، ويتم إرجاع اسم الطابور
list($queue_name, ,) = $channel->queue_declare(
    "", // اسم الطابور، لا يُسمح بتكراره، إذا كان فارغًا، يتم إنشاء معرف فريد تلقائيًا، مما يجعله طابورًا مجهول المصدر
    false,
    false, // ما إذا كان لديه القابلية للصمود
    true,
    false
);

// ربط الطابور بالتبادل المحدد
$channel->queue_bind(
    $queue_name, // اسم الطابور
    'tizi365.fanout' // اسم التبادل
);
<?php 
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// إنشاء اتصال rabbitmq
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// إنشاء قناة
$channel = $connection->channel();

// تعريف التبادل
$channel->exchange_declare(
    'tizi365.fanout', // اسم التبادل، يجب أن يكون فريدًا ولا يمكن تكراره
    'fanout', // نوع التبادل
    false,
    false, // ما إذا كان لديه القابلية للصمود
    false
);

// تعريف طابور
list($queue_name, ,) = $channel->queue_declare(
    "", // اسم الطابور، لا يُسمح بتكراره، إذا كان فارغًا يتم إنشاء معرف فريد تلقائيًا مما يجعله طابورًا مجهول المصدر
    false,
    false, // ما إذا كان لديه القابلية للصمود
    true,
    false
);

// ربط الطابور بالتبادل المحدد
$channel->queue_bind(
    $queue_name, // اسم الطابور
    'tizi365.fanout' // اسم التبادل
);

echo " [*] في انتظار الرسالة. للخروج، اضغط CTRL+C\n";

// تعريف وظيفة معالجة الرسالة (استخدام دالة مجهولة هنا)
$callback = function ($msg) {
    // منطق معالجة الرسالة
    echo ' [x] ', $msg->body, "\n";
};

// إنشاء مستهلك
$channel->basic_consume(
    $queue_name, // اسم الطابور، الطابور المراد استهلاكه
    '', // علامة المستهلك، تجاهلها، ثم إنشاء معرف فريد
    false,
    true, // ما إذا كان تلقائيًا يقر الرسالة، أي يخبر rabbitmq بأن الرسالة تم معالجتها بنجاح
    false,
    false,
    $callback // دالة معالجة الرسالة
);

// إذا كانت القناة غير مغلقة، قم بحجب العملية لمنع الخروج
while ($channel->is_open()) {
    $channel->wait();
}

// تحرير الموارد
$channel->close();
$connection->close();