1.Pre-tutorial
Please read the following sections first to understand the relevant knowledge
- RabbitMQ Basic Concepts
- Principle of RabbitMQ Topic Pattern
- PHP RabbitMQ Quick Start Chapter (required, because subsequent chapters will not duplicate the code, only show the key code)
- PHP RabbitMQ Publish/Subscribe Pattern Chapter (required, because the code is almost the same, only the exchange type and routing parameters are different)
2.Define Topic Exchange
// Declare the exchange
$channel->exchange_declare(
'tizi365.topic', // Exchange name, needs to be unique, cannot be repeated
'topic', // Exchange type
false,
false, // Whether it is durable
false
);
Note: Both message producers and consumers need exchanges.
3.Send Message
We send messages to the exchange, and the exchange delivers the messages to the corresponding queue based on the routing rules.
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// Create a rabbitmq connection
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Create Channel
$channel = $connection->channel();
// Declare the exchange
$channel->exchange_declare(
'tizi365.topic', // Exchange name, needs to be unique, cannot be repeated
'topic', // Exchange type
false,
false, // Whether it is durable
false
);
// Message object, the parameter is the message content
$msg = new AMQPMessage("hello tizi365.com");
// Send message
// Note the third parameter, the routing parameter
$channel->basic_publish(
$msg, // Message object
'tizi365.topic', // Exchange name
"www.tizi365.com" // Routing parameter, can be arbitrarily defined according to requirements
);
echo ' [x] Sent ', $msg->getBody(), "\n";
// Release resources
$channel->close();
$connection->close();
4.Receive Message
4.1.Define Queue & Bind Exchange
To consume queue messages, you need to define a queue first, and then bind the queue to the target exchange.
// Declare an anonymous queue
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// Bind the queue to the specified exchange
$channel->queue_bind(
$queue_name, // Queue name
'tizi365.topic', // Exchange name
"*.tizi365.com" // Bind routing parameter, here using wildcard * (asterisk), which can match a single word
Note: The set routing parameters all use the * (asterisk) wildcard, which can match a single word. If changed to # (hash), it can match multiple words.
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
// Create a rabbitmq connection
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Create a Channel
$channel = $connection->channel();
// Declare an exchange
$channel->exchange_declare(
'tizi365.topic', // Exchange name, needs to be unique and cannot be repeated
'topic', // Exchange type
false,
false, // Whether it is durable
false
);
// Declare an anonymous queue
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// Bind the queue to the specified exchange
$channel->queue_bind(
$queue_name, // Queue name
'tizi365.topic', // Exchange name
"*.tizi365.com" // Binding routing key, using wildcard * here, which can match a single word
);
echo " [*] Waiting for message. To exit press CTRL+C\n";
// Define the message handling function (using an anonymous function here)
$callback = function ($msg) {
// Message processing logic
echo ' [x] ', $msg->body, "\n";
};
// Create a consumer
$channel->basic_consume(
$queue_name, // Queue name to consume from
'', // Consumer tag, if ignored, a unique ID will be generated automatically
false,
true, // Whether to automatically acknowledge the message, i.e., automatically tell rabbitmq that the message has been processed successfully
false,
false,
$callback // Message handling function
);
// If the channel is not closed, keep blocking the process to avoid process exit
while ($channel->is_open()) {
$channel->wait();
}
// Release resources
$channel->close();
$connection->close();
Because the routing parameter set when binding the exchange is *.tizi365.com, it matches the routing parameter of the message (www.tizi365.com), so the message can be received.