PHP RabbitMQ 라우팅 모드 (다이렉트 모드)
RabbitMQ의 라우팅 모드에서 사용되는 교환 타입은 "direct"입니다. 발행-구독 모드와의 주된 차이점은 다이렉트 교환은 라우팅 파라미터가 완전히 일치하는 대기열로 메시지를 전송한다는 것입니다. 아키텍처는 다음 그래프와 같습니다:
참고: RabbitMQ에서 사용하는 작업 모드에 관계없이 교환 타입 및 라우팅 파라미터에서 차이가 있습니다.
1. 사전 교육 자료
관련 지식을 이해하기 위해 먼저 다음 섹션을 읽어주십시오:
- RabbitMQ 기본 개념
- RabbitMQ 라우팅 모드 원리
- PHP와 RabbitMQ를 위한 빠른 시작 (필수입니다. 이후 섹션에서는 코드를 반복하지 않고 주요 코드만 표시합니다)
- PHP RabbitMQ 발행-구독 패턴 섹션 (교환 타입 및 라우팅 파라미터를 제외하고 코드 구조는 거의 동일합니다)
2. 다이렉트 교환 정의
// 교환을 선언
$channel->exchange_declare(
'tizi365.direct', // 고유한 교환 이름
'direct', // 교환 타입
false,
false, // 지속 여부
false
);
참고: 메시지 생성자와 소비자 모두 교환을 필요로 합니다.
3. 메시지 전송
우리는 메시지를 교환에 보내고, 교환은 라우팅 규칙에 따라 해당 대기열로 메시지를 전달합니다.
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// RabbitMQ 연결 생성
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 채널 생성
$channel = $connection->channel();
// 교환을 선언
$channel->exchange_declare(
'tizi365.direct', // 고유한 교환 이름
'direct', // 교환 타입
false,
false, // 지속 여부
false
);
// 메시지 객체, 메시지 내용이 파라미터로 전달됩니다
$msg = new AMQPMessage("안녕하세요 tizi365.com");
// 세번째 파라미터에 주의하세요, 라우팅 파라미터입니다
$channel->basic_publish(
$msg, // 메시지 객체
'tizi365.direct', // 교환 이름
"tizi365" // 라우팅 파라미터, 필요에 따라 임의로 정의 가능합니다
);
echo ' [x] 전송됨 ', $msg->getBody(), "\n";
// 리소스 해제
$channel->close();
$connection->close();
참고:
basic_publish
메서드의 세번째 파라미터는 중요한 파라미터입니다.
4. 메시지 수신
4.1. Queue 및 Exchange 바인딩 정의
큐 메시지를 소비하려면 먼저 큐를 정의하고, 그런 다음 해당 큐를 대상 익스체인지에 바인딩해야 합니다.
// 익명 큐 선언
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// 큐를 지정된 익스체인지에 바인딩
$channel->queue_bind(
$queue_name, // 큐 이름
'tizi365.fanout', // 익스체인지 이름
"tizi365" // 바인딩 라우팅 매개변수, 여기서 'tizi365'를 바인딩합니다
);
참고: 직접 익스체인지 규칙에 따르면 메시지를 보낼 때 전달되는 라우팅 매개변수가 익스체인지에 큐를 바인딩할 때 설정된 라우팅 매개변수와 일치하는 경우 메시지가 해당 큐에 전달됩니다.
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
// 래빗엠큐 연결 생성
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 채널 생성
$channel = $connection->channel();
// 익스체인지 선언
$channel->exchange_declare(
'tizi365.direct', // 익스체인지 이름, 고유해야 하며 반복해서 사용될 수 없음
'direct', // 익스체인지 유형
false,
false, // 내구성 유무
false
);
// 익명 큐 선언
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// 큐를 특정 익스체인지에 바인딩
$channel->queue_bind(
$queue_name, // 큐 이름
'tizi365.fanout', // 익스체인지 이름
"tizi365" // 바인딩 라우팅 매개변수, 여기서 'tizi365'를 바인딩함
);
echo " [*] 메시지 수신 대기 중. 종료하려면 CTRL+C를 누르세요\n";
// 메시지 처리 함수 정의 (여기서는 익명 함수 사용)
$callback = function ($msg) {
// 메시지 처리 로직
echo ' [x] ', $msg->body, "\n";
};
// 소비자 생성
$channel->basic_consume(
$queue_name, // 소비될 큐 이름
'', // 소비자 태그, 무시됨, 자동으로 고유한 ID가 생성됨
false,
true, // 메시지를 자동으로 확인할지 여부, 즉, 래빗엠큐에 메시지가 성공적으로 처리되었음을 자동으로 알릴지 여부
false,
false,
$callback // 메시지 처리 함수
);
// 채널이 닫힐 때까지 프로세스를 차단하여 프로세스가 종료되는 것을 방지
while ($channel->is_open()) {
$channel->wait();
}
// 리소스 해제
$channel->close();
$connection->close();