PHP RabbitMQ 라우팅 모드 (다이렉트 모드)

RabbitMQ의 라우팅 모드에서 사용되는 교환 타입은 "direct"입니다. 발행-구독 모드와의 주된 차이점은 다이렉트 교환은 라우팅 파라미터가 완전히 일치하는 대기열로 메시지를 전송한다는 것입니다. 아키텍처는 다음 그래프와 같습니다:

RabbitMQ Direct Mode

참고: RabbitMQ에서 사용하는 작업 모드에 관계없이 교환 타입 및 라우팅 파라미터에서 차이가 있습니다.

1. 사전 교육 자료

관련 지식을 이해하기 위해 먼저 다음 섹션을 읽어주십시오:

2. 다이렉트 교환 정의

// 교환을 선언
$channel->exchange_declare(
    'tizi365.direct', // 고유한 교환 이름
    'direct', // 교환 타입
    false,
    false, // 지속 여부
    false
);

참고: 메시지 생성자와 소비자 모두 교환을 필요로 합니다.

3. 메시지 전송

우리는 메시지를 교환에 보내고, 교환은 라우팅 규칙에 따라 해당 대기열로 메시지를 전달합니다.

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// RabbitMQ 연결 생성
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 채널 생성
$channel = $connection->channel();

// 교환을 선언
$channel->exchange_declare(
    'tizi365.direct', // 고유한 교환 이름
    'direct', // 교환 타입
    false,
    false, // 지속 여부
    false
);

// 메시지 객체, 메시지 내용이 파라미터로 전달됩니다
$msg = new AMQPMessage("안녕하세요 tizi365.com");

// 세번째 파라미터에 주의하세요, 라우팅 파라미터입니다
$channel->basic_publish(
    $msg, // 메시지 객체
    'tizi365.direct', // 교환 이름
    "tizi365" // 라우팅 파라미터, 필요에 따라 임의로 정의 가능합니다
);

echo ' [x] 전송됨 ', $msg->getBody(), "\n";

// 리소스 해제
$channel->close();
$connection->close();

참고: basic_publish 메서드의 세번째 파라미터는 중요한 파라미터입니다.

4. 메시지 수신

4.1. Queue 및 Exchange 바인딩 정의

큐 메시지를 소비하려면 먼저 큐를 정의하고, 그런 다음 해당 큐를 대상 익스체인지에 바인딩해야 합니다.

// 익명 큐 선언
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

// 큐를 지정된 익스체인지에 바인딩
$channel->queue_bind(
    $queue_name, // 큐 이름
    'tizi365.fanout', // 익스체인지 이름
    "tizi365" // 바인딩 라우팅 매개변수, 여기서 'tizi365'를 바인딩합니다
);

참고: 직접 익스체인지 규칙에 따르면 메시지를 보낼 때 전달되는 라우팅 매개변수가 익스체인지에 큐를 바인딩할 때 설정된 라우팅 매개변수와 일치하는 경우 메시지가 해당 큐에 전달됩니다.

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// 래빗엠큐 연결 생성
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 채널 생성
$channel = $connection->channel();

// 익스체인지 선언
$channel->exchange_declare(
    'tizi365.direct', // 익스체인지 이름, 고유해야 하며 반복해서 사용될 수 없음
    'direct', // 익스체인지 유형
    false,
    false, // 내구성 유무
    false
);

// 익명 큐 선언
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

// 큐를 특정 익스체인지에 바인딩
$channel->queue_bind(
    $queue_name, // 큐 이름
    'tizi365.fanout', // 익스체인지 이름
    "tizi365" // 바인딩 라우팅 매개변수, 여기서 'tizi365'를 바인딩함
);

echo " [*] 메시지 수신 대기 중. 종료하려면 CTRL+C를 누르세요\n";

// 메시지 처리 함수 정의 (여기서는 익명 함수 사용)
$callback = function ($msg) {
    // 메시지 처리 로직
    echo ' [x] ', $msg->body, "\n";
};

// 소비자 생성
$channel->basic_consume(
    $queue_name, // 소비될 큐 이름
    '', // 소비자 태그, 무시됨, 자동으로 고유한 ID가 생성됨
    false,
    true, // 메시지를 자동으로 확인할지 여부, 즉, 래빗엠큐에 메시지가 성공적으로 처리되었음을 자동으로 알릴지 여부
    false,
    false,
    $callback // 메시지 처리 함수
);

// 채널이 닫힐 때까지 프로세스를 차단하여 프로세스가 종료되는 것을 방지
while ($channel->is_open()) {
    $channel->wait();
}

// 리소스 해제
$channel->close();
$connection->close();