PHP RabbitMQ Publish/Subscribe 패턴 (또는 팬아웃 패턴)
RabbitMQ의 팬아웃 모드는 프로듀서가 보낸 메시지가 다른 큐에있는 여러 소비자에 의해 처리되는 패턴으로, 아키텍처 다이어그램에 표시된 것과 같습니다.
팬아웃 교환은 모든 바인딩된 큐로 메시지를 전달할 수 있습니다.
참고: RabbitMQ 작업 모드를 사용하든 사용하지 않든, 차이는 사용하는 교환 유형과 라우팅 매개변수에 있습니다.
1. 사전 요구 사항 튜토리얼
관련 지식을 이해하기 위해 다음 챕터를 읽어보세요:
- RabbitMQ Basics
- RabbitMQ Publish/Subscribe Pattern
- RabbitMQ PHP Quick Start (후속 챕터에서 코드를 반복하지 않고 핵심 코드 조각만 표시하므로 필수입니다.)
2. 팬아웃 교환 정의
채널의 exchange_declare 함수를 사용하여 교환을 정의합니다.
$channel->exchange_declare(
'tizi365.fanout', // 교환 이름, 고유해야 하며 중복될 수 없음
'fanout', // 교환 유형
false,
false, // 지속 여부
false
);
참고: 메시지 프로듀서와 컨슈머 모두 교환을 필요로 합니다.
3. 메시지 보내기
교환에 메시지를 보내고, 교환은 라우팅 규칙에 따라 메시지를 해당 큐로 전달합니다.
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// RabbitMQ 연결 생성
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 채널 생성
$channel = $connection->channel();
// 교환 선언
$channel->exchange_declare(
'tizi365.fanout', // 교환 이름, 고유해야 하며 중복될 수 없음
'fanout', // 교환 유형
false,
false, // 지속 여부
false
);
// 메시지 객체, 메시지 내용을 매개변수로 사용
$msg = new AMQPMessage("안녕 tizi365.com");
$channel->basic_publish(
$msg, // 메시지 객체
'tizi365.fanout' // 교환 이름
);
echo ' [x] Sent ', $msg->getBody(), "\n";
// 리소스 해제
$channel->close();
$connection->close();
4. 메시지 받기
4.1. 큐 정의 및 익스체인지에 연결
메시지를 소비하기 위해서는 먼저 큐를 정의하고 해당 큐를 대상 익스체인지에 바인딩해야 합니다. 아래는 특정 익스체인지에 정의된 큐를 바인딩하는 과정입니다.
// 큐를 선언합니다. 큐 이름이 비어 있으면 자동으로 고유 ID가 생성되며 익명 큐로 만들어집니다
list($queue_name, ,) = $channel->queue_declare(
"", // 큐 이름, 중복될 수 없음, 비어 있으면 자동으로 고유 ID를 생성하여 익명 큐로 만듬
false,
false, // 보존할지 여부
true,
false
);
// 큐를 지정한 익스체인지에 바인딩합니다
$channel->queue_bind(
$queue_name, // 큐 이름
'tizi365.fanout' // 익스체인지 이름
);
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
// RabbitMQ 연결을 생성합니다
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 채널을 생성합니다
$channel = $connection->channel();
// 익스체인지를 선언합니다
$channel->exchange_declare(
'tizi365.fanout', // 익스체인지 이름, 고유해야 하며 중복될 수 없음
'fanout', // 익스체인지 타입
false,
false, // 보존할지 여부
false
);
// 큐를 선언합니다
list($queue_name, ,) = $channel->queue_declare(
"", // 큐 이름, 중복될 수 없음, 비어 있으면 고유 ID를 생성하여 익명 큐로 만듬
false,
false, // 보존할지 여부
true,
false
);
// 큐를 지정한 익스체인지에 바인딩합니다
$channel->queue_bind(
$queue_name, // 큐 이름
'tizi365.fanout' // 익스체인지 이름
);
echo " [*] 메시지를 기다리는 중입니다. 종료하려면 CTRL+C를 눌러주세요\n";
// 메시지 처리 함수를 정의합니다 (여기서는 익명 함수를 사용합니다)
$callback = function ($msg) {
// 메시지 처리 로직
echo ' [x] ', $msg->body, "\n";
};
// 소비자를 만듭니다
$channel->basic_consume(
$queue_name, // 소비할 큐 이름
'', // 소비자 태그, 무시하면 고유 ID를 생성함
false,
true, // 메시지를 자동으로 확인할지 여부, 즉, RabbitMQ에게 메시지가 성공적으로 처리되었음을 알림
false,
false,
$callback // 메시지 처리 함수
);
// 채널이 닫히지 않은 상태라면 프로세스가 종료되지 않도록 계속해서 블록합니다
while ($channel->is_open()) {
$channel->wait();
}
// 리소스를 해제합니다
$channel->close();
$connection->close();