PHP RabbitMQ Routing Mode (Direct Mode)

In RabbitMQ's routing mode, the exchange type is "direct". The main difference from the publish-subscribe mode is that a direct exchange delivers a message only to the queues whose binding routing parameter exactly matches the routing parameter of the message. The architecture is shown in the following diagram:

[Figure: RabbitMQ Direct Mode]

Note: RabbitMQ's work modes differ from one another only in the type of exchange used and in the routing parameters.
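
The difference between the two exchange types can be illustrated without a broker. The following is a toy in-memory model, not RabbitMQ's actual implementation; the function name `routeMessage` and the queue names `q1` and `q2` are made up for this sketch.

```php
<?php
/**
 * Toy model of exchange routing (illustration only).
 *
 * @param string $exchangeType 'direct' or 'fanout'
 * @param array  $bindings     list of ['queue' => string, 'key' => string]
 * @param string $routingKey   routing parameter carried by the message
 * @return string[]            names of the queues that would receive the message
 */
function routeMessage(string $exchangeType, array $bindings, string $routingKey): array
{
    if (!in_array($exchangeType, ['direct', 'fanout'], true)) {
        throw new InvalidArgumentException("Unsupported exchange type: $exchangeType");
    }

    $queues = [];
    foreach ($bindings as $binding) {
        // fanout ignores the routing parameter; direct requires an exact match
        if ($exchangeType === 'fanout' || $binding['key'] === $routingKey) {
            $queues[] = $binding['queue'];
        }
    }

    // A queue bound several times still receives the message only once
    return array_values(array_unique($queues));
}

$bindings = [
    ['queue' => 'q1', 'key' => 'tizi365'],
    ['queue' => 'q2', 'key' => 'other'],
];

print_r(routeMessage('direct', $bindings, 'tizi365')); // only q1
print_r(routeMessage('fanout', $bindings, 'tizi365')); // q1 and q2
print_r(routeMessage('direct', $bindings, 'tizi'));    // empty: a partial match does not count
```

In this model, a message whose routing parameter matches no binding is routed to no queue at all, which mirrors how a direct exchange discards unroutable messages by default.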

1. Preparatory Tutorial

Please read the following sections first to understand the relevant knowledge:

2. Define Direct Exchange

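// Declare the exchange
$channel->exchange_declare(
    'tizi365.direct', // Unique exchange name
    'direct', // Exchange type
    false, // Passive: false creates the exchange if it does not exist
    false, // Durable: whether the exchange survives a broker restart
    false // Auto-delete: whether the exchange is removed once no queue is bound to it
);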

Note: Both the message producer and the message consumer need to declare the exchange.
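
Because both sides declare the same exchange, one option is to keep the declaration in a single shared function so the parameters cannot drift apart. The sketch below assumes a hypothetical helper named `declareDirectExchange`; `$channel` is expected to be the php-amqplib channel object used elsewhere in this tutorial, and is left untyped here so the snippet runs without the library loaded.

```php
<?php
// Hypothetical helper shared by the producer and consumer scripts.
// $channel is expected to be the object returned by $connection->channel().
function declareDirectExchange($channel, string $name = 'tizi365.direct'): void
{
    $channel->exchange_declare(
        $name,    // exchange name
        'direct', // exchange type
        false,    // passive
        false,    // durable
        false     // auto_delete
    );
}
```

Declaring an exchange that already exists with identical properties has no effect, so it does not matter which side starts first. Declaring it again with different properties (for example a different type) makes the broker close the channel with an error, which is the main reason to keep the parameters in one place.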

3. Sending Messages

We send messages to the exchange, and the exchange delivers messages to the corresponding queue based on the routing rules.

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// Create a RabbitMQ connection
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Create a channel
$channel = $connection->channel();

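// Declare the exchange
$channel->exchange_declare(
    'tizi365.direct', // Unique exchange name
    'direct', // Exchange type
    false, // Passive: false creates the exchange if it does not exist
    false, // Durable: whether the exchange survives a broker restart
    false // Auto-delete: whether the exchange is removed once no queue is bound to it
);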

// Create the message object; the constructor argument is the message body
$msg = new AMQPMessage("hello tizi365.com");

// Note the third parameter: the routing parameter
$channel->basic_publish(
    $msg, // Message object
    'tizi365.direct', // Exchange name
    "tizi365" // Routing parameter, can be defined freely as needed
);

echo ' [x] Sent ', $msg->getBody(), "\n";

// Release resources
$channel->close();
$connection->close();

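Note: The third parameter of the basic_publish method is the routing parameter (routing key). The direct exchange compares it with each queue's binding to decide which queues receive the message.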
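
Since the routing parameter is just a string chosen by the producer, it can come from anywhere, for instance the command line. The following is a minimal sketch; `parsePublishArgs` is a hypothetical helper, and the defaults simply reuse the values from the producer script above.

```php
<?php
// Hypothetical helper: split CLI arguments into a routing parameter and a message body.
// $argv[0] is the script name, $argv[1] the routing parameter, the rest is the body.
function parsePublishArgs(
    array $argv,
    string $defaultKey = 'tizi365',
    string $defaultBody = 'hello tizi365.com'
): array {
    $routingKey = $argv[1] ?? $defaultKey;

    $body = implode(' ', array_slice($argv, 2));
    if ($body === '') {
        $body = $defaultBody;
    }

    return [$routingKey, $body];
}

// In the producer script, the result would feed basic_publish:
//   [$routingKey, $body] = parsePublishArgs($argv);
//   $channel->basic_publish(new AMQPMessage($body), 'tizi365.direct', $routingKey);
```

With this in place, and assuming the producer is saved as `send.php`, running `php send.php tizi365 hello` would publish "hello" with the routing parameter "tizi365".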

4. Receiving Messages

4.1. Define Queue & Bind Exchange

To consume queue messages, you need to first define a queue, and then bind the queue to the target exchange.

// Declare an anonymous queue: the server generates its name, and the queue is exclusive to this connection
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

// Bind the queue to the specified exchange
$channel->queue_bind(
    $queue_name, // Queue name
    'tizi365.direct', // Exchange name, must match the exchange declared above
    "tizi365" // Binding routing parameter, bind 'tizi365' here
);

Note: Under the direct exchange rules, a message is delivered to a queue only if the routing parameter carried by the message exactly matches the routing parameter used when binding that queue to the exchange.
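
A queue is not limited to a single binding: it can be bound to the same exchange several times, once per routing parameter it is interested in. The sketch below assumes a hypothetical helper named `bindQueueToKeys`; `$channel` is again expected to be the php-amqplib channel object and is left untyped so the snippet runs on its own.

```php
<?php
// Hypothetical helper: bind one queue to an exchange once per routing parameter.
function bindQueueToKeys($channel, string $queueName, string $exchange, array $routingKeys): void
{
    foreach ($routingKeys as $routingKey) {
        // One queue_bind call per routing parameter
        $channel->queue_bind($queueName, $exchange, $routingKey);
    }
}

// Possible usage in the consumer script (the second key is only an example):
//   bindQueueToKeys($channel, $queue_name, 'tizi365.direct', ['tizi365', 'other']);
```

After such a call the queue would receive messages published with either routing parameter, while messages with any other routing parameter would still be skipped.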

4.2. Complete Code

```php
<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// Create a RabbitMQ connection
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Create a channel
$channel = $connection->channel();

// Declare the exchange
$channel->exchange_declare(
    'tizi365.direct', // Exchange name, must be unique
    'direct', // Exchange type
    false, // Passive
    false, // Whether it is durable
    false // Auto-delete
);

// Declare an anonymous queue: the server generates its name
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

// Bind the queue to the exchange
$channel->queue_bind(
    $queue_name, // Queue name
    'tizi365.direct', // Exchange name, must match the exchange declared above
    "tizi365" // Binding routing parameter, binds 'tizi365' here
);

echo " [*] Waiting for message. To exit press CTRL+C\n";

// Define the message handling function (an anonymous function here)
$callback = function ($msg) {
    // Message handling logic
    echo ' [x] ', $msg->body, "\n";
};

// Create a consumer
$channel->basic_consume(
    $queue_name, // Name of the queue to consume
    '', // Consumer tag; when empty, a unique ID is generated automatically
    false, // no_local
    true, // no_ack: acknowledge automatically, i.e. RabbitMQ treats the message as processed once it is delivered
    false, // exclusive
    false, // nowait
    $callback // Message handling function
);

// Block while the channel is open, so that the process does not exit
while ($channel->is_open()) {
    $channel->wait();
}

// Release resources
$channel->close();
$connection->close();
```