Bu sayfa, Asynq'ta mesaj aracı olarak Redis Cluster'ı nasıl kullanacağınızı tanıtır.

Redis Cluster Kullanmanın Avantajları

Redis Cluster kullanarak aşağıdaki avantajları elde edebilirsiniz:

  • Verileri birden çok Redis düğümü arasında kolayca shard etme
  • Belirli düğüm başarısızlıklarında kullanılabilirliği sürdürme
  • Otomatik olarak failover'ı gerçekleştirme

Genel Bakış

Küme Kuyruk Şeması

Asynq, kuyruklara dayalı olarak verileri shard eder. Yukarıdaki diyagramda, 3 master ve 3 slave içeren bir Redis Cluster ve 4 kuyruk (q1, q2, q3, q4) bulunmaktadır.

  • Master1 (ve replikası, Slave1) q1 ve q2'yi barındırır.
  • Master2 (ve replikası, Slave2) q3'ü barındırır.
  • Master3 (ve replikası, Slave3) q4'ü barındırır.

Görevleri kuyruklara göndermek için asynq.Client kullanırken, Queue seçeneğini kullanarak kuyruğu belirtebilirsiniz. Kuyruklardan görevleri çeken asynq.Server(ler) tarafından gönderilen görevler işlenir.

Öğretici

Bu bölümde, Asynq için mesaj aracı olarak Redis Cluster'ı nasıl kullanacağımızı tanıtacağız. 7000-7005 portlarında çalışan 6 Redis örneğinden oluşan bir kümeniz olduğunu varsayıyoruz. Aşağıda redis.conf dosyasının bir örneği bulunmaktadır:

port 7000
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 5000
appendonly yes

Daha sonra, iki adet ikili dosya oluşturacağız: istemci ve işçi.

go mod init asynq-redis-cluster-quickstart
mkdir client worker
touch client/client.go worker/worker.go

client.go dosyasında, yeni bir asynq.Client oluşturacak ve RedisClusterClientOpt geçerek Redis Cluster'a nasıl bağlanacağını belirteceğiz.

client := asynq.NewClient(asynq.RedisClusterClientOpt{
    Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})

İstemciyi elde ettikten sonra görevler oluşturacak ve bunları üç farklı kuyruğa yerleştireceğiz:

  • bildirimler
  • web kancaları
  • resimler
// client.go

package main

import (
    "fmt"
    "log"

    "github.com/hibiken/asynq"
)

// Kuyruk isimleri
const (
    QueueNotifications = "notifications"
    QueueWebhooks      = "webhooks"
    QueueImages        = "images"
)

func main() {
    client := asynq.NewClient(asynq.RedisClusterClientOpt{
        Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
    })
    defer client.Close()

    // "notifications:email" adında bir görev oluşturup bunu "notifications" kuyruğuna yerleştirin.
    task := asynq.NewTask("notifications:email", map[string]interface{}{"to": 123, "from": 456})
    res, err := client.Enqueue(task, asynq.Queue(QueueNotifications))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Başarıyla yerleştirildi: %+v\n", res)

    // "webhooks:sync" adında bir görev oluşturup bunu "webhooks" kuyruğuna yerleştirin.
    task = asynq.NewTask("webhooks:sync", map[string]interface{}{"data": 123})
    res, err = client.Enqueue(task, asynq.Queue(QueueWebhooks))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Başarıyla yerleştirildi: %+v\n", res)

    // "images:resize" adında bir görev oluşturup bunu "images" kuyruğuna yerleştirin.
    task = asynq.NewTask("images:resize", map[string]interface{}{"src": "some/path/to/image"})
    res, err = client.Enqueue(task, asynq.Queue(QueueImages))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Başarıyla yerleştirildi: %+v\n", res)
}

Bu programı çalıştırarak üç görevi kuyruklara yerleştirelim.

go run client/client.go

Şimdi, işleyiciye geçerek bu üç görevi işlemek için işçi tarafına geçelim. worker.go dosyasında, bu üç kuyruktan görevleri tüketmek için bir asynq.Server oluşturacağız. Benzer şekilde, RedisClusterClientOpt kullanarak Redis Cluster'a bağlanacağız.

// worker.go

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/hibiken/asynq"
)

func main() {
	redisConnOpt := asynq.RedisClusterClientOpt{Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"}}
	srv := asynq.NewServer(redisConnOpt, asynq.Config{
		Concurrency: 20,
		// Burada her kuyruk için aynı önceliği ayarlayın
		Queues: map[string]int{
			"notifications": 1,
			"webhooks":      1,
			"images":        1,
		},
	})
	mux := asynq.NewServeMux()
	mux.HandleFunc("notifications:email", handleEmailTask)
	mux.HandleFunc("webhooks:sync", handleWebhookSyncTask)
	mux.HandleFunc("images:resize", handleImageResizeTask)
	if err := srv.Run(mux); err != nil {
		log.Fatalf("Sunucuyu başlatma başarısız: %v", err)
	}
}

func handleEmailTask(ctx context.Context, t *asynq.Task) error {
	to, err := t.Payload.GetInt("to")
	if err != nil {
		return err
	}
	from, err := t.Payload.GetInt("from")
	if err != nil {
		return err
	}
	fmt.Printf("%d'dan %d'ye bir e-posta gönderiliyor\n", from, to)
	return nil
}

func handleWebhookSyncTask(ctx context.Context, t *asynq.Task) error {
	data, err := t.Payload.GetInt("data")
	if err != nil {
		return err
	}
	fmt.Printf("Webhook görevi işleniyor: %d\n", data)
	return nil
}

func handleImageResizeTask(ctx context.Context, t *asynq.Task) error {
	src, err := t.Payload.GetString("src")
	if err != nil {
		return err
	}
	fmt.Printf("Resim boyutlandırılıyor: %s\n", src)
	return nil
}

Bu işçi sunucusunu önceden oluşturduğumuz üç görevi işlemek için çalıştıralım.

go run worker/worker.go

Her bir işleyiciden yazdırılan mesajları görebilmelisiniz.

Genel Bakışda belirtildiği gibi, Asynq kuyruklara dayalı olarak verileri parçalar. Aynı kuyruğa eklenen tüm görevler aynı Redis düğümüne aittir. Peki, hangi Redis düğümü hangi kuyruğa sahip?

Bu soruyu cevaplamak için CLI'ı kullanabiliriz.

asynq queue ls --cluster

Bu komut, kuyrukların bir listesini ve aşağıdakileri yazdırır:

  • Kuyruğun ait olduğu küme düğümü
  • Kuyruğun harita yapıldığı küme hash yuvası

Çıktı aşağı yukarı şöyle görünebilir:

Kuyruk          Küme KeySlot     Küme Düğümler
-----          ---------------  -------------
images         9450              [{d54231bccd6c1765ea15caf95a41c67b10b91e58 127.0.0.1:7001} {70a7d4569eac28eed577ee91863703ffab98d2e0 127.0.0.1:7005}]
webhooks       4418              [{d58959f6057ad0911d92d86d1d16dc2242e9ec48 127.0.0.1:7004} {e2fb9f1296a8d3a49818e0f9be3bfd74fdc052ea 127.0.0.1:7000}]
notifications  16340             [{c738a8a98c5f5f9161e9563fa739f9c8191b7f1a 127.0.0.1:7002} {18cdaa0712191d74656f08017371df41eeaad5fa 127.0.0.1:7003}]