โหมดการจัดทำเส้นทางของ RabbitMQ โหมดเดียว (Direct Mode)
ในโหมดการจัดทำเส้นทางของ RabbitMQ ใช้ประเภทของแลกเปลี่ยนคือ "direct" ความแตกต่างหลักจากโหมดการส่งข้อมูลแบบเผยแพร่-สับสนคือแลกเปลี่ยนเดียวจัดส่งข้อมูลไปยังคิวที่มีพารามิเตอร์การเปลี่ยนเส้นทางตรงกันอย่างสมบูรณ์ โครงสร้างมีดังภาพต่อไปนี้:
หมายเหตุ: ไม่ว่าจะใช้โหมดการทำงานใดใน RabbitMQ ความแตกต่างอยู่ที่ประเภทของแลกเปลี่ยนที่ใช้และพารามิเตอร์การเปลี่ยนเส้นทาง
1. บทแนะนำเบื้องต้น
โปรดอ่านส่วนต่อไปนี้ก่อนเพื่อที่จะเข้าใจความรู้ที่เกี่ยวข้อง:
- แนวคิดพื้นฐานของ RabbitMQ
- หลักการของโหมดการจัดทำเส้นทางของ RabbitMQ
- การเริ่มต้นด่วนสำหรับ PHP กับ RabbitMQ (จำเป็น, เนื่องจากส่วนต่อไปจะไม่ทำซ้ำโค้ด แต่แสดงโค้ดระดับสำคัญเท่านั้น)
- ส่วนแบบเผยแพร่-สับสนของ RabbitMQ สำหรับ PHP (จำเป็น, เนื่องจากโครงสร้างของโค้ดเกือบเหมือนกัน นอกจากประเภทของแลกเปลี่ยนและพารามิเตอร์การเปลี่ยนเส้นทาง)
2. นิยามแลกเปลี่ยนเดียว
// ประกาศแลกเปลี่ยน
$channel->exchange_declare(
'tizi365.direct', // ชื่อแลกเปลี่ยนที่ไม่ซ้ำกัน
'direct', // ประเภทของแลกเปลี่ยน
false,
false, // ว่าจะรักษาไว้
false
);
หมายเหตุ: ทั้งผู้ผลิตข้อมูลและผู้บริโภคข้อมูลต้องมีแลกเปลี่ยน
3. การส่งข้อมูล
เราส่งข้อมูลไปยังแลกเปลี่ยน และแลกเปลี่ยนจัดส่งข้อมูลไปยังคิวที่เกี่ยวข้องโดยขึ้นอยู่กับกฎเส้นทาง
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// สร้างการเชื่อมต่อกับ RabbitMQ
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// สร้างช่อง
$channel = $connection->channel();
// ประกาศแลกเปลี่ยน
$channel->exchange_declare(
'tizi365.direct', // ชื่อแลกเปลี่ยนที่ไม่ซ้ำกัน
'direct', // ประเภทของแลกเปลี่ยน
false,
false, // ว่าจะรักษาไว้
false
);
// ออบเจกต์ของข้อมูล, พารามิเตอร์ของข้อมูลเป็นเนื้อข้อความ
$msg = new AMQPMessage("สวัสดี tizi365.com");
// สังเกตพารามิเตอร์ที่สาม, พารามิเตอร์การเปลี่ยนเส้นทาง
$channel->basic_publish(
$msg, // ออบเจกต์ของข้อมูล
'tizi365.direct', // ชื่อแลกเปลี่ยน
"tizi365" // พารามิเตอร์การเปลี่ยนเส้นทาง, สามารถกำหนดได้ตามต้องการ
);
echo ' [x] ส่ง ', $msg->getBody(), "\n";
// ปิดทรัพยากร
$channel->close();
$connection->close();
หมายเหตุ: พารามิเตอร์ที่สามในวิธี
basic_publish
เป็นพารามิเตอร์ตัวบกพร่อ
4. การรับข้อมูล
4.1. กำหนดคิวและผูก Exchange
เพื่อการรับข้อความจากคิว คุณต้องกำหนดคิวก่อน แล้วจึงผูกคิวกับ Exchange เป้าหมาย
// กำหนดคิวแบบไม่ระบุชื่อ
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// ผูกคิวกับ Exchange ที่กำหนด
$channel->queue_bind(
$queue_name, // ชื่อคิว
'tizi365.fanout', // ชื่อ Exchange
"tizi365" // พารามิเตอร์การผูก, ที่นี่ผูก 'tizi365'
);
หมายเหตุ: ตามกฎของ direct exchange หากพารามิเตอร์การส่งข้อควมตรงกับพารามิเตอร์การผูกคิวกับ Exchange ข้อควมจะถูกส่งไปยังคิวนี้.
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
// สร้างการเชื่อมต่อ rabbitmq
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// สร้างช่อง
$channel = $connection->channel();
// กำหนด Exchange
$channel->exchange_declare(
'tizi365.direct', // ชื่อ Exchange, ต้องเป็นชื่อที่ไม่ซ้ำและไม่สามารถซ้ำใช้ได้
'direct', // ประเภทของ Exchange
false,
false, // ว่าที่แข็งแรง
false
);
// กำหนดคิวแบบไม่ระบุชื่อ
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// ผูกคิวกับ Exchange ที่กำหนด
$channel->queue_bind(
$queue_name, // ชื่อคิว
'tizi365.fanout', // ชื่อ Exchange
"tizi365" // พารามิเตอร์การผูก, ที่นี่ผูก 'tizi365'
);
echo " [*] รอรับข้อความ หากต้องการออกให้กด CTRL+C\n";
// กำหนดฟังก์ชันการจัดการข้อความ (ใช้ฟังก์ชันไม่ระบุชื่อที่นี่)
$callback = function ($msg) {
// การจัดการข้อความ
echo ' [x] ', $msg->body, "\n";
};
// สร้างผู้บริโภค
$channel->basic_consume(
$queue_name, // ชื่อคิว, ชื่อคิวที่ต้องการบริโภค
'', // แท็กของผู้บริโภค, จะถูกละเว้น, สร้าง ID ที่ไม่ซ้ำกันโดยอัตโนมัติ
false,
true, // ใช้การยอมรับข้อความโดยอัตโนมัติหรือไม่, ก็คือให้ rabbitmq ทราบโดยอัตโนมัติว่าข้อความถูกประมวลผลเรียบร้อยแล้ว
false,
false,
$callback // ฟังก์ชันการจัดการข้อความ
);
// บล็อกกระบวนการจนกว่าช่องจะถูกปิด, เพื่อป้องกันไม่ให้กระบวนการออก
while ($channel->is_open()) {
$channel->wait();
}
// ปล่อยทรัพยากร
$channel->close();
$connection->close();