PHP RabbitMQ Routing Mode (Direct Mode)
In the routing mode of RabbitMQ, the type of exchange used is “direct”. The main difference from the publish-subscribe mode is that the direct exchange delivers messages to queues whose routing parameters completely match. The architecture is as shown in the following graph:
Note: Regardless of the work mode used in RabbitMQ, the difference lies in the type of exchange used and the routing parameters.
1. Preparatory Tutorial
Please read the following sections first to understand the relevant knowledge:
- RabbitMQ Basic Concepts
- RabbitMQ Routing Mode Principles
- Quick Start for PHP with RabbitMQ (required, as subsequent sections will not repeat the code, only display key code)
- PHP RabbitMQ Publish-Subscribe Pattern Section (required, as the code structure is almost the same, except for the type of exchange and the routing parameters)
2. Define Direct Exchange
// Declare the exchange
$channel->exchange_declare(
'tizi365.direct', // Unique exchange name
'direct', // Exchange type
false,
false, // Whether to persist
false
);
Note: Both message producers and consumers need an exchange.
3. Sending Messages
We send messages to the exchange, and the exchange delivers messages to the corresponding queue based on the routing rules.
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// Create a RabbitMQ connection
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Create a channel
$channel = $connection->channel();
// Declare the exchange
$channel->exchange_declare(
'tizi365.direct', // Unique exchange name
'direct', // Exchange type
false,
false, // Whether to persist
false
);
// Message object, message content is the parameter
$msg = new AMQPMessage("hello tizi365.com");
// Pay attention to the third parameter, routing parameter
$channel->basic_publish(
$msg, // Message object
'tizi365.direct', // Exchange name
"tizi365" // Routing parameter, can be arbitrarily defined as needed
);
echo ' [x] Sent ', $msg->getBody(), "\n";
// Release resources
$channel->close();
$connection->close();
Note: The third parameter in the
basic_publish
method is a key parameter.
4. Receiving Messages
4.1. Define Queue & Bind Exchange
To consume queue messages, you need to first define a queue, and then bind the queue to the target exchange.
// Declare an anonymous queue
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// Bind the queue to the specified exchange
$channel->queue_bind(
$queue_name, // Queue name
'tizi365.fanout', // Exchange name
"tizi365" // Bind routing parameter, bind 'tizi365' here
);
Note: According to the direct exchange rules, if the routing parameter carried when sending a message matches the routing parameter set when binding the queue to the exchange, the message will be delivered to this queue.
```php
<?php
requireonce _DIR . ‘/vendor/autoload.php’;
use PhpAmqpLib\Connection\AMQPStreamConnection;
// Create a rabbitmq connection
$connection = new AMQPStreamConnection(‘localhost’, 5672, ‘guest’, ‘guest’);
// Create a Channel
$channel = $connection->channel();
// Declare an exchange
$channel->exchange_declare(
‘tizi365.direct’, // Exchange name, needs to be unique and cannot be repeated
‘direct’, // Exchange type
false,
false, // Whether it is durable
false
);
// Declare an anonymous queue
list($queue_name, ,) = $channel->queue_declare(“”, false, false, true, false);
// Bind the queue to a specific exchange
$channel->queue_bind(
$queue_name, // Queue name
‘tizi365.fanout’, // Exchange name
“tizi365” // Binding routing parameter, binds ‘tizi365’ here
);
echo “ [*] Waiting for message. To exit press CTRL+C\n”;
// Define the message handling function (using an anonymous function here)
$callback = function ($msg) {
// Message handling logic
echo ‘ [x] ‘, $msg->body, “\n”;
};
// Create a consumer
$channel->basic_consume(
$queue_name, // Queue name, the name of the queue to be consumed
‘’, // Consumer tag, ignored, a unique ID is generated automatically
false,
true, // Whether to automatically acknowledge the message, i.e. to automatically inform rabbitmq that the message has been successfully processed.
false,
false,
$callback // Message handling function
);
// Block the process until the channel is not closed, to prevent the process from exiting
while ($channel->is_open()) {
$channel->wait();
}
// Release resources
$channel->close();
$connection->close();
```