صفحه حاضر ویژگی تجمیع وظایف Asynq را معرفی می‌کند.

مرور

تجمیع وظایف به شما امکان می‌دهد تا چندین وظیفه را پشت سر هم در صف قرار دهید، به جای اینکه آن‌ها را یکی یکی به یک “دستگیر” منتقل کنید. این ویژگی به شما این امکان را می‌دهد که چندین عملیات پشت سر هم را ترکیب کرده، هزینه را صرفه‌جویی کنید، حافظه‌های نهان را بهینه کنید و یا اطلاعیه‌ها را دسته‌بندی کنید.

اصل کار

برای استفاده از ویژگی تجمیع وظایف، شما باید وظایفی را با همان نام گروه به همان صف قرار دهید. وظایف قرار داده شده با همان (queue، group) توسط GroupAggregator ارائه شده توسط شما، تجمیع شده و وظیفه تجمیع شده به دستگیر منتقل می‌شود.

هنگام ایجاد یک وظیفه تجمیع شده، سرور Asynq منتظر وظایف بیشتر می‌ماند تا زمان گریس قابل پیکربندی منقضی شود. هر بار که وظیفه جدید با همان (صف، گروه) قرار داده می‌شود، زمان گریس به‌روزرسانی می‌شود.

زمان گریس حداکثر بالقوه‌ای قابل پیکربندی دارد: می‌توانید حداکثر تاخیر تجمیع را تنظیم کنید و پس از آن، سرور Asynq از باقی‌مانده زمان گریس چشم‌پوشی می‌کند و وظایف را تجمیع می‌کند.

همچنین می‌توانید حداکثر تعداد وظایف قابل تجمیع را تنظیم کنید. در صورت رسیدن به این تعداد، سرور Asynq بلافاصله وظایف را تجمیع می‌کند.

توجه: زمان‌بندی و تجمیع وظایف ویژگی‌های متضادی هستند، با زمان‌بندی اولویت بر تجمیع را دارد.

مثال سریع

در سمت مشتری، از گزینه‌های Queue و Group برای درصف کردن وظایف در یک گروه استفاده کنید.

// درج سه وظیفه در یک گروه
client.Enqueue(task1, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task2, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task3, asynq.Queue("notifications"), asynq.Group("user1:email"))

در سمت سرور، یک GroupAggregator ارائه دهید تا تجمیع وظایف را فعال کنید. می‌توانید با تنظیم GroupGracePeriod، GroupMaxDelay و GroupMaxSize استراتژی تجمیع را سفارشی کنید.

// این تابع برای تجمیع چندین وظیفه به یک وظیفه استفاده می‌شود.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
    // ... منطق خود را برای تجمیع وظایف داده شده پیاده کنید و وظیفه تجمیع شده را برگردانید.
    // ... در صورت لزوم، از NewTask(typename، payload، opts...) برای ایجاد یک وظیفه جدید و تنظیم گزینه‌ها استفاده کنید.
    // ... (نکته) گزینه صف نادیده گرفته می‌شود و وظیفه تجمیع شده همیشه به همان صفی که گروه است، در صف درخواهد آمد.
}

srv := asynq.NewServer(
           redisConnOpt,
           asynq.Config{
               GroupAggregator:  asynq.GroupAggregatorFunc(aggregate),
               GroupMaxDelay:    10 * time.Minute,
               GroupGracePeriod: 2 * time.Minute,
               GroupMaxSize:     20,
               Queues: map[string]int{"notifications": 1},
           },
       )

آموزش

در این بخش، یک برنامه ساده برای نشان دادن استفاده از ویژگی تجمیع ارائه می‌دهیم.

ابتدا، یک برنامه مشتری با کد زیر ایجاد کنید:

// client.go
package main

import (
        "flag"
        "log"

        "github.com/hibiken/asynq"
)

var (
       flagRedisAddr = flag.String("redis-addr", "localhost:6379", "آدرس سرور Redis")
       flagMessage = flag.String("message", "hello", "پیامی که هنگام پردازش وظیفه چاپ می‌شود")
)

func main() {
        flag.Parse()

        c := asynq.NewClient(asynq.RedisClientOpt{Addr: *flagRedisAddr})
        defer c.Close()

        task := asynq.NewTask("aggregation-tutorial", []byte(*flagMessage))
        info, err := c.Enqueue(task, asynq.Queue("tutorial"), asynq.Group("example-group"))
        if err != nil {
                log.Fatalf("فرستادن وظیفه به صف ناموفق بود: %v", err)
        }
        log.Printf("موفقیت‌آمیز درج وظیفه: %s", info.ID)
}

می‌توانید این برنامه را چندین بار اجرا کنید:

$ go build -o client client.go 
$ ./client --redis-addr=
$ ./client --message=hi --redis-addr=
$ ./client --message=bye --redis-addr=

اکنون، اگر از رابط خط فرمان یا رابط کاربری وب صف را بررسی کنید، وظایف در حال تجمیع را در صف مشاهده خواهید کرد.

سپس، یک برنامه سرور با کد زیر ایجاد کنید:

// server.go
package main

import (
    "context"
    "flag"
    "log"
    "strings"
    "time"

    "github.com/hibiken/asynq"
)

var (
    flagRedisAddr         = flag.String("redis-addr", "localhost:6379", "آدرس سرور Redis")
    flagGroupGracePeriod  = flag.Duration("grace-period", 10*time.Second, "دوره گریس گروه ها")
    flagGroupMaxDelay     = flag.Duration("max-delay", 30*time.Second, "حداکثر تاخیر برای گروه ها")
    flagGroupMaxSize      = flag.Int("max-size", 20, "حداکثر اندازه برای گروه ها")
)

// تابع جمع بندی ساده.
// پیام های تمام وظایف را با هم ترکیب می کند، هر پیام یک خط را به خود اختصاص داده است.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
    log.Printf("تعداد وظایف %d از گروه %q جمع بندی شد", len(tasks), group)
    var b strings.Builder
    for _, t := range tasks {
        b.Write(t.Payload())
        b.WriteString("\n")
    }
    return asynq.NewTask("وظیفه-جمع-بندی-شده", []byte(b.String()))
}

func handleAggregatedTask(ctx context.Context, task *asynq.Task) error {
    log.Print("دستگاه دریافت کننده وظیفه جمع بندی شده")
    log.Printf("پیام جمع بندی شده: %s", task.Payload())
    return nil
}

func main() {
    flag.Parse()

    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: *flagRedisAddr},
        asynq.Config{
            Queues:           map[string]int{"آموزشی": 1},
            GroupAggregator:  asynq.GroupAggregatorFunc(aggregate),
            GroupGracePeriod: *flagGroupGracePeriod,
            GroupMaxDelay:    *flagGroupMaxDelay,
            GroupMaxSize:     *flagGroupMaxSize,
        },
    )

    mux := asynq.NewServeMux()
    mux.HandleFunc("وظیفه-جمع-بندی-شده", handleAggregatedTask)

    if err := srv.Run(mux); err != nil {
        log.Fatalf("شکست در آغاز سرور: %v", err)
    }
}

شما می‌توانید این برنامه را اجرا کرده و خروجی را مشاهده نمایید:

$ go build -o server server.go
$ ./server --redis-addr=

شما باید ببینید که سرور تسک‌ها را در گروه جمع بندی کرده است و پردازنده تسک‌های جمع بندی شده را پردازش کرده است. احساس راحتی داشته باشید که می‌توانید --grace-period، --max-delay و --max-size را در برنامه فوق تغییر دهید و تأثیر آن‌ها براستیژ جمع بندی را مشاهده کنید.