PHP RabbitMQ Publish/Subscribe 패턴 (또는 팬아웃 패턴)

RabbitMQ의 팬아웃 모드는 프로듀서가 보낸 메시지가 다른 큐에있는 여러 소비자에 의해 처리되는 패턴으로, 아키텍처 다이어그램에 표시된 것과 같습니다.

RabbitMQ 작업 모드

팬아웃 교환은 모든 바인딩된 큐로 메시지를 전달할 수 있습니다.

참고: RabbitMQ 작업 모드를 사용하든 사용하지 않든, 차이는 사용하는 교환 유형과 라우팅 매개변수에 있습니다.

1. 사전 요구 사항 튜토리얼

관련 지식을 이해하기 위해 다음 챕터를 읽어보세요:

2. 팬아웃 교환 정의

채널의 exchange_declare 함수를 사용하여 교환을 정의합니다.

$channel->exchange_declare(
    'tizi365.fanout', // 교환 이름, 고유해야 하며 중복될 수 없음
    'fanout', // 교환 유형
    false,
    false, // 지속 여부
    false
);

참고: 메시지 프로듀서와 컨슈머 모두 교환을 필요로 합니다.

3. 메시지 보내기

교환에 메시지를 보내고, 교환은 라우팅 규칙에 따라 메시지를 해당 큐로 전달합니다.

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// RabbitMQ 연결 생성
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 채널 생성
$channel = $connection->channel();

// 교환 선언
$channel->exchange_declare(
    'tizi365.fanout', // 교환 이름, 고유해야 하며 중복될 수 없음
    'fanout', // 교환 유형
    false,
    false, // 지속 여부
    false
);
// 메시지 객체, 메시지 내용을 매개변수로 사용
$msg = new AMQPMessage("안녕 tizi365.com");

$channel->basic_publish(
    $msg, // 메시지 객체
    'tizi365.fanout' // 교환 이름
);

echo ' [x] Sent ', $msg->getBody(), "\n";

// 리소스 해제
$channel->close();
$connection->close();

4. 메시지 받기

4.1. 큐 정의 및 익스체인지에 연결

메시지를 소비하기 위해서는 먼저 큐를 정의하고 해당 큐를 대상 익스체인지에 바인딩해야 합니다. 아래는 특정 익스체인지에 정의된 큐를 바인딩하는 과정입니다.

// 큐를 선언합니다. 큐 이름이 비어 있으면 자동으로 고유 ID가 생성되며 익명 큐로 만들어집니다
list($queue_name, ,) = $channel->queue_declare(
    "", // 큐 이름, 중복될 수 없음, 비어 있으면 자동으로 고유 ID를 생성하여 익명 큐로 만듬
    false,
    false, // 보존할지 여부
    true,
    false
);

// 큐를 지정한 익스체인지에 바인딩합니다
$channel->queue_bind(
    $queue_name, // 큐 이름
    'tizi365.fanout' // 익스체인지 이름
);
<?php 
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// RabbitMQ 연결을 생성합니다
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 채널을 생성합니다
$channel = $connection->channel();

// 익스체인지를 선언합니다
$channel->exchange_declare(
    'tizi365.fanout', // 익스체인지 이름, 고유해야 하며 중복될 수 없음
    'fanout', // 익스체인지 타입
    false,
    false, // 보존할지 여부
    false
);

// 큐를 선언합니다
list($queue_name, ,) = $channel->queue_declare(
    "", // 큐 이름, 중복될 수 없음, 비어 있으면 고유 ID를 생성하여 익명 큐로 만듬
    false,
    false, // 보존할지 여부
    true,
    false
);

// 큐를 지정한 익스체인지에 바인딩합니다
$channel->queue_bind(
    $queue_name, // 큐 이름
    'tizi365.fanout' // 익스체인지 이름
);

echo " [*] 메시지를 기다리는 중입니다. 종료하려면 CTRL+C를 눌러주세요\n";

// 메시지 처리 함수를 정의합니다 (여기서는 익명 함수를 사용합니다)
$callback = function ($msg) {
    // 메시지 처리 로직
    echo ' [x] ', $msg->body, "\n";
};

// 소비자를 만듭니다
$channel->basic_consume(
    $queue_name, // 소비할 큐 이름
    '', // 소비자 태그, 무시하면 고유 ID를 생성함
    false,
    true, // 메시지를 자동으로 확인할지 여부, 즉, RabbitMQ에게 메시지가 성공적으로 처리되었음을 알림
    false,
    false,
    $callback // 메시지 처리 함수
);

// 채널이 닫히지 않은 상태라면 프로세스가 종료되지 않도록 계속해서 블록합니다
while ($channel->is_open()) {
    $channel->wait();
}

// 리소스를 해제합니다
$channel->close();
$connection->close();