اس صفحہ میں Redis Cluster کو Asynq میں ایک پیغام بروکر کے طور پر استعمال کرنے کا طریقہ بیان کیا گیا ہے۔

Redis Cluster استعمال کرنے کے فوائد

Redis Cluster کا استعمال کرکے، آپ مندرجہ ذیل فوائد حاصل کرسکتے ہیں:

  • مختلف Redis نوڈز میں ڈیٹا کو آسانی سے شارڈ کرنا
  • کچھ نوڈز کے خراب ہونے کی صورت میں دستیابی برقرار رکھنا
  • خود بخود فیل اوور کرنا

کلفٹ

Cluster Queue Illustration

Asynq چھوںدیتا کی قیمتوں کے بناپر ڈیٹا شارڈ کرتا ہے۔ مندرجہ بالا تصویر میں ہمارے پاس 6 مثالیں (3 ماسٹر اور 3 غلام) اور 4 قوم کی ایک ریڈس کلسٹر ہے۔

  • ماسٹر1 (اور اس کا نقل، غلام1) q1 اور q2 کو ہوسٹ کرتا ہے۔
  • ماسٹر2 (اور اس کا نقل، غلام2) q3 کو ہوسٹ کرتا ہے۔
  • ماسٹر3 (اور اس کا نقل، غلام3) q4 کو ہوسٹ کرتا ہے۔

جب آپ asynq.Client استعمال کرکے ٹاسکس کو انکیو کرتے ہیں، تو آپ Queue آپشن کا استعمال کرکے قوم کو مخصوص کرسکتے ہیں۔ انکیو کیے گئے ٹاسکس asynq.Server کے ذریعے استعمال کیے جائیں گے جو یہ قومز سے ٹاسکس کھینچنے والے ہیں۔

ہدایت نامہ

اس حصہ میں ہم آپکو بتائیں گے کہ آپ کس طرح Asynq کے لیے پیغام بروکر کے طور پر Redis کلسٹر استعمال کر سکتے ہیں۔ ہم یہ فرض کریں گے کہ آپ کے پاس پورٹس 7000-7005 پر چل رہے 6 ریڈس مثالی کلسٹر ہے۔ نیچے redis.conf فائل کی مثال دی گئی ہے۔

port 7000
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 5000
appendonly yes

اگلے، ہم دو بائنری فائلز بنائیں گے: client اور worker۔

go mod init asynq-redis-cluster-quickstart
mkdir client worker
touch client/client.go worker/worker.go

client.go میں، ہم ایک نیا asynq.Client بنائیں گے اور یہ بتائیں گے کہ ریڈس کلسٹر سے کنکٹ ہونے کا طریقہ کیسے ہوگا، RedisClusterClientOpt کو منتقل کر کے۔

client := asynq.NewClient(asynq.RedisClusterClientOpt{
    Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})

جیسے ہی ہمارے پاس کلائنٹ ہوتا ہے، ہم تسکس بنائیں گے اور انہیں تین مختلف قیوزز میں انکیو کریں گے:

  • notifications
  • webhooks
  • images
// client.go

package main

import (
    "fmt"
    "log"

    "github.com/hibiken/asynq"
)

// قیوز نام
const (
    QueueNotifications = "notifications"
    QueueWebhooks      = "webhooks"
    QueueImages        = "images"
)

func main() {
    client := asynq.NewClient(asynq.RedisClusterClientOpt{
        Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
    })
    defer client.Close()

    // "notifications:email" تسک بنائیں اور اسے "notifications" قیو میں انکیو کریں۔
    task := asynq.NewTask("notifications:email", map[string]interface{}{"to": 123, "from": 456})
    res, err := client.Enqueue(task, asynq.Queue(QueueNotifications))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("انکیو کردیا گیا: %+v\n", res)

    // "webhooks:sync" تسک بنائیں اور اسے "webhooks" قیو میں انکیو کریں۔
    task = asynq.NewTask("webhooks:sync", map[string]interface{}{"data": 123})
    res, err = client.Enqueue(task, asynq.Queue(QueueWebhooks))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("انکیو کردیا گیا: %+v\n", res)

    // "images:resize" تسک بنائیں اور اسے "images" قیو میں انکیو کریں۔
    task = asynq.NewTask("images:resize", map[string]interface{}{"src": "some/path/to/image"})
    res, err = client.Enqueue(task, asynq.Queue(QueueImages))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("انکیو کردیا گیا: %+v\n", res)
}

ہم اس پروگرام کو چلائیں تاکہ تین تسکس کو قیوز میں انکیو کیا جا سکے۔

go run client/client.go

اب، چلیں ورکر میں جا کر ان تین تسکس کو پروسیس کرنے۔ worker.go میں، ہم ایک asynq.Server بنائیں گے تاکہ ہم ان تین قیوزز سے ٹاسکس کو کنسوم کر سکیں۔ اسی طرح، ہم RedisClusterClientOpt کا استعمال کرتے ہوئے ہمارے ریڈس کلسٹر سے کنکٹ کریں گے۔

// worker.ur.go

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/hibiken/asynq"
)

func main() {
	redisConnOpt := asynq.RedisClusterClientOpt{Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"}}
	srv := asynq.NewServer(redisConnOpt, asynq.Config{
		Concurrency: 20,
		// یہاں ہر قطار کے لئے ایک ہی پرائیورٹی سیٹ کریں
		Queues: map[string]int{
			"notifications": 1,
			"webhooks":      1,
			"images":        1,
		},
	})
	mux := asynq.NewServeMux()
	mux.HandleFunc("notifications:email", handleEmailTask)
	mux.HandleFunc("webhooks:sync", handleWebhookSyncTask)
	mux.HandleFunc("images:resize", handleImageResizeTask)
	if err := srv.Run(mux); err != nil {
		log.Fatalf("سرور کو شروع کرنے میں ناکام رہا: %v", err)
	}
}

func handleEmailTask(ctx context.Context, t *asynq.Task) error {
	to, err := t.Payload.GetInt("to")
	if err != nil {
		return err
	}
	from, err := t.Payload.GetInt("from")
	if err != nil {
		return err
	}
	fmt.Printf("%d سے %d تک ایک ای میل بھیجنا\n", from, to)
	return nil
}

func handleWebhookSyncTask(ctx context.Context, t *asynq.Task) error {
	data, err := t.Payload.GetInt("data")
	if err != nil {
		return err
	}
	fmt.Printf("Webhook ٹاسک کا سنبھال: %d\n", data)
	return nil
}

func handleImageResizeTask(ctx context.Context, t *asynq.Task) error {
	src, err := t.Payload.GetString("src")
	if err != nil {
		return err
	}
	fmt.Printf("تصویر کا سائز کم کرنا: %s\n", src)
	return nil
}