یہ صفحہ Asynq کی ٹاسک ایگریگیشن فیچر کا تعارف پیش کرتا ہے۔
وضاحت
ٹاسک ایگریگیشن کی اجازت دیتی ہے کہ آپ ایک کے بعد ایک انہیں "ہینڈلر" کو دینے کی بجائے متعدد ٹاسکس کو ترتیب سے قطار میں ڈالیں۔ یہ فیچر آپ کو متعدد متواتر عملوں کو ایک میں بیچنے، کو انفاق کو بچانے، کیشنگ کو بہتر بنانے یا بیچ میں اطلاعات کو بیچنے کی اجازت دیتا ہے۔
کام کا اصول
ٹاسک ایگریگیشن فیچر کا استعمال کرنے کے لئے، آپ کو وہی گروپ نام استعمال کر کے ٹاسکس کو ایک ہی قطار میں ڈالنا ہوگا۔ وہ ٹاسکس جو ہمیشہ ساتھ میں (قطار، گروپ)
جوڑی سے قطاروں پر ڈالے جاتے ہیں، وہ آپ کی مواصلت کرنے والی GroupAggregator
سے ایک ٹاسک میں ایکٹیگر ہوتے ہیں، اور یہ ایکٹیگر ٹاسک ہینڈلر کو دیا جاتا ہے۔
ایک ایکٹیگرٹیڈ ٹاسک بناتے وقت، Asynq سرور نئے ٹاسکس کا انتظار کرے گا جب تک قابل تنظیم گریس موقع ختم نہیں ہوجاتا۔ ہر بار جب کوئی نیا ٹاسک (قطار، گروپ)
سے قطار میں ڈالا جاتا ہے، گریس موقع کو اپ ڈیٹ کیا جاتا ہے۔
گریس موقع کا ایک قابل تنظیم اپر لمحہ ہوتا ہے: آپ زیادہ سے زیادہ ایگریگیشن دیریہ وقت کو ترتیب دے سکتے ہیں، جس کے بعد Asynq سرور باقی گریس موقع کو نظرانداز کرے گا اور ٹاسکس کو ایکٹیگریٹ کرے گا۔
آپ یہ بھی سیٹ کر سکتے ہیں کہ کتنے زیادہ سے زیادہ تاسکس ایک ساتھ ایگریگیٹ ہوسکتے ہیں۔ اسی تعداد تک پہنچنے کی صورت میں، Asynq سرور فوراً ٹاسکس کو ایکٹیگریٹ کرے گا۔
نوٹ: ٹاسک شیڈولنگ اور ایگریگیشن آپس میں تصادم رکھتے ہیں، جہاں شیڈولنگ ایگریگیشن کی پہلی برتری حاصل کرتا ہے۔
تیز مثال
کلائنٹ سائیڈ پر، Queue
اور Group
اختیارات کا استعمال کرتے ہوئے تمام تسکات کو ایک ہی گروپ میں قبول کرنے کے لئے تسکات کو قطاروں میں شامل کریں۔
// تین تسکات کو ایک ہی گروپ میں قطاروں میں شامل کریں۔
client.Enqueue(task1, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task2, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task3, asynq.Queue("notifications"), asynq.Group("user1:email"))
سرور سائیڈ پر، GroupAggregator
فراہم کریں تاکہ تسک اوول بندی ممکن بنا سکے۔ آپ GroupGracePeriod
، GroupMaxDelay
، اور GroupMaxSize
کو تشکیل دینے کے ذریعے تخصیصی اوول بندی پالیسی کو ترتیب دیتے ہیں۔
// یہ فعل، مختلف تسکات کو ایک تسک میں تجمیع کرنے کے لئے استعمال ہوتا ہے۔
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
// ... دی گئی تسکات کو تجمیع کرنے اور تجمیع شدہ تسک واپس میں دینے کے لئے آپ کا منطق۔
// ... ضرورت پیش آنے پر، NewTask(typename, payload, opts...) کا استعمال کریں تاکہ نیا تسک تخلیق کریں اور اختیارات ترتیب دیں۔
// ... (نوٹ) قطاروں کا اختیار نظرانداز ہوجائے گا، اور تجمیع شدہ تسک ہمیشہ انہی قطار میں قطاروں میں شامل ہو گا جو گروپ کی طرف کوئی ہوگا۔
}
srv := asynq.NewServer(
redisConnOpt,
asynq.Config{
GroupAggregator: asynq.GroupAggregatorFunc(aggregate),
GroupMaxDelay: 10 * time.Minute,
GroupGracePeriod: 2 * time.Minute,
GroupMaxSize: 20,
Queues: map[string]int{"notifications": 1},
},
)
سبق
اس حصہ میں، ہم اوول بندی خصوصیت کے استعمال کا مثال دینے کے لئے ایک سادہ پروگرام فراہم کرتے ہیں۔
سب سے پہلے، نیچے دیئے گئے کوڈ کے ساتھ ایک کلائنٹ پروگرام بنائیں:
// client.go
package main
import (
"flag"
"log"
"github.com/hibiken/asynq"
)
var (
flagRedisAddr = flag.String("redis-addr", "localhost:6379", "Redis server address")
flagMessage = flag.String("message", "hello", "Message to be printed when processing the task")
)
func main() {
flag.Parse()
c := asynq.NewClient(asynq.RedisClientOpt{Addr: *flagRedisAddr})
defer c.Close()
task := asynq.NewTask("aggregation-tutorial", []byte(*flagMessage))
info, err := c.Enqueue(task, asynq.Queue("tutorial"), asynq.Group("example-group"))
if err != nil {
log.Fatalf("ناکام ہوئی تسک کا اوول بندی: %v", err)
}
log.Printf("تسک کو کامیابی سے قطاروں میں شامل کر دیا گیا ہے: %s", info.ID)
}
آپ اس پروگرام کو متعدد باروں چلا سکتے ہیں:
$ go build -o client client.go
$ ./client --redis-addr=
$ ./client --message=hi --redis-addr=
$ ./client --message=bye --redis-addr=
اب، اگر آپ CLI یا ویب یو آئی کے ذریعے قطار کا جائزہ لیں، تو آپ کو دیکھنے کو ملے گا کہ تسکات قطار میں تجمیع ہو رہے ہیں۔
اگلا، نیچے دیئے گئے کوڈ کے ساتھ ایک سرور پروگرام بنائیں:
// server.go
package main
import (
"context"
"flag"
"log"
"strings"
"time"
"github.com/hibiken/asynq"
)
var (
flagRedisAddr = flag.String("redis-addr", "localhost:6379", "ریڈس سرور کا پتہ")
flagGroupGracePeriod = flag.Duration("grace-period", 10*time.Second, "گروپ کے لئے گریس مہلت")
flagGroupMaxDelay = flag.Duration("max-delay", 30*time.Second, "گروپ کے لئے زیادہ سے زیادہ دیری")
flagGroupMaxSize = flag.Int("max-size", 20, "گروپ کے لئے زیادہ سے زیادہ سائز")
)
// سادہ اجمالی تفصیل
// تمام کاموں کے پیغامات کو ایک میں ملا دیتا ہے، ہر پیغام ایک لائن لینے کے ساتھ۔
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
log.Printf("گروپ %q سے %d کاموں کو جمع کیا گیا", len(tasks), group)
var b strings.Builder
for _, t := range tasks {
b.Write(t.Payload())
b.WriteString("\n")
}
return asynq.NewTask("aggregated-task", []byte(b.String()))
}
func handleAggregatedTask(ctx context.Context, task *asynq.Task) error {
log.Print("ہینڈلر نے جمع شدہ ٹاسک کو وصول کیا")
log.Printf("جمع کردہ پیغام: %s", task.Payload())
return nil
}
func main() {
flag.Parse()
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: *flagRedisAddr},
asynq.Config{
Queues: map[string]int{"tutorial": 1},
GroupAggregator: asynq.GroupAggregatorFunc(aggregate),
GroupGracePeriod: *flagGroupGracePeriod,
GroupMaxDelay: *flagGroupMaxDelay,
GroupMaxSize: *flagGroupMaxSize,
},
)
mux := asynq.NewServeMux()
mux.HandleFunc("aggregated-task", handleAggregatedTask)
if err := srv.Run(mux); err != nil {
log.Fatalf("سرور کو شروع کرنے میں ناکام ہوگیا: %v", err)
}
}
آپ اس پروگرام کو چلا کر اس کا افراط دیکھ سکتے ہیں:
$ go build -o server server.go
$ ./server --redis-addr=
آپ کو دیکھنا چاہئے کہ افراط کرنے والے سرور نے گروپ میں کاموں کو جمع کیا ہے اور پروسیسر نے جمع شدہ کاموں کا خاتمہ کیا ہے۔ محض آزمانے کے لئے مندرجہ بالا پروگرام میں --grace-period
، --max-delay
، اور --max-size
flags کو تبدیل کرنے کی کوشش کریں تاکہ آپ دیکھ سکیں کہ یہ اجمالی استراتیجی کو کیسے متاثر کرتے ہیں۔