Golang RabbitMQ Yayın/Abone Deseni (Yayın Modu, Yayın Toplama Modu)
RabbitMQ'da yayın/abone deseni, bir üretici tarafından gönderilen bir mesajın birden fazla tüketici tarafından işlenmesi anlamına gelir.
Açıklama:
- P, üreticiyi temsil eder, C1 ve C2 tüketicileri temsil eder, kırmızı kuyrukları gösterir, ve X değişimi temsil eder.
- Değişim, değişime bağlı kuyruklara mesajları iletmekten sorumludur.
- Aynı değişime bağlı birden fazla kuyruk tanımlanabilir.
- Her kuyruk bir veya daha fazla tüketiciye sahip olabilir.
Not: RabbitMQ hakkında bilgi sahibi değilseniz lütfen öncelikle RabbitMQ Temel Kavramlar bölümünü okuyun.
1. Bağımlılık Paketi Kurulumu
go get github.com/streadway/amqp
2. Mesaj Gönderme
Mesaj üreticisinin mesaj gönderme adımları aşağıda gösterilmektedir.
2.1. RabbitMQ Sunucusuna Bağlanma
// RabbitMQ Sunucusuna Bağlan
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()
Bağlantı adresi açıklaması:
amqp://kullanıcıadı:şifre@RabbitMQAdresi:Port/
2.2. Kanal Oluşturma
Çoğu işlem kanal üzerinde gerçekleştirilir.
ch, err := conn.Channel()
defer ch.Close()
2.3. Değişim Bildirimi
Mesajlar önce değişime gönderilir. Değişim, stratejisine bağlı olarak mesajları kuyruklara iletti.
err = ch.ExchangeDeclare(
"tizi365", // Değişim adı
"fanout", // Değişim türü, burada yayın/abone deseni için yayın toplama türü kullanılıyor
true, // Dayanıklı
false, // Otomatik silinme
false, // İç
false, // Bekleme yok
nil, // Argümanlar
)
2.4. Mesaj Yayınlama
// Mesaj içeriği
body := "Merhaba Tizi365.com!"
// Mesajı yayınla
err = ch.Publish(
"tizi365", // Değişim (önceki bildirimle eşleşen değişim adı)
"", // Yönlendirme anahtarı, yayın toplama türü değişim için yönlendirme anahtarı otomatik olarak yok sayıldığından sağlanması gerekli değil
false, // Zorunlu
false, // Hemen
amqp.Publishing {
ContentType: "text/plain", // Mesaj içerik türü, burada düz metin
Body: []byte(body), // Mesaj içeriği
})
2.5. Mesaj Yayınlama Kodunu Tamamlama
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// Rabbitmq'ya bağlan
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "RabbitMQ'ya bağlanılamadı")
defer conn.Close()
// Bir kanal oluştur
ch, err := conn.Channel()
failOnError(err, "Kanal açılamadı")
defer ch.Close()
// Değişimi bildir
err = ch.ExchangeDeclare(
"tizi365", // Değişim adı
"fanout", // Değişim türü, yayın/abone modu için fanout kullanılır
true, // Dayanıklı
false, // Otomatik silinme
false, // İç
false, // Bekleme yok
nil, // Argümanlar
)
failOnError(err, "Değişim bildirilemedi")
// Mesaj içeriği
body := "Merhaba Tizi365.com!"
// Mesajı gönder
err = ch.Publish(
"tizi365", // Değişim (yukarıdaki bildirimle eşleşen)
"", // Yönlendirme anahtarı, fanout türü değişimlerde yönlendirme anahtarı otomatik olarak yok sayılır
false, // Zorunlu
false, // Hemen
amqp.Publishing {
ContentType: "text/plain", // Mesaj içerik türü, burada düz metin
Body: []byte(body), // Mesaj içeriği
})
log.Printf("Gönderilen içerik %s", body)
}
3. Mesajları Al
Mesaj alımı için ilk üç adım—RabbitMQ'ya bağlanma, bir kanal oluşturma ve değişim bildirimi—mesaj gönderme adımlarıyla aynıdır. İlgili bölümlere (2.1, 2.2 ve 2.3) bakınız.
3.1. Kuyruk Bildirimi
İşlem yapılacak kuyruğu bildirin
q, err := ch.QueueDeclare(
"", // Kuyruk adı, belirtilmezse rastgele bir ad oluşturulur
false, // Kalıcı
false, // Kullanılmadığında silin
true, // Özel
false, // Bekleme yok
nil, // Argümanlar
)
3.2. Kuyruğu Değişimle Bağlama
Mesajları almak için kuyruğun değişimle bağlanması gerekir
err = ch.QueueBind(
q.Name, // Kuyruk adı
"", // Yönlendirme anahtarı, fanout tipi değişimler için yönlendirme anahtarı otomatik olarak yok sayılır
"tizi365", // Değişim adı, mesaj gönderen tarafından tanımlananla eşleşmeli
false,
nil)
Not: Gerçek uygulamalarda, değişimle iletilen mesajları almak için aynı değişime bağlı N kuyruk tanımlayabiliriz. Bu, yayın/abone deseninin yansıtıldığı yerdir
3.3. Tüketici Oluşturma
msgs, err := ch.Consume(
q.Name, // Yukarıdaki kuyruk adına referans
"", // Tüketici adı, belirtilmezse otomatik olarak oluşturulur
true, // Mesajın otomatik olarak işlendiğini doğrulama
false, // Özel
false, // Yerel yok
false, // Bekleme yok
nil, // Argümanlar
)
// Mesajları işlemek için döngü
for d := range msgs {
log.Printf("Mesaj alındı=%s", d.Body)
}
3.4. Tüketici Kodu Tamamlama
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// RabbitMQ'ya bağlan
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "RabbitMQ'ya bağlanılamadı")
defer conn.Close()
// Kanal oluştur, genellikle her tüketicinin başına bir tane
ch, err := conn.Channel()
failOnError(err, "Kanal açılamadı")
defer ch.Close()
// Değişimi bildir
err = ch.ExchangeDeclare(
"tizi365", // Değişim adı, mesaj gönderen tarafından kullanılanla eşleşmeli
"fanout", // Değişim tipi
true, // Kalıcı
false, // Otomatik silinme
false, // İç
false, // Bekleme yok
nil, // Argümanlar
)
failOnError(err, "Değişim bildirilemedi")
// İşlem yapılacak kuyruğu bildir
q, err := ch.QueueDeclare(
"", // Kuyruk adı, boşsa rastgele bir ad oluşturulacak
false, // Kalıcı
false, // Kullanılmadığında silinme
true, // Özel
false, // Bekleme yok
nil, // Argümanlar
)
failOnError(err, "Bir kuyruk bildirilemedi")
// Kuyruğu belirtilen değişime bağla
err = ch.QueueBind(
q.Name, // Kuyruk adı
"", // Yönlendirme anahtarı, fanout değişimleri için yok sayılır
"tizi365", // Değişim adı, mesaj gönderen tarafından tanımlananla eşleşmeli
false,
nil)
failOnError(err, "Kuyruğa bağlama başarısız")
// Tüketici oluştur
msgs, err := ch.Consume(
q.Name, // Önceki kuyruk adına referans
"", // Tüketici adı, boşsa otomatik oluşturulur
true, // Otomatik onay
false, // Özel
false, // Yerel yok
false, // Bekleme yok
nil, // Argümanlar
)
failOnError(err, "Tüketicinin kaydı başarısız")
// Kuyruktan döngü içinde mesajları tüket
for d := range msgs {
log.Printf("Mesaj alındı: %s", d.Body)
}
}
3.5. Birden Fazla Tüketici
Work mode bölümüne başvurun ve sadece gorutinler kullanarak birden fazla tüketiciyi başlatın.