সংক্ষিপ্ত বিবরণ

এই পৃষ্ঠা এসিংক এর টাস্ক সংগ্রাহক বৈশিষ্ট্য উল্লেখ করে।

সারমর্ম

টাস্ক সংগ্রাহক আপনাকে "হ্যান্ডলার" একাধিক টাস্ক ধারণ করার বিপরীতে একটি ধারণার জন্য একত্রিত করতে দেয়। এই বৈশিষ্ট্যের মাধ্যমে আপনি একাধিক অনুক্রমিক অপারেশনকে একসাথে ব্যবহার করতে পারেন, যা মূল্য সংরক্ষণ, ক্যাশিং অপটিমাইজেশন, বা ব্যাচ বিজ্ঞপ্তিতে সাহায্য করে।

কর্মপ্রণালী

টাস্ক সংগ্রাহক বৈশিষ্ট্যটি ব্যবহার করতে, আপনার প্রয়োজন হবে সমান গ্রুপ নাম দিয়ে একই কিউতে টাস্ক ধারণ করা। একই (কিউ, গ্রুপ) জোড়া ব্যবহার করে ধারণিত টাস্কগুলি আপনার প্রদত্ত GroupAggregator দ্বারা একটি টাস্কে সংগ্রাহযোগ্য হবে, এবং সংগ্রাহ টাস্কটি হ্যান্ডলারে পাঠানো হবে।

সংগ্রাহক টাস্ক তৈরি করার সময়, Asynq সার্ভারটি কনফিগার করা গ্রেস পিরিয়ড উত্তীর্ণ হওয়া পর্যন্ত অধিক টাস্কের জন্য অপেক্ষা করবে। প্রতি বার যখন একটি নতুন টাস্ক একই (কিউ, গ্রুপ) দিয়ে ধারণ করা হয়, তখন গ্রেস পিরিয়ডটি আপডেট হয়।

গ্রেস পিরিয়ডটির একটি কনফিগারেবল সীমানা আছে: আপনি যে গাড়িতে সংগ্রাহ বিলম্ব সময় সেট করতে পারেন, তারপরে Asynq সার্ভারটি অবশিষ্ট গ্রেস পিরিয়ড বাদ দেওয়ার পরে টাস্কগুলি সংগ্রহ করবে।

আপনি এবং সাথে সংগ্রহ করা যাবে সর্বাধিক সংগ্রহ বিলম্ব সময় যে আপনি সেট করতে পারেন।

নোট: টাস্ক অনুসূচনা এবং সংগ্রহণ সংঘটিত বৈশিষ্ট্যগুলি, সংঘটনার সাথে সংঘাত করা, যেগুলি প্রাধান্যে গ্রহণ করে।

দ্রুত উদাহরণ

ক্লায়েন্ট সাইডে, সমান গ্রুপে টাস্ক এনকিউ করতে কিউ এবং গ্রুপ অপশনগুলি ব্যবহার করুন।

// একই গ্রুপে তিনটি টাস্ক এনকিউ করুন।
client.Enqueue(task1, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task2, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task3, asynq.Queue("notifications"), asynq.Group("user1:email"))

সার্ভার সাইডে, টাস্ক এগ্রিগেশন সক্রিয় করতে GroupAggregator প্রদান করুন। GroupGracePeriod, GroupMaxDelay, এবং GroupMaxSize কনফিগার করে ওই এগ্রিগেশন রম্বণ পরিবর্তন করতে পারেন।

// এই ফাংশনটি একাধিক টাস্ককে একটি টাস্কের মধ্যে সংগ্রহীত করার জন্য ব্যবহৃত হয়।
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
    // ... প্রদত্ত টাস্কগুলি সংগ্রহীত করার প্রয়োজনীয় লোজিক এবং এগ্রিগেটেড টাস্ক প্রদান করার জন্য।
    // ... প্রয়োজনে, একটি নতুন টাস্ক তৈরি এবং অপশন সেট করার জন্য NewTask(typename, payload, opts...) ব্যবহার করুন।
    // ... (নোট) কিউ অপশনটি উপেক্ষা করা হবে, এবং এগ্রিগেটেড টাস্কটি সবসময় একই কিউ-তে এনকিউ করা হবে যেমন গ্রুপে।
}

srv := asynq.NewServer(
           redisConnOpt,
           asynq.Config{
               GroupAggregator:  asynq.GroupAggregatorFunc(aggregate),
               GroupMaxDelay:    10 * time.Minute,
               GroupGracePeriod: 2 * time.Minute,
               GroupMaxSize:     20,
               Queues: map[string]int{"notifications": 1},
           },
       )

টিউটোরিয়াল

এই বিভাগে, আমরা এগ্রিগেশন ফিচারের ব্যবহার প্রদর্শন করার জন্য একটি সহজ প্রোগ্রাম প্রদান করি।

প্রথমে, নিম্নলিখিত কোড দিয়ে ক্লায়েন্ট প্রোগ্রাম তৈরি করুন:

// client.go
package main

import (
        "flag"
        "log"

        "github.com/hibiken/asynq"
)

var (
       flagRedisAddr = flag.String("redis-addr", "localhost:6379", "Redis server address")
       flagMessage = flag.String("message", "hello", "Message to be printed when processing the task")
)

func main() {
        flag.Parse()

        c := asynq.NewClient(asynq.RedisClientOpt{Addr: *flagRedisAddr})
        defer c.Close()

        task := asynq.NewTask("aggregation-tutorial", []byte(*flagMessage))
        info, err := c.Enqueue(task, asynq.Queue("tutorial"), asynq.Group("example-group"))
        if err != nil {
                log.Fatalf("Failed to enqueue task: %v", err)
        }
        log.Printf("Successfully enqueued task: %s", info.ID)
}

আপনি এই প্রোগ্রামটি একাধিক বার চালাতে পারেন:

$ go build -o client client.go 
$ ./client --redis-addr=
$ ./client --message=hi --redis-addr=
$ ./client --message=bye --redis-addr=

এখন, যদি আপনি কন্সোল অথবা ওয়েব ইউআই দিয়ে কিউ চেক করেন, তাহলে আপনি দেখবেন যে কিউতে টাস্কগুলি এগ্রিগেট করা হয়েছে।

পরবর্তী, নিম্নলিখিত কোড দিয়ে একটি সার্ভার প্রোগ্রাম তৈরি করুন:

// server.go
package main

import (
	"context"
	"flag"
	"log"
	"strings"
	"time"

	"github.com/hibiken/asynq"
)

var (
	flagRedisAddr         = flag.String("redis-addr", "localhost:6379", "রেডিস সার্ভারের ঠিকানা")
	flagGroupGracePeriod  = flag.Duration("grace-period", 10*time.Second, "গ্রুপের জন্য গ্রেস পিরিয়ড")
	flagGroupMaxDelay     = flag.Duration("max-delay", 30*time.Second, "গ্রুপের জন্য সর্বাধিক বিলম্ব")
	flagGroupMaxSize      = flag.Int("max-size", 20, "গ্রুপের জন্য সর্বাধিক আকার")
)

// সহজ সংযোজন ফাংশন।
// সমস্ত টাস্কের বার্তা একত্রিত করে, যেখানে প্রতিটি বার্তা একটি লাইন নিয়ে।
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
	log.Printf("%d টাস্ক সমূহ সংগ্রহ করা হয়েছে %q গ্রুপ থেকে", len(tasks), group)
	var b strings.Builder
	for _, t := range tasks {
		b.Write(t.Payload())
		b.WriteString("\n")
	}
	return asynq.NewTask("সংগৃহীত-টাস্ক", []byte(b.String()))
}

func handleAggregatedTask(ctx context.Context, task *asynq.Task) error {
	log.Print("হ্যান্ডলার গ্রুপ করা টাস্কটি পেয়েছে")
	log.Printf("সংগৃহীত বার্তা: %s", task.Payload())
	return nil
}

func main() {
	flag.Parse()

	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: *flagRedisAddr},
		asynq.Config{
			Queues:           map[string]int{"tutorial": 1},
			GroupAggregator:  asynq.GroupAggregatorFunc(aggregate),
			GroupGracePeriod: *flagGroupGracePeriod,
			GroupMaxDelay:    *flagGroupMaxDelay,
			GroupMaxSize:     *flagGroupMaxSize,
		},
	)

	mux := asynq.NewServeMux()
	mux.HandleFunc("সংগৃহীত-টাস্ক", handleAggregatedTask)

	if err := srv.Run(mux); err != nil {
		log.Fatalf("সার্ভার চালু করতে ব্যর্থ: %v", err)
	}
}

আপনি এই প্রোগ্রাম চালাতে পারেন এবং আউটপুট দেখতে পারেন:

$ go build -o server server.go
$ ./server --redis-addr=

আপনি বার্তা দেখতে পারবেন যে সার্ভারটি কোম্পিউটারের শ্রেণি তৈরি করেছে এবং প্রসেসরটি কোম্পিউটারে সংগৃহীত কর্মসূচিগুলি প্রসেস করেছে। উপরের প্রোগ্রামে --grace-period, --max-delay, এবং --max-size ফ্লাগগুলি পরিবর্তন করে পরিবর্তন কীভাবে আসর করে তা দেখতে ভালোবাসবেন।