PHP RabbitMQ Yayın/Abone Örüntüsü (Fanout Örüntüsü olarak da bilinir)
RabbitMQ'da Fanout modu, bir üretici tarafından gönderilen mesajların, aşağıdaki mimari diyagramında gösterildiği gibi farklı kuyruklardaki birden fazla tüketici tarafından işlenmesi için bir örüntüdür.
Fanout değişimi, tüm bağlı kuyruklara mesajları iletebilir.
Not: Kullanılan RabbitMQ çalışma modundan bağımsız olarak, farklılık değişim türünde ve yönlendirme parametrelerinde yer almaktadır.
1. Önkoşul Öğretici
İlgili bilgileri anlamak için lütfen aşağıdaki bölümleri okuyun:
- RabbitMQ Temelleri
- RabbitMQ Yayın/Abone Örüntüsü
- RabbitMQ PHP Hızlı Başlangıç (Zorunlu, çünkü sonraki bölümler sadece anahtar kod parçalarını gösterir, kodları tekrar etmez)
2. Fanout Değişimi Tanımlama
Değişimi kanalın exchange_declare fonksiyonu aracılığıyla tanımlayın.
$channel->exchange_declare(
'tizi365.fanout', // Exchange adı, benzersiz olmalı ve çoğaltılamaz
'fanout', // Exchange türü
false,
false, // Kalıcı olup olmadığı
false
);
Not: Mesaj üreticileri ve tüketicileri değişime ihtiyaç duyar.
3. Mesaj Gönderme
Mesajları değişime göndeririz ve değişim, yönlendirme kurallarına göre mesajları ilgili kuyruklara iletir.
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// RabbitMQ bağlantısı oluştur
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Bir Kanal oluştur
$channel = $connection->channel();
// Değişimi bildir
$channel->exchange_declare(
'tizi365.fanout', // Exchange adı, benzersiz olmalı ve çoğaltılamaz
'fanout', // Exchange türü
false,
false, // Kalıcı olup olmadığı
false
);
// Mesaj nesnesi, mesaj içeriği parametre olarak
$msg = new AMQPMessage("merhaba tizi365.com");
$channel->basic_publish(
$msg, // Mesaj nesnesi
'tizi365.fanout' // Exchange adı
);
echo ' [x] Gönderildi ', $msg->getBody(), "\n";
// Kaynakları serbest bırak
$channel->close();
$connection->close();
4. Mesaj Alımı
4.1. Kuyruk Tanımlama ve Değişime Bağlama
Kuyruk mesajlarını tüketmek için önce bir kuyruk tanımlamanız ve daha sonra kuyruğu hedef değişime bağlamanız gerekmektedir. Aşağıda, belirli bir değişime tanımlanan bir kuyruk ve bağlanmıştır.
// Kuyruğu bildir, kuyruk adı boş ise otomatik olarak benzersiz bir kimlik oluşturulur ve kuyruk adı geri döner
list($queue_name, ,) = $channel->queue_declare(
"", // Kuyruk adı, tekrarlanmasına izin verilmez, boş ise otomatik olarak benzersiz bir kimlik oluşturulur ve anonim bir kuyruk haline gelir
false,
false, // Kalıcı olup olmadığı
true,
false
);
// Kuyruğu belirli bir değişime bağla
$channel->queue_bind(
$queue_name, // Kuyruk adı
'tizi365.fanout' // Değişim adı
);
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
// Bir rabbitmq bağlantısı oluştur
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Bir kanal oluştur
$channel = $connection->channel();
// Bir değişim bildir
$channel->exchange_declare(
'tizi365.fanout', // Değişim adı, benzersiz olmalı ve tekrarlanmamalı
'fanout', // Değişim türü
false,
false, // Kalıcı olup olmadığı
false
);
// Bir kuyruk bildir
list($queue_name, ,) = $channel->queue_declare(
"", // Kuyruk adı, tekrarlanmasına izin verilmez, boş ise otomatik olarak benzersiz bir kimlik oluşturulur ve anonim kuyruk olur
false,
false, // Kalıcı olup olmadığı
true,
false
);
// Kuyruğu belirli bir değişime bağla
$channel->queue_bind(
$queue_name, // Kuyruk adı
'tizi365.fanout' // Değişim adı
);
echo " [*] Mesaj bekleniyor. Çıkmak için CTRL+C'ye basın\n";
// Mesaj işleme fonksiyonunu tanımla (burada anonim bir fonksiyon kullanılıyor)
$callback = function ($msg) {
// Mesaj işleme mantığı
echo ' [x] ', $msg->body, "\n";
};
// Bir tüketici oluştur
$channel->basic_consume(
$queue_name, // Tüketilecek kuyruk adı
'', // Tüketici etiketi, ihmal et, sonra benzersiz bir kimlik oluştur
false,
true, // Mesajı otomatik olarak kabul edip etmeyeceği, yani rabbitmq'ya mesajın başarıyla işlendiğini belirt
false,
false,
$callback // Mesaj işleme fonksiyonu
);
// Kanal kapatılmamışsa, sürekli olarak işlemi engellemek ve çıkışını önlemek için bloklamaya devam et
while ($channel->is_open()) {
$channel->wait();
}
// Kaynakları serbest bırak
$channel->close();
$connection->close();