Golang RabbitMQ Yayın/Abone Deseni (Yayın Modu, Yayın Toplama Modu)

RabbitMQ'da yayın/abone deseni, bir üretici tarafından gönderilen bir mesajın birden fazla tüketici tarafından işlenmesi anlamına gelir.

Yayın Toplama Modu

Açıklama:

  • P, üreticiyi temsil eder, C1 ve C2 tüketicileri temsil eder, kırmızı kuyrukları gösterir, ve X değişimi temsil eder.
  • Değişim, değişime bağlı kuyruklara mesajları iletmekten sorumludur.
  • Aynı değişime bağlı birden fazla kuyruk tanımlanabilir.
  • Her kuyruk bir veya daha fazla tüketiciye sahip olabilir.

Not: RabbitMQ hakkında bilgi sahibi değilseniz lütfen öncelikle RabbitMQ Temel Kavramlar bölümünü okuyun.

1. Bağımlılık Paketi Kurulumu

go get github.com/streadway/amqp

2. Mesaj Gönderme

Mesaj üreticisinin mesaj gönderme adımları aşağıda gösterilmektedir.

2.1. RabbitMQ Sunucusuna Bağlanma

// RabbitMQ Sunucusuna Bağlan
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()

Bağlantı adresi açıklaması:

amqp://kullanıcıadı:şifre@RabbitMQAdresi:Port/

2.2. Kanal Oluşturma

Çoğu işlem kanal üzerinde gerçekleştirilir.

ch, err := conn.Channel()
defer ch.Close()

2.3. Değişim Bildirimi

Mesajlar önce değişime gönderilir. Değişim, stratejisine bağlı olarak mesajları kuyruklara iletti.

err = ch.ExchangeDeclare(
	"tizi365",   // Değişim adı
	"fanout", // Değişim türü, burada yayın/abone deseni için yayın toplama türü kullanılıyor
	true,     // Dayanıklı
	false,    // Otomatik silinme
	false,    // İç
	false,    // Bekleme yok
	nil,      // Argümanlar
)

2.4. Mesaj Yayınlama

// Mesaj içeriği
body := "Merhaba Tizi365.com!"

// Mesajı yayınla
err = ch.Publish(
  "tizi365",     // Değişim (önceki bildirimle eşleşen değişim adı)
  "", // Yönlendirme anahtarı, yayın toplama türü değişim için yönlendirme anahtarı otomatik olarak yok sayıldığından sağlanması gerekli değil
  false,  // Zorunlu
  false,  // Hemen
  amqp.Publishing {
    ContentType: "text/plain", // Mesaj içerik türü, burada düz metin
    Body:        []byte(body),  // Mesaj içeriği
  })

2.5. Mesaj Yayınlama Kodunu Tamamlama

package main

import (
	"log"
	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// Rabbitmq'ya bağlan
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "RabbitMQ'ya bağlanılamadı")
	defer conn.Close()

	// Bir kanal oluştur
	ch, err := conn.Channel()
	failOnError(err, "Kanal açılamadı")
	defer ch.Close()

	// Değişimi bildir
	err = ch.ExchangeDeclare(
		"tizi365",   // Değişim adı
		"fanout", // Değişim türü, yayın/abone modu için fanout kullanılır
		true,     // Dayanıklı
		false,    // Otomatik silinme
		false,    // İç
		false,    // Bekleme yok
		nil,      // Argümanlar
	)
	failOnError(err, "Değişim bildirilemedi")

	// Mesaj içeriği
	body := "Merhaba Tizi365.com!"
	// Mesajı gönder
	err = ch.Publish(
		"tizi365",     // Değişim (yukarıdaki bildirimle eşleşen)
		"", // Yönlendirme anahtarı, fanout türü değişimlerde yönlendirme anahtarı otomatik olarak yok sayılır
		false,  // Zorunlu
		false,  // Hemen
		amqp.Publishing {
			ContentType: "text/plain", // Mesaj içerik türü, burada düz metin
			Body:        []byte(body),  // Mesaj içeriği
		})

	log.Printf("Gönderilen içerik %s", body)
}

3. Mesajları Al

Mesaj alımı için ilk üç adım—RabbitMQ'ya bağlanma, bir kanal oluşturma ve değişim bildirimi—mesaj gönderme adımlarıyla aynıdır. İlgili bölümlere (2.1, 2.2 ve 2.3) bakınız.

3.1. Kuyruk Bildirimi

İşlem yapılacak kuyruğu bildirin

q, err := ch.QueueDeclare(
		"",    // Kuyruk adı, belirtilmezse rastgele bir ad oluşturulur
		false, // Kalıcı
		false, // Kullanılmadığında silin
		true,  // Özel
		false, // Bekleme yok
		nil,   // Argümanlar
	)

3.2. Kuyruğu Değişimle Bağlama

Mesajları almak için kuyruğun değişimle bağlanması gerekir

err = ch.QueueBind(
		q.Name, // Kuyruk adı
		"",     // Yönlendirme anahtarı, fanout tipi değişimler için yönlendirme anahtarı otomatik olarak yok sayılır
		"tizi365", // Değişim adı, mesaj gönderen tarafından tanımlananla eşleşmeli
		false,
		nil)

Not: Gerçek uygulamalarda, değişimle iletilen mesajları almak için aynı değişime bağlı N kuyruk tanımlayabiliriz. Bu, yayın/abone deseninin yansıtıldığı yerdir

3.3. Tüketici Oluşturma

msgs, err := ch.Consume(
		q.Name, // Yukarıdaki kuyruk adına referans
		"",     // Tüketici adı, belirtilmezse otomatik olarak oluşturulur
		true,   // Mesajın otomatik olarak işlendiğini doğrulama
		false,  // Özel
		false,  // Yerel yok
		false,  // Bekleme yok
		nil,    // Argümanlar
	)
	
// Mesajları işlemek için döngü
for d := range msgs {
	log.Printf("Mesaj alındı=%s", d.Body)
}

3.4. Tüketici Kodu Tamamlama

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// RabbitMQ'ya bağlan
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "RabbitMQ'ya bağlanılamadı")
	defer conn.Close()

	// Kanal oluştur, genellikle her tüketicinin başına bir tane
	ch, err := conn.Channel()
	failOnError(err, "Kanal açılamadı")
	defer ch.Close()

	// Değişimi bildir
	err = ch.ExchangeDeclare(
		"tizi365",   // Değişim adı, mesaj gönderen tarafından kullanılanla eşleşmeli
		"fanout", // Değişim tipi
		true,     // Kalıcı
		false,    // Otomatik silinme
		false,    // İç
		false,    // Bekleme yok
		nil,      // Argümanlar
	)
	failOnError(err, "Değişim bildirilemedi")

	// İşlem yapılacak kuyruğu bildir
	q, err := ch.QueueDeclare(
		"",    // Kuyruk adı, boşsa rastgele bir ad oluşturulacak
		false, // Kalıcı
		false, // Kullanılmadığında silinme
		true,  // Özel
		false, // Bekleme yok
		nil,   // Argümanlar
	)
	failOnError(err, "Bir kuyruk bildirilemedi")

	// Kuyruğu belirtilen değişime bağla
	err = ch.QueueBind(
		q.Name, // Kuyruk adı
		"",     // Yönlendirme anahtarı, fanout değişimleri için yok sayılır
		"tizi365", // Değişim adı, mesaj gönderen tarafından tanımlananla eşleşmeli
		false,
		nil)
	failOnError(err, "Kuyruğa bağlama başarısız")

	// Tüketici oluştur
	msgs, err := ch.Consume(
		q.Name, // Önceki kuyruk adına referans
		"",     // Tüketici adı, boşsa otomatik oluşturulur
		true,   // Otomatik onay
		false,  // Özel
		false,  // Yerel yok
		false,  // Bekleme yok
		nil,    // Argümanlar
	)
	failOnError(err, "Tüketicinin kaydı başarısız")

	// Kuyruktan döngü içinde mesajları tüket
	for d := range msgs {
		log.Printf("Mesaj alındı: %s", d.Body)
	}
}

3.5. Birden Fazla Tüketici

Work mode bölümüne başvurun ve sadece gorutinler kullanarak birden fazla tüketiciyi başlatın.