PHP RabbitMQ Publish/Subscribe Pattern (also known as Fanout Pattern)

The Fanout mode in RabbitMQ is a pattern where messages sent by a producer are processed by multiple consumers in different queues, as shown in the architecture diagram below.

RabbitMQ Work Mode

The Fanout exchange can forward messages to all bound queues.

Note: Regardless of the RabbitMQ working mode used, the difference lies in the type of exchange and routing parameters used.

1. Prerequisite Tutorial

Please read the following chapters to understand the relevant knowledge:

2. Defining a Fanout Exchange

Define the exchange through the channel’s exchange_declare function.

$channel->exchange_declare(
    'tizi365.fanout', // Exchange name, must be unique and cannot be duplicated
    'fanout', // Exchange type
    false,
    false, // Whether to persist
    false
);

Note: Both message producers and consumers require the exchange.

3. Sending Messages

We send messages to the exchange, and the exchange delivers the messages to the corresponding queues based on the routing rules.

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// Create a RabbitMQ connection
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Create a Channel
$channel = $connection->channel();

// Declare the exchange
$channel->exchange_declare(
    'tizi365.fanout', // Exchange name, must be unique and cannot be duplicated
    'fanout', // Exchange type
    false,
    false, // Whether to persist
    false
);
// Message object, with the message content as the parameter
$msg = new AMQPMessage("hello tizi365.com");

$channel->basic_publish(
    $msg, // Message object
    'tizi365.fanout' // Exchange name
);

echo ' [x] Sent ', $msg->getBody(), "\n";

// Release resources
$channel->close();
$connection->close();

4. Receiving Messages

4.1. Defining a Queue & Binding the Exchange

To consume queue messages, you need to first define a queue and then bind the queue to the target exchange.
Below, a queue is defined and bound to a specific exchange.

// Declare a queue, if the queue name is empty, a unique ID is generated automatically, and the queue name is returned
list($queue_name, ,) = $channel->queue_declare(
    "", // Queue name, not allowed to be duplicated, if empty, a unique ID is generated automatically, making it an anonymous queue
    false,
    false, // Whether to persist
    true,
    false
);

// Bind the queue to the specified exchange
$channel->queue_bind(
    $queue_name, // Queue name
    'tizi365.fanout' // Exchange name
);
<?php 
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// Create a rabbitmq connection
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Create a Channel
$channel = $connection->channel();

// Declare an exchange
$channel->exchange_declare(
    'tizi365.fanout', // Exchange name, needs to be unique and cannot be repeated
    'fanout', // Exchange type
    false,
    false, // Whether to persist
    false
);

// Declare a queue
list($queue_name, ,) = $channel->queue_declare(
    "", // Queue name, not allowed to be repeated, if empty then generate a unique ID, then it is an anonymous queue
    false,
    false, // Whether to persist
    true,
    false
);

// Bind the queue to the specified exchange
$channel->queue_bind(
    $queue_name, // Queue name
    'tizi365.fanout' // Exchange name
);

echo " [*] Waiting for message. To exit press CTRL+C\n";

// Define the message processing function (using an anonymous function here)
$callback = function ($msg) {
    // Message processing logic
    echo ' [x] ', $msg->body, "\n";
};

// Create a consumer
$channel->basic_consume(
    $queue_name, // Queue name, the queue to consume
    '', // Consumer tag, ignore, then generate a unique ID
    false,
    true, // Whether to automatically acknowledge the message, i.e. tell rabbitmq that the message has been successfully processed.
    false,
    false,
    $callback // Message processing function
);

// If the channel is not closed, keep blocking the process to prevent it from exiting
while ($channel->is_open()) {
    $channel->wait();
}

// Release resources
$channel->close();
$connection->close();