هذه الصفحة تقدم كيفية استخدام Redis Cluster كوكيل رسائل في Asynq.

مزايا استخدام Redis Cluster

من خلال استخدام Redis Cluster ، يمكنك الاستفادة من الفوائد التالية:

  • تجزئة البيانات بسهولة عبر عدة عقد Redis
  • الحفاظ على التوفر في حالة فشل بعض العقد
  • القيام تلقائيًا بالانتقال إلى الفشل

نظرة عامة

Cluster Queue Illustration

يجزئ Asynq البيانات استنادًا إلى الطوابير. في الرسم البياني أعلاه ، لدينا Redis Cluster مع 6 مثيلات (3 رؤساء و 3 عبيد) و 4 طوابير (q1، q2، q3، q4).

  • يستضيف Master1 (ونسخته الاحتياطية ، Slave1) q1 و q2.
  • يستضيف Master2 (ونسخته الاحتياطية ، Slave2) q3.
  • يستضيف Master3 (ونسخته الاحتياطية ، Slave3) q4.

عند استخدام asynq.Client لوضع المهام في الانتظار ، يمكنك تحديد الطابور باستخدام الخيار Queue. سيتم استهلاك المهام التي تم وضعها في الانتظار بواسطة asynq.Server(s) الذي يستخرج المهام من هذه الطوابير.

البرنامج التعليمي

في هذا القسم، سنقوم بشرح كيفية استخدام Redis Cluster كجهة اتصال رسالات لAsynq. نفترض أن لديك مجموعة من 6 حالات Redis تعمل على المنافذ 7000-7005. أدناه مثال على ملف redis.conf:

port 7000
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 5000
appendonly yes

بعد ذلك، سنقوم بإنشاء ملفين ثنائيين: client و worker.

go mod init asynq-redis-cluster-quickstart
mkdir client worker
touch client/client.go worker/worker.go

في client.go، سنقوم بإنشاء asynq.Client جديد وتحديد كيفية الاتصال بـRedis Cluster عن طريق تمرير RedisClusterClientOpt.

client := asynq.NewClient(asynq.RedisClusterClientOpt{
    Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})

بمجرد الحصول على العميل، سنقوم بإنشاء المهام وإدراجها في ثلاثة طوابير مختلفة:

  • الإشعارات
  • ويبهوكس
  • الصور
// client.go
package main
import (
    "fmt"
    "log"
    "github.com/hibiken/asynq"
)
// أسماء الطوابير
const (
    QueueNotifications = "notifications"
    QueueWebhooks      = "webhooks"
    QueueImages        = "images"
)
func main() {
    client := asynq.NewClient(asynq.RedisClusterClientOpt{
        Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
    })
    defer client.Close()
    // إنشاء مهمة "notifications:email" وإضافتها إلى طابور "notifications".
    task := asynq.NewTask("notifications:email", map[string]interface{}{"to": 123, "from": 456})
    res, err := client.Enqueue(task, asynq.Queue(QueueNotifications))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("تمت إضافته بنجاح: %+v\n", res)
    // إنشاء مهمة "webhooks:sync" وإضافتها إلى طابور "webhooks".
    task = asynq.NewTask("webhooks:sync", map[string]interface{}{"data": 123})
    res, err = client.Enqueue(task, asynq.Queue(QueueWebhooks))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("تمت إضافته بنجاح: %+v\n", res)
    // إنشاء مهمة "images:resize" وإضافتها إلى طابور "images".
    task = asynq.NewTask("images:resize", map[string]interface{}{"src": "some/path/to/image"})
    res, err = client.Enqueue(task, asynq.Queue(QueueImages))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("تمت إضافته بنجاح: %+v\n", res)
}

دعنا نقوم بتشغيل هذا البرنامج لإدراج المهام الثلاث في الطوابير.

go run client/client.go

الآن، لننتقل إلى العامل لمعالجة هذه المهام الثلاث. في worker.go، سنقوم بإنشاء asynq.Server لاستهلاك المهام من هذه الطوابير الثلاث. بنفس الطريقة، سنقوم بالاتصال بمجموعتنا من Redis Cluster باستخدام RedisClusterClientOpt.

// worker.go

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/hibiken/asynq"
)

func main() {
	redisConnOpt := asynq.RedisClusterClientOpt{Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"}}
	srv := asynq.NewServer(redisConnOpt, asynq.Config{
		Concurrency: 20,
		// Set the same priority for each queue here
		Queues: map[string]int{
			"notifications": 1,
			"webhooks":      1,
			"images":        1,
		},
	})
	mux := asynq.NewServeMux()
	mux.HandleFunc("notifications:email", handleEmailTask)
	mux.HandleFunc("webhooks:sync", handleWebhookSyncTask)
	mux.HandleFunc("images:resize", handleImageResizeTask)
	if err := srv.Run(mux); err != nil {
	    log.Fatalf("Failed to start the server: %v", err)
	}
}

func handleEmailTask(ctx context.Context, t *asynq.Task) error {
    to, err := t.Payload.GetInt("to")
    if err != nil {
	    return err
    }
	from, err := t.Payload.GetInt("from")
	if err != nil {
	    return err
    }
	fmt.Printf("إرسال بريد إلكتروني من %d إلى %d\n", from, to)
	return nil
}

func handleWebhookSyncTask(ctx context.Context, t *asynq.Task) error {
	data, err := t.Payload.GetInt("data")
	if err != nil {
		return err
	}
	fmt.Printf("معالجة مهمة الويب هوك: %d\n", data)
	return nil
}

func handleImageResizeTask(ctx context.Context, t *asynq.Task) error {
	src, err := t.Payload.GetString("src")
	if err != nil {
		return err
	}
	fmt.Printf("تغيير حجم الصورة: %s\n", src)
	return nil
}

دعنا نقوم بتشغيل خادم العامل لمعالجة المهام الثلاث التي قمنا بإنشائها سابقًا.

go run worker/worker.go

يجب أن تتمكن من رؤية الرسائل المطبوعة من كل معالج.

كما ذكر في نظرة عامة، يقسم Asynq البيانات استنادًا إلى الصفوف. جميع المهام المضافة إلى نفس الصف تنتمي إلى نفس كرة Redis. إذاً، أي كرة Redis يستضيف أي صف؟

يمكننا استخدام واجهة السطر الخاصة بالمستخدم للإجابة على هذا السؤال.

asynq queue ls --cluster

سيقوم هذا الأمر بطباعة قائمة الصفوف، جنباً إلى جنب مع:

  • عقدة المجموعة التي ينتمي إليها الصف
  • فتحة التجزئة التي يتم تعيين الصف إليها

قد يبدو الإخراج مثل ما يلي:

Queue          Cluster KeySlot  Cluster Nodes
-----          ---------------  -------------
images         9450              [{d54231bccd6c1765ea15caf95a41c67b10b91e58 127.0.0.1:7001} {70a7d4569eac28eed577ee91863703ffab98d2e0 127.0.0.1:7005}]
webhooks       4418              [{d58959f6057ad0911d92d86d1d16dc2242e9ec48 127.0.0.1:7004} {e2fb9f1296a8d3a49818e0f9be3bfd74fdc052ea 127.0.0.1:7000}]
notifications  16340             [{c738a8a98c5f5f9161e9563fa739f9c8191b7f1a 127.0.0.1:7002} {18cdaa0712191d74656f08017371df41eeaad5fa 127.0.0.1:7003}]