هذه الصفحة تقدم ميزة تجميع المهام لـ Asynq.

نظرة عامة

تتيح ميزة تجميع المهام لك إضافة مهام متعددة إلى قائمة الانتظار بتعاقب، بدلاً من تمريرها واحدة تلو الأخرى إلى "المعالج". تُمكنك هذه الميزة من دُمج عمليات متتالية متعددة في مهمة واحدة، مما يوفر التكاليف ويحسن التخزين المؤقت، أو يُجمع الإشعارات.

مبدأ العمل

لاستخدام ميزة تجميع المهام، تحتاج إلى إضافة المهام بنفس اسم المجموعة إلى نفس قائمة الانتظار. ستُجمع المهام التي تمت إضافتها باستخدام الزوج نفسه (queue, group) في مهمة واحدة بواسطة GroupAggregator الذي تقدمه، وستمرر المهمة المُجمَعة إلى المعالج.

عند إنشاء مهمة مُجَمَعة، سيراقب خادم Asynq المزيد من المهام حتى ينتهي فترة السماح القابلة للتكوين. في كل مرة يتم إضافة مهمة جديدة بنفس (queue, group)، يُحدث فترة السماح.

تحتوي فترة السماح على حد أقصى قابل للتكوين: يمكنك تعيين أقصى وقت تأخير للتجميع، بعد الذي سيتجاهل خادم Asynq الفترة الباقية ويجمع المهام.

يمكنك أيضًا تعيين أقصى عدد للمهام التي يمكن دمجها معًا. إذا تم الوصول إلى هذا الرقم، سيقوم خادم Asynq بجمع المهام على الفور.

ملاحظة: جدولة المهام وتجميعها ميزتان متصادمتان، حيث تأخذ جدولة الأولوية على التجميع.

مثال سريع

على الجانب العميل، استخدم خيارات Queue و Group لإضافة مهام إلى نفس المجموعة.

// قم بإضافة ثلاث مهام إلى نفس المجموعة.
client.Enqueue(task1, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task2, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task3, asynq.Queue("notifications"), asynq.Group("user1:email"))

على الجانب الخادم، قم بتوفير GroupAggregator لتمكين تجميع المهام. يمكنك تخصيص استراتيجية التجميع عن طريق تكوين GroupGracePeriod و GroupMaxDelay و GroupMaxSize.

// تُستخدم هذه الوظيفة لتجميع مهام متعددة في مهمة واحدة.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
    // ... منطقك لتجميع المهام المعطاة وإرجاع المهمة المجمعة.
    // ... إذا لزم الأمر، استخدم NewTask(typename, payload, opts...) لإنشاء مهمة جديدة وتعيين الخيارات.
    // ... (ملاحظة) سيتم تجاهل خيار الانتظاب، وستُضاف المهمة المتجمعة دائمًا إلى نفس الانتظار كمجموعة.
}

srv := asynq.NewServer(
           redisConnOpt,
           asynq.Config{
               GroupAggregator:  asynq.GroupAggregatorFunc(aggregate),
               GroupMaxDelay:    10 * time.Minute,
               GroupGracePeriod: 2 * time.Minute,
               GroupMaxSize:     20,
               Queues: map[string]int{"notifications": 1},
           },
       )

البرنامج التعليمي

في هذا القسم، نقدم برنامجًا بسيطًا لتوضيح استخدام ميزة التجميع.

أولاً، قم بإنشاء برنامج عميل باستخدام الكود التالي:

// client.go
package main

import (
        "flag"
        "log"

        "github.com/hibiken/asynq"
)

var (
       flagRedisAddr = flag.String("redis-addr", "localhost:6379", "عنوان خادم Redis")
       flagMessage = flag.String("message", "مرحبًا", "الرسالة التي ستُطبع عند معالجة المهمة")
)

func main() {
        flag.Parse()

        c := asynq.NewClient(asynq.RedisClientOpt{Addr: *flagRedisAddr})
        defer c.Close()

        task := asynq.NewTask("aggregation-tutorial", []byte(*flagMessage))
        info, err := c.Enqueue(task, asynq.Queue("tutorial"), asynq.Group("example-group"))
        if err != nil {
                log.Fatalf("فشل في إضافة المهمة إلى الانتظار: %v", err)
        }
        log.Printf("تمت إضافة المهمة بنجاح: %s", info.ID)
}

يمكنك تشغيل هذا البرنامج عدة مرات:

$ go build -o client client.go 
$ ./client --redis-addr=
$ ./client --message=hi --redis-addr=
$ ./client --message=bye --redis-addr=

الآن، إذا قمت بالتحقق من الانتظار من خلال واجهة سطر الأوامر أو واجهة المستخدم على الويب، سترى المهام المُجمَّعة في الانتظار.

بعد ذلك، قم بإنشاء برنامج خادم بالكود التالي:

// server.go
package main

import (
	"context"
	"flag"
	"log"
	"strings"
	"time"

	"github.com/hibiken/asynq"
)

var (
	flagRedisAddr         = flag.String("redis-addr", "localhost:6379", "عنوان خادم Redis")
	flagGroupGracePeriod  = flag.Duration("grace-period", 10*time.Second, "فترة سماح للمجموعات")
	flagGroupMaxDelay     = flag.Duration("max-delay", 30*time.Second, "أقصى تأخير للمجموعات")
	flagGroupMaxSize      = flag.Int("max-size", 20, "أقصى حجم للمجموعات")
)

// وظيفة تجميع بسيطة.
// تجمع رسائل جميع المهام في رسالة واحدة، حيث تحتل كل رسالة سطرًا واحدًا.
func تجميع(group string, tasks []*asynq.Task) *asynq.Task {
	log.Printf("تم تجميع %d مهام من المجموعة %q", len(tasks), group)
	var b strings.Builder
	for _, t := range tasks {
		b.Write(t.Payload())
		b.WriteString("\n")
	}
	return asynq.NewTask("aggregated-task", []byte(b.String()))
}

func معالجةالمهمةالمجمعة(ctx context.Context, task *asynq.Task) error {
	log.Print("تلقى المعالج المهمة المجمعة")
	log.Printf("الرسالة المجمعة: %s", task.Payload())
	return nil
}

func main() {
	flag.Parse()

	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: *flagRedisAddr},
		asynq.Config{
			Queues:           map[string]int{"tutorial": 1},
			GroupAggregator:  asynq.GroupAggregatorFunc(تجميع),
			GroupGracePeriod: *flagGroupGracePeriod,
			GroupMaxDelay:    *flagGroupMaxDelay,
			GroupMaxSize:     *flagGroupMaxSize,
		},
	)

	mux := asynq.NewServeMux()
	mux.HandleFunc("aggregated-task", معالجةالمهمةالمجمعة)

	if err := srv.Run(mux); err != nil {
		log.Fatalf("فشل بدء الخادم: %v", err)
	}
}

يمكنك تشغيل هذا البرنامج ومراقبة النتائج:

$ go build -o server server.go
$ ./server --redis-addr=

يجب أن تكون قادرًا على رؤية في النتائج أن الخادم قد قام بتجميع المهام في المجموعة وأن المعالج قام بمعالجة المهام المجمعة. لا تتردد في محاولة تغيير العلمين --grace-period، --max-delay، و --max-size في البرنامج أعلاه لرؤية كيفية تأثيرها على استراتيجية التجميع.