مزایای استفاده از کلاستر Redis

با استفاده از کلاستر Redis، می‌توانید به مزایای زیر دست یابید:

  • به راحتی داده‌ها را بین چندین گره Redis به اشتراک بگذارید
  • در صورت خرابی چندین گره، قابلیت دسترسی را حفظ کنید
  • به‌طور خودکار فیلوور را انجام دهید

مرور

تصویر نمایش صف کلاستر

Asynq اطلاعات را براساس صف‌ها شار می‌کند. در شکل بالا، ما یک کلاستر Redis با ۶ نمونه (3 مستر و 3 اسلیو) و ۴ صف (q1، q2، q3، q4) داریم.

  • مستر۱ (و نمونه تکراری آن، اسلیو۱) صف q1 و q2 را میزبانی می‌کند.
  • مستر۲ (و نمونه تکراری آن، اسلیو۲) صف q3 را میزبانی می‌کند.
  • مستر۳ (و نمونه تکراری آن، اسلیو۳) صف q4 را میزبانی می‌کند.

هنگام استفاده از asynq.Client برای قرار دادن وظایف در صف، می‌توانید صف را با استفاده از گزینه Queue مشخص کنید. وظایف قرار داده شده توسط asynq.Server توسط یک یا چند سرور از این صف‌ها خواهند بود.

آموزش

در این بخش، چگونگی استفاده از Redis Cluster به عنوان پیام‌رسان برای Asynq را معرفی خواهیم کرد. فرض می‌کنیم که شما یک خوشه 6 نمونه Redis را با پورت‌های 7000-7005 در حال اجرا دارید. پایین تر مثالی از فایل redis.conf آمده است:

port 7000
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 5000
appendonly yes

بعداً، دو فایل دودویی ایجاد خواهیم کرد: مشتری (client) و کارگر (worker).

go mod init asynq-redis-cluster-quickstart
mkdir client worker
touch client/client.go worker/worker.go

در client.go، یک asynq.Client جدید ایجاد می‌کنیم و نحوه اتصال به Redis Cluster را با استفاده از RedisClusterClientOpt مشخص می‌کنیم.

client := asynq.NewClient(asynq.RedisClusterClientOpt{
    Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})

هنگامی که مشتری را داشته باشیم، وظایف را ایجاد و آن‌ها را در سه صف مختلف قرار می‌دهیم:

  • notifications
  • webhooks
  • images
// client.go

package main

import (
    "fmt"
    "log"

    "github.com/hibiken/asynq"
)

// نام‌های صف
const (
    QueueNotifications = "notifications"
    QueueWebhooks      = "webhooks"
    QueueImages        = "images"
)

func main() {
    client := asynq.NewClient(asynq.RedisClusterClientOpt{
        Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
    })
    defer client.Close()

    // ایجاد وظیفه "notifications:email" و قرار دادن آن در صف "notifications".
    task := asynq.NewTask("notifications:email", map[string]interface{}{"to": 123, "from": 456})
    res, err := client.Enqueue(task, asynq.Queue(QueueNotifications))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("قرار دادن با موفقیت: %+v\n", res)

    // ایجاد وظیفه "webhooks:sync" و قرار دادن آن در صف "webhooks".
    task = asynq.NewTask("webhooks:sync", map[string]interface{}{"data": 123})
    res, err = client.Enqueue(task, asynq.Queue(QueueWebhooks))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("قرار دادن با موفقیت: %+v\n", res)

    // ایجاد وظیفه "images:resize" و قرار دادن آن در صف "images".
    task = asynq.NewTask("images:resize", map[string]interface{}{"src": "some/path/to/image"})
    res, err = client.Enqueue(task, asynq.Queue(QueueImages))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("قرار دادن با موفقیت: %+v\n", res)
}

حالا این برنامه را اجرا کنیم تا سه وظیفه را در صف‌ها قرار دهیم.

go run client/client.go

حالا بیایید به کارگر برویم تا این سه وظیفه را پردازش کنیم. در worker.go، یک asynq.Server برای مصرف وظایف از این سه صف ایجاد خواهیم کرد. به همین ترتیب، با استفاده از RedisClusterClientOpt به خوشه Redis خود متصل می‌شویم.

// worker.go

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/hibiken/asynq"
)

func main() {
	redisConnOpt := asynq.RedisClusterClientOpt{Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"}}
	srv := asynq.NewServer(redisConnOpt, asynq.Config{
		Concurrency: 20,
		// در اینجا اولویت یکسانی را برای هر صف تنظیم کنید
		Queues: map[string]int{
			"notifications": 1,
			"webhooks":      1,
			"images":        1,
		},
	})
	mux := asynq.NewServeMux()
	mux.HandleFunc("notifications:email", handleEmailTask)
	mux.HandleFunc("webhooks:sync", handleWebhookSyncTask)
	mux.HandleFunc("images:resize", handleImageResizeTask)
	if err := srv.Run(mux); err != nil {
		log.Fatalf("شکست در آغاز سرور: %v", err)
	}
}

func handleEmailTask(ctx context.Context, t *asynq.Task) error {
	to, err := t.Payload.GetInt("to")
	if err != nil {
		return err
	}
	from, err := t.Payload.GetInt("from")
	if err != nil {
		return err
	}
	fmt.Printf("در حال ارسال ایمیل از %d به %d\n", from, to)
	return nil
}

func handleWebhookSyncTask(ctx context.Context, t *asynq.Task) error {
	data, err := t.Payload.GetInt("data")
	if err != nil {
		return err
	}
	fmt.Printf("در حال بررسی وظیفه وب‌هوک: %d\n", data)
	return nil
}

func handleImageResizeTask(ctx context.Context, t *asynq.Task) error {
	src, err := t.Payload.GetString("src")
	if err != nil {
		return err
	}
	fmt.Printf("تغییر اندازه تصویر: %s\n", src)
	return nil
}

بیایید این سرور کارگر را اجرا کنیم تا وظایف سه‌گانه‌ی ایجاد شده از قبل را انجام دهد.

go run worker/worker.go

باید بتوانید پیام‌های چاپ شده از هر دستگیره را مشاهده کنید.


همانطور که در بخش مرور گفته شد، Asynq داده‌ها را بر اساس صف‌ها به بخش‌های کوچکتری تقسیم می‌کند. تمام وظایف اضافه شده به همان صف به یک نود Redis متصل می‌شوند. پس، هر نود Redis کدام صف را دارایی می‌کند؟

ما می‌توانیم از رابط خط فرمان برای پاسخ به این سوال استفاده کنیم.

asynq queue ls --cluster


این دستور یک لیست از صف‌ها را چاپ خواهد کرد، به همراه:

- نود خوشه که به آن صف تعلق دارد
- تخته هش خوشه که به آن صف نگاشته شده است

خروجی ممکن است شبیه به این باشد:

Queue Cluster KeySlot Cluster Nodes


images 9450 [{d54231bccd6c1765ea15caf95a41c67b10b91e58 127.0.0.1:7001} {70a7d4569eac28eed577ee91863703ffab98d2e0 127.0.0.1:7005}] webhooks 4418 [{d58959f6057ad0911d92d86d1d16dc2242e9ec48 127.0.0.1:7004} {e2fb9f1296a8d3a49818e0f9be3bfd74fdc052ea 127.0.0.1:7000}] notifications 16340 [{c738a8a98c5f5f9161e9563fa739f9c8191b7f1a 127.0.0.1:7002} {18cdaa0712191d74656f08017371df41eeaad5fa 127.0.0.1:7003}]