PHP RabbitMQ पब्लिश/सब्सक्राइब पैटर्न (जिसे फैनआउट पैटर्न भी कहते हैं)
रैबिटएमक्यू में फैनआउट मोड एक पैटर्न है जहां उत्पादक द्वारा भेजे गए संदेश को विभिन्न क्यू में से कई उपभोक्ताओं द्वारा प्रसंस्कृत किया जाता है, जैसा कि निम्नलिखित संरचना आरेख में दिखाया गया है।
फैनआउट एक्सचेंज सभी बाउंड क्यू को संदेश फॉरवर्ड कर सकता है।
ध्यान दें: रैबिटएमक्यू काम करने के तरीके के बावजूद, विभाजन प्राथमिकता और मार्गनिर्देशन पैरामीटरों में अंतर है।
1. पूर्वापेक्षित ट्यूटोरियल
संबंधित ज्ञान समझने के लिए कृपया निम्न अध्याय पढ़ें:
- रैबिटएमक्यू बेसिक्स
- रैबिटएमक्यू पब्लिश/सब्सक्राइब पैटर्न
- रैबिटएमक्यू PHP क्विक स्टार्ट (अनिवार्य, क्योंकि आगामी अध्यायों में कोड को इतिहासात्मक करने के बजाय केवल मुख्य कोड स्निपेट दिखाएंगे)
2. फैनआउट एक्सचेंज की परिभाषा करना
चैनल के exchange_declare कार्य के माध्यम से एक्सचेंज की परिभाषा करें।
$channel->exchange_declare(
'tizi365.fanout', // एक्सचेंज का नाम, यह अद्वितीय होना चाहिए और दोहराया नहीं जा सकता
'fanout', // एक्सचेंज प्रकार
false,
false, // क्या हमें सटीक करना चाहिए
false
);
ध्यान दें: संदेश उत्पादक और उपभोक्ताओं दोनों को एक्सचेंज की आवश्यकता होती है।
3. संदेश भेजना
हम संदेश को एक्सचेंज को भेजते हैं, और एक्सचेंज मार्गनिर्देशन नियमों के आधार पर संदेशों को संबंधित क्यू को वितरित करता है।
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// रैबिटएमक्यू कनेक्शन बनाएं
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// चैनल बनाएं
$channel = $connection->channel();
// एक्सचेंज की पुष्टि करें
$channel->exchange_declare(
'tizi365.fanout', // एक्सचेंज का नाम, यह अद्वितीय होना चाहिए और दोहराया नहीं जा सकता
'fanout', // एक्सचेंज प्रकार
false,
false, // क्या हमें सटीक करना चाहिए
false
);
// संदेश ऑब्ज
### 4.1. कतार की परिभाषा और एक्सचेंज को बाइंड करना
कतार संदेश को कंज्यूम करने के लिए, आपको पहले एक कतार की परिभाषा करनी होगी और फिर उस कतार को लक्षित एक्सचेंज से बाइंड करना होगा।
नीचे, एक कतार की परिभाषा की गई है और उसे एक विशिष्ट एक्सचेंज से बाइंड किया गया है।
```php
// एक कतार की परिभाषा करें, अगर कतार का नाम खाली है, तो स्वचालित रूप से एक अद्वितीय आईडी उत्पन्न होता है, और कतार का नाम लौटाया जाता है
list($queue_name, ,) = $channel->queue_declare(
"", // कतार का नाम, दोहराया न जाने दें, अगर खाली है, तो स्वचालित रूप से एक अद्वितीय आईडी उत्पन्न होता है, जिससे यह एक अनामक कतार बन जाता है
false,
false, // क्या स्थिर रखना है
true,
false
);
// कतार को निशिचित एक्सचेंज से बाइंड करें
$channel->queue_bind(
$queue_name, // कतार का नाम
'tizi365.fanout' // एक्सचेंज का नाम
);
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
// एक रैबिटएमक्यू कनेक्शन बनाएं
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// एक चैनल बनाएं
$channel = $connection->channel();
// एक एक्सचेंज को घोषित करें
$channel->exchange_declare(
'tizi365.fanout', // एक्सचेंज का नाम, यह अद्वितीय होना चाहिए और दोहराया न जाने दें
'fanout', // एक्सचेंज प्रकार
false,
false, // क्या स्थिर रखना है
false
);
// एक कतार घोषित करें
list($queue_name, ,) = $channel->queue_declare(
"", // कतार का नाम, दोहराया न जाने दें, अगर खाली है तो एक अद्वितीय आईडी उत्पन्न होता है, फिर यह एक अनामक कतार होता है
false,
false, // क्या स्थिर रखना है
true,
false
);
// कतार को निशिचित एक्सचेंज से बाइंड करें
$channel->queue_bind(
$queue_name, // कतार का नाम
'tizi365.fanout' // एक्सचेंज का नाम
);
echo " [*] संदेश के लिए प्रतीक्षा कर रहा है। बाहर निकलने के लिए CTRL+C दबाएं\n";
// संदेश प्रसंस्करण कार्य की परिभाषा करें (यहां एक अनामक फ़ंक्शन का उपयोग किया गया है)
$callback = function ($msg) {
// संदेश प्रसंस्करण तर्क
echo ' [x] ', $msg->body, "\n";
};
// एक उपभोक्ता बनाएं
$channel->basic_consume(
$queue_name, // कतार का नाम, किस कतार को कंज्यूम करना है
'', // उपभोक्ता टैग, ध्यान न दें, तो स्वचालित रूप से एक अद्वितीय आईडी उत्पन्न होता है
false,
true, // क्या स्वचालित रूप से संदेश को स्वीकार करना है, अर्थात बताएं कि रैबिटएमक्यू को संदेश सफलतापूर्वक प्रसंस्कृत किया गया है।
false,
false,
$callback // संदेश प्रसंस्करण कार्य
);
// यदि चैनल बंद नहीं किया गया है, तो प्रक्रिया को ब्लॉक करके इसे बंद होने से रोकें
while ($channel->is_open()) {
$channel->wait();
}
// संसाधनों को रिलीज़ करें
$channel->close();
$connection->close();