সংক্ষিপ্ত বিবরণ
এই পৃষ্ঠা এসিংক এর টাস্ক সংগ্রাহক বৈশিষ্ট্য উল্লেখ করে।
সারমর্ম
টাস্ক সংগ্রাহক আপনাকে "হ্যান্ডলার" একাধিক টাস্ক ধারণ করার বিপরীতে একটি ধারণার জন্য একত্রিত করতে দেয়। এই বৈশিষ্ট্যের মাধ্যমে আপনি একাধিক অনুক্রমিক অপারেশনকে একসাথে ব্যবহার করতে পারেন, যা মূল্য সংরক্ষণ, ক্যাশিং অপটিমাইজেশন, বা ব্যাচ বিজ্ঞপ্তিতে সাহায্য করে।
কর্মপ্রণালী
টাস্ক সংগ্রাহক বৈশিষ্ট্যটি ব্যবহার করতে, আপনার প্রয়োজন হবে সমান গ্রুপ নাম দিয়ে একই কিউতে টাস্ক ধারণ করা। একই (কিউ, গ্রুপ)
জোড়া ব্যবহার করে ধারণিত টাস্কগুলি আপনার প্রদত্ত GroupAggregator
দ্বারা একটি টাস্কে সংগ্রাহযোগ্য হবে, এবং সংগ্রাহ টাস্কটি হ্যান্ডলারে পাঠানো হবে।
সংগ্রাহক টাস্ক তৈরি করার সময়, Asynq সার্ভারটি কনফিগার করা গ্রেস পিরিয়ড উত্তীর্ণ হওয়া পর্যন্ত অধিক টাস্কের জন্য অপেক্ষা করবে। প্রতি বার যখন একটি নতুন টাস্ক একই (কিউ, গ্রুপ)
দিয়ে ধারণ করা হয়, তখন গ্রেস পিরিয়ডটি আপডেট হয়।
গ্রেস পিরিয়ডটির একটি কনফিগারেবল সীমানা আছে: আপনি যে গাড়িতে সংগ্রাহ বিলম্ব সময় সেট করতে পারেন, তারপরে Asynq সার্ভারটি অবশিষ্ট গ্রেস পিরিয়ড বাদ দেওয়ার পরে টাস্কগুলি সংগ্রহ করবে।
আপনি এবং সাথে সংগ্রহ করা যাবে সর্বাধিক সংগ্রহ বিলম্ব সময় যে আপনি সেট করতে পারেন।
নোট: টাস্ক অনুসূচনা এবং সংগ্রহণ সংঘটিত বৈশিষ্ট্যগুলি, সংঘটনার সাথে সংঘাত করা, যেগুলি প্রাধান্যে গ্রহণ করে।
দ্রুত উদাহরণ
ক্লায়েন্ট সাইডে, সমান গ্রুপে টাস্ক এনকিউ করতে কিউ
এবং গ্রুপ
অপশনগুলি ব্যবহার করুন।
// একই গ্রুপে তিনটি টাস্ক এনকিউ করুন।
client.Enqueue(task1, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task2, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task3, asynq.Queue("notifications"), asynq.Group("user1:email"))
সার্ভার সাইডে, টাস্ক এগ্রিগেশন সক্রিয় করতে GroupAggregator
প্রদান করুন। GroupGracePeriod
, GroupMaxDelay
, এবং GroupMaxSize
কনফিগার করে ওই এগ্রিগেশন রম্বণ পরিবর্তন করতে পারেন।
// এই ফাংশনটি একাধিক টাস্ককে একটি টাস্কের মধ্যে সংগ্রহীত করার জন্য ব্যবহৃত হয়।
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
// ... প্রদত্ত টাস্কগুলি সংগ্রহীত করার প্রয়োজনীয় লোজিক এবং এগ্রিগেটেড টাস্ক প্রদান করার জন্য।
// ... প্রয়োজনে, একটি নতুন টাস্ক তৈরি এবং অপশন সেট করার জন্য NewTask(typename, payload, opts...) ব্যবহার করুন।
// ... (নোট) কিউ অপশনটি উপেক্ষা করা হবে, এবং এগ্রিগেটেড টাস্কটি সবসময় একই কিউ-তে এনকিউ করা হবে যেমন গ্রুপে।
}
srv := asynq.NewServer(
redisConnOpt,
asynq.Config{
GroupAggregator: asynq.GroupAggregatorFunc(aggregate),
GroupMaxDelay: 10 * time.Minute,
GroupGracePeriod: 2 * time.Minute,
GroupMaxSize: 20,
Queues: map[string]int{"notifications": 1},
},
)
টিউটোরিয়াল
এই বিভাগে, আমরা এগ্রিগেশন ফিচারের ব্যবহার প্রদর্শন করার জন্য একটি সহজ প্রোগ্রাম প্রদান করি।
প্রথমে, নিম্নলিখিত কোড দিয়ে ক্লায়েন্ট প্রোগ্রাম তৈরি করুন:
// client.go
package main
import (
"flag"
"log"
"github.com/hibiken/asynq"
)
var (
flagRedisAddr = flag.String("redis-addr", "localhost:6379", "Redis server address")
flagMessage = flag.String("message", "hello", "Message to be printed when processing the task")
)
func main() {
flag.Parse()
c := asynq.NewClient(asynq.RedisClientOpt{Addr: *flagRedisAddr})
defer c.Close()
task := asynq.NewTask("aggregation-tutorial", []byte(*flagMessage))
info, err := c.Enqueue(task, asynq.Queue("tutorial"), asynq.Group("example-group"))
if err != nil {
log.Fatalf("Failed to enqueue task: %v", err)
}
log.Printf("Successfully enqueued task: %s", info.ID)
}
আপনি এই প্রোগ্রামটি একাধিক বার চালাতে পারেন:
$ go build -o client client.go
$ ./client --redis-addr=
$ ./client --message=hi --redis-addr=
$ ./client --message=bye --redis-addr=
এখন, যদি আপনি কন্সোল অথবা ওয়েব ইউআই দিয়ে কিউ চেক করেন, তাহলে আপনি দেখবেন যে কিউতে টাস্কগুলি এগ্রিগেট করা হয়েছে।
পরবর্তী, নিম্নলিখিত কোড দিয়ে একটি সার্ভার প্রোগ্রাম তৈরি করুন:
// server.go
package main
import (
"context"
"flag"
"log"
"strings"
"time"
"github.com/hibiken/asynq"
)
var (
flagRedisAddr = flag.String("redis-addr", "localhost:6379", "রেডিস সার্ভারের ঠিকানা")
flagGroupGracePeriod = flag.Duration("grace-period", 10*time.Second, "গ্রুপের জন্য গ্রেস পিরিয়ড")
flagGroupMaxDelay = flag.Duration("max-delay", 30*time.Second, "গ্রুপের জন্য সর্বাধিক বিলম্ব")
flagGroupMaxSize = flag.Int("max-size", 20, "গ্রুপের জন্য সর্বাধিক আকার")
)
// সহজ সংযোজন ফাংশন।
// সমস্ত টাস্কের বার্তা একত্রিত করে, যেখানে প্রতিটি বার্তা একটি লাইন নিয়ে।
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
log.Printf("%d টাস্ক সমূহ সংগ্রহ করা হয়েছে %q গ্রুপ থেকে", len(tasks), group)
var b strings.Builder
for _, t := range tasks {
b.Write(t.Payload())
b.WriteString("\n")
}
return asynq.NewTask("সংগৃহীত-টাস্ক", []byte(b.String()))
}
func handleAggregatedTask(ctx context.Context, task *asynq.Task) error {
log.Print("হ্যান্ডলার গ্রুপ করা টাস্কটি পেয়েছে")
log.Printf("সংগৃহীত বার্তা: %s", task.Payload())
return nil
}
func main() {
flag.Parse()
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: *flagRedisAddr},
asynq.Config{
Queues: map[string]int{"tutorial": 1},
GroupAggregator: asynq.GroupAggregatorFunc(aggregate),
GroupGracePeriod: *flagGroupGracePeriod,
GroupMaxDelay: *flagGroupMaxDelay,
GroupMaxSize: *flagGroupMaxSize,
},
)
mux := asynq.NewServeMux()
mux.HandleFunc("সংগৃহীত-টাস্ক", handleAggregatedTask)
if err := srv.Run(mux); err != nil {
log.Fatalf("সার্ভার চালু করতে ব্যর্থ: %v", err)
}
}
আপনি এই প্রোগ্রাম চালাতে পারেন এবং আউটপুট দেখতে পারেন:
$ go build -o server server.go
$ ./server --redis-addr=
আপনি বার্তা দেখতে পারবেন যে সার্ভারটি কোম্পিউটারের শ্রেণি তৈরি করেছে এবং প্রসেসরটি কোম্পিউটারে সংগৃহীত কর্মসূচিগুলি প্রসেস করেছে। উপরের প্রোগ্রামে --grace-period
, --max-delay
, এবং --max-size
ফ্লাগগুলি পরিবর্তন করে পরিবর্তন কীভাবে আসর করে তা দেখতে ভালোবাসবেন।