PHP RabbitMQ Yönlendirme Modu (Doğrudan Mod)

RabbitMQ'nun yönlendirme modunda kullanılan exchange türü "direct" tir. Yayın-abonelik modundan ana farkı, doğrudan exchange'in iletileri tam olarak eşleşen yönlendirme parametrelerine sahip kuyruklara ileti göndermesidir. Mimarisi aşağıdaki grafikte gösterildiği gibidir:

RabbitMQ Direct Mode

Not: RabbitMQ'da kullanılan çalışma modundan bağımsız olarak, fark exchange türünde ve yönlendirme parametrelerindedir.

1. Hazırlayıcı Öğretici

İlgili bilgileri anlamanız için öncelikle aşağıdaki bölümleri okuyunuz:

2. Doğrudan Exchange Tanımlama

// Exchange'i tanımla
$channel->exchange_declare(
    'tizi365.direct', // Benzersiz exchange adı
    'direct', // Exchange türü
    false,
    false, // Kalıcı olup olmadığı
    false
);

Not: İleti üreticileri ve tüketicilerinin her ikisi de bir exchange'e ihtiyaç duyar.

3. İleti Gönderme

Exchange'e ileti göndeririz ve exchange, yönlendirme kurallarına göre ilgili kuyruklara ileti iletilir.

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// RabbitMQ bağlantısı oluştur
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Bir kanal oluştur
$channel = $connection->channel();

// Exchange'i tanımla
$channel->exchange_declare(
    'tizi365.direct', // Benzersiz exchange adı
    'direct', // Exchange türü
    false,
    false, // Kalıcı olup olmadığı
    false
);

// İleti objesi, ileti içeriği parametredir
$msg = new AMQPMessage("merhaba tizi365.com");

// Üçüncü parametreye dikkat, yönlendirme parametresi
$channel->basic_publish(
    $msg, // İleti objesi
    'tizi365.direct', // Exchange adı
    "tizi365" // Yönlendirme parametresi, ihtiyaca göre istediğiniz gibi tanımlanabilir
);

echo ' [x] Gönderildi ', $msg->getBody(), "\n";

// Kaynakları serbest bırak
$channel->close();
$connection->close();

Not: basic_publish methodundaki üçüncü parametre bir anahtar parametresidir.

4. İleti Alma

4.1. Kuyruğu Tanımla ve Değişim'e Bağla

Kuyruk mesajlarını tüketmek için önce bir kuyruk tanımlamanız ve daha sonra kuyruğu hedef değişime bağlamanız gerekmektedir.

// Anonim bir kuyruk tanımla
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

// Kuyruğu belirtilen değişime bağla
$channel->queue_bind(
    $queue_name, // Kuyruk adı
    'tizi365.fanout', // Değişim adı
    "tizi365" // Bağlama yönlendirme parametresi, burada 'tizi365' bağlanacak
);

Not: Doğrudan değişim kurallarına göre, bir mesaj gönderilirken taşınan yönlendirme parametresi, kuyruğun değişime bağlanırken ayarlanan yönlendirme parametresiyle eşleşirse, mesaj bu kuyruğa iletilir.

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// Bir rabbitmq bağlantısı oluştur
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Bir Kanal oluştur
$channel = $connection->channel();

// Bir değişim tanımla
$channel->exchange_declare(
    'tizi365.direct', // Değişim adı, benzersiz olmalı ve tekrarlanmamalıdır
    'direct', // Değişim türü
    false,
    false, // Kalıcı mı olduğu
    false
);

// Anonim bir kuyruk tanımla
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

// Kuyruğu belirli bir değişime bağla
$channel->queue_bind(
    $queue_name, // Kuyruk adı
    'tizi365.fanout', // Değişim adı
    "tizi365" // Bağlama yönlendirme parametresi, burada 'tizi365' bağlanacak
);

echo " [*] Mesaj bekleniyor. Çıkmak için CTRL+C'ye basın\n";

// Mesaj işleme fonksiyonunu tanımla (burada anonim bir fonksiyon kullanılıyor)
$callback = function ($msg) {
    // Mesaj işleme mantığı
    echo ' [x] ', $msg->body, "\n";
};

// Bir tüketici oluştur
$channel->basic_consume(
    $queue_name, // Tüketilecek kuyruk adı
    '', // Tüketici etiketi, görmezden gelinir, benzersiz bir kimlik otomatik olarak oluşturulur
    false,
    true, // Mesajı otomatik olarak tanımayı, yani rabbitmq'ya mesajın başarılı bir şekilde işlendiğini otomatik olarak bildirmeyi etkinleştir
    false,
    false,
    $callback // Mesaj işleme fonksiyonu
);

// Kanal kapanana kadar süreci bloke et, sürecin çıkmasını önlemek için
while ($channel->is_open()) {
    $channel->wait();
}

// Kaynakları serbest bırak
$channel->close();
$connection->close();