صفحه حاضر ویژگی تجمیع وظایف Asynq را معرفی میکند.
مرور
تجمیع وظایف به شما امکان میدهد تا چندین وظیفه را پشت سر هم در صف قرار دهید، به جای اینکه آنها را یکی یکی به یک “دستگیر” منتقل کنید. این ویژگی به شما این امکان را میدهد که چندین عملیات پشت سر هم را ترکیب کرده، هزینه را صرفهجویی کنید، حافظههای نهان را بهینه کنید و یا اطلاعیهها را دستهبندی کنید.
اصل کار
برای استفاده از ویژگی تجمیع وظایف، شما باید وظایفی را با همان نام گروه به همان صف قرار دهید. وظایف قرار داده شده با همان (queue، group)
توسط GroupAggregator
ارائه شده توسط شما، تجمیع شده و وظیفه تجمیع شده به دستگیر منتقل میشود.
هنگام ایجاد یک وظیفه تجمیع شده، سرور Asynq منتظر وظایف بیشتر میماند تا زمان گریس قابل پیکربندی منقضی شود. هر بار که وظیفه جدید با همان (صف، گروه)
قرار داده میشود، زمان گریس بهروزرسانی میشود.
زمان گریس حداکثر بالقوهای قابل پیکربندی دارد: میتوانید حداکثر تاخیر تجمیع را تنظیم کنید و پس از آن، سرور Asynq از باقیمانده زمان گریس چشمپوشی میکند و وظایف را تجمیع میکند.
همچنین میتوانید حداکثر تعداد وظایف قابل تجمیع را تنظیم کنید. در صورت رسیدن به این تعداد، سرور Asynq بلافاصله وظایف را تجمیع میکند.
توجه: زمانبندی و تجمیع وظایف ویژگیهای متضادی هستند، با زمانبندی اولویت بر تجمیع را دارد.
مثال سریع
در سمت مشتری، از گزینههای Queue
و Group
برای درصف کردن وظایف در یک گروه استفاده کنید.
// درج سه وظیفه در یک گروه
client.Enqueue(task1, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task2, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task3, asynq.Queue("notifications"), asynq.Group("user1:email"))
در سمت سرور، یک GroupAggregator
ارائه دهید تا تجمیع وظایف را فعال کنید. میتوانید با تنظیم GroupGracePeriod
، GroupMaxDelay
و GroupMaxSize
استراتژی تجمیع را سفارشی کنید.
// این تابع برای تجمیع چندین وظیفه به یک وظیفه استفاده میشود.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
// ... منطق خود را برای تجمیع وظایف داده شده پیاده کنید و وظیفه تجمیع شده را برگردانید.
// ... در صورت لزوم، از NewTask(typename، payload، opts...) برای ایجاد یک وظیفه جدید و تنظیم گزینهها استفاده کنید.
// ... (نکته) گزینه صف نادیده گرفته میشود و وظیفه تجمیع شده همیشه به همان صفی که گروه است، در صف درخواهد آمد.
}
srv := asynq.NewServer(
redisConnOpt,
asynq.Config{
GroupAggregator: asynq.GroupAggregatorFunc(aggregate),
GroupMaxDelay: 10 * time.Minute,
GroupGracePeriod: 2 * time.Minute,
GroupMaxSize: 20,
Queues: map[string]int{"notifications": 1},
},
)
آموزش
در این بخش، یک برنامه ساده برای نشان دادن استفاده از ویژگی تجمیع ارائه میدهیم.
ابتدا، یک برنامه مشتری با کد زیر ایجاد کنید:
// client.go
package main
import (
"flag"
"log"
"github.com/hibiken/asynq"
)
var (
flagRedisAddr = flag.String("redis-addr", "localhost:6379", "آدرس سرور Redis")
flagMessage = flag.String("message", "hello", "پیامی که هنگام پردازش وظیفه چاپ میشود")
)
func main() {
flag.Parse()
c := asynq.NewClient(asynq.RedisClientOpt{Addr: *flagRedisAddr})
defer c.Close()
task := asynq.NewTask("aggregation-tutorial", []byte(*flagMessage))
info, err := c.Enqueue(task, asynq.Queue("tutorial"), asynq.Group("example-group"))
if err != nil {
log.Fatalf("فرستادن وظیفه به صف ناموفق بود: %v", err)
}
log.Printf("موفقیتآمیز درج وظیفه: %s", info.ID)
}
میتوانید این برنامه را چندین بار اجرا کنید:
$ go build -o client client.go
$ ./client --redis-addr=
$ ./client --message=hi --redis-addr=
$ ./client --message=bye --redis-addr=
اکنون، اگر از رابط خط فرمان یا رابط کاربری وب صف را بررسی کنید، وظایف در حال تجمیع را در صف مشاهده خواهید کرد.
سپس، یک برنامه سرور با کد زیر ایجاد کنید:
// server.go
package main
import (
"context"
"flag"
"log"
"strings"
"time"
"github.com/hibiken/asynq"
)
var (
flagRedisAddr = flag.String("redis-addr", "localhost:6379", "آدرس سرور Redis")
flagGroupGracePeriod = flag.Duration("grace-period", 10*time.Second, "دوره گریس گروه ها")
flagGroupMaxDelay = flag.Duration("max-delay", 30*time.Second, "حداکثر تاخیر برای گروه ها")
flagGroupMaxSize = flag.Int("max-size", 20, "حداکثر اندازه برای گروه ها")
)
// تابع جمع بندی ساده.
// پیام های تمام وظایف را با هم ترکیب می کند، هر پیام یک خط را به خود اختصاص داده است.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
log.Printf("تعداد وظایف %d از گروه %q جمع بندی شد", len(tasks), group)
var b strings.Builder
for _, t := range tasks {
b.Write(t.Payload())
b.WriteString("\n")
}
return asynq.NewTask("وظیفه-جمع-بندی-شده", []byte(b.String()))
}
func handleAggregatedTask(ctx context.Context, task *asynq.Task) error {
log.Print("دستگاه دریافت کننده وظیفه جمع بندی شده")
log.Printf("پیام جمع بندی شده: %s", task.Payload())
return nil
}
func main() {
flag.Parse()
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: *flagRedisAddr},
asynq.Config{
Queues: map[string]int{"آموزشی": 1},
GroupAggregator: asynq.GroupAggregatorFunc(aggregate),
GroupGracePeriod: *flagGroupGracePeriod,
GroupMaxDelay: *flagGroupMaxDelay,
GroupMaxSize: *flagGroupMaxSize,
},
)
mux := asynq.NewServeMux()
mux.HandleFunc("وظیفه-جمع-بندی-شده", handleAggregatedTask)
if err := srv.Run(mux); err != nil {
log.Fatalf("شکست در آغاز سرور: %v", err)
}
}
شما میتوانید این برنامه را اجرا کرده و خروجی را مشاهده نمایید:
$ go build -o server server.go
$ ./server --redis-addr=
شما باید ببینید که سرور تسکها را در گروه جمع بندی کرده است و پردازنده تسکهای جمع بندی شده را پردازش کرده است. احساس راحتی داشته باشید که میتوانید --grace-period
، --max-delay
و --max-size
را در برنامه فوق تغییر دهید و تأثیر آنها براستیژ جمع بندی را مشاهده کنید.