यह पेज एसिंक की टास्क एग्रीगेशन फीचर को पेश करता है।

अवलोकन

टास्क एग्रीगेशन आपको एक नहीं, बल्कि एक के बाद अन्य टास्कों को "हैंडलर" को पास करने के बजाय अनुक्रमणिका में डालने की अनुमति देता है। यह सुविधा आपको एक से अधिक क्रमिक कार्रवाइयों को एक में संकलित करने द्वारा लागत बचाने, कैशिंग को अनुकूलित करने, या बैच सूचना बनाने की संभावना प्रदान करता है।

कार्यक्रम

टास्क एग्रीगेशन फीचर का उपयोग करने के लिए, आपको एक ही समूह नाम के साथ टास्क कतार में डालने की आवश्यकता होती है। एक ही (कतार, समूह) जोड़ी का उपयोग करके कतार में टास्क कतार के द्वारा एक टास्क में संकलित कर दी जाएगी, और संकलित किए गए टास्क को हैंडलर को पारित कर दिया जाएगा।

एक संकलित टास्क बनाते समय, एसिंक सर्वर और टास्क के लिए अधिक टास्क की प्रतीक्षा करेगा, जब तक योग्य ग्रेस अवधि समाप्त नहीं होती। हर बार जब एक नया टास्क एक ही (कतार, समूह) के साथ कतार में डाला जाता है, तो ग्रेस अवधि अपड

त्वरित उदाहरण

ग्राहक पक्ष पर, समान समूह में कार्यों को कतार में डालने के लिए Queue और Group विकल्प का उपयोग करें।

// समान समूह में तीन कार्यों को कतार में डालें।
client.Enqueue(task1, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task2, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task3, asynq.Queue("notifications"), asynq.Group("user1:email"))

सर्वर पक्ष पर, कार्य संकलन को सक्रिय करने के लिए GroupAggregator प्रदान करें। आप GroupGracePeriod, GroupMaxDelay, और GroupMaxSize को कॉन्फ़िगर करके संकलन रणनीति को अनुकूलित कर सकते हैं।

// यह फ़ंक्शन एकाधिक कार्यों को एक कार्य में सम्मिलित करने के लिए उपयोग किया जाता है।
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
    // ... दिए गए कार्यों को सम्मिलित करने और सम्मिलित कार्य लौटाने के आपके तर्क को।
    // ... आवश्यक हो तो, NewTask(typename, payload, opts...) का उपयोग करके नया कार्य बनाएँ और विकल्प सेट करें।
    // ... (ध्यान दें) कतार विकल्प को अनदेखा कर दिया जाएगा, और सम्मिलित कार्य हमेशा समान कतार में ही कतार में डाला जाएगा।
}

srv := asynq.NewServer(
           redisConnOpt,
           asynq.Config{
               GroupAggregator:  asynq.GroupAggregatorFunc(aggregate),
               GroupMaxDelay:    10 * time.Minute,
               GroupGracePeriod: 2 * time.Minute,
               GroupMaxSize:     20,
               Queues: map[string]int{"notifications": 1},
           },
       )

ट्यूटोरियल

इस खंड में, हम संकलन सुविधा का उपयोग दिखाने के लिए एक सरल कार्यक्रम प्रदान करते हैं।

सबसे पहले, निम्नलिखित कोड के साथ एक ग्राहक प्रोग्राम बनाएँ:

// client.go
package main

import (
        "flag"
        "log"

        "github.com/hibiken/asynq"
)

var (
       flagRedisAddr = flag.String("redis-addr", "localhost:6379", "रेडिस सर्वर पता")
       flagMessage = flag.String("message", "नमस्ते", "कार्य को प्रसंस्करण करते समय छपाया जाने वाला संदेश")
)

func main() {
        flag.Parse()

        c := asynq.NewClient(asynq.RedisClientOpt{Addr: *flagRedisAddr})
        defer c.Close()

        task := asynq.NewTask("aggregation-tutorial", []byte(*flagMessage))
        info, err := c.Enqueue(task, asynq.Queue("tutorial"), asynq.Group("example-group"))
        if err != nil {
                log.Fatalf("कार्य को कतार में डालने में विफल: %v", err)
        }
        log.Printf("सफलतापूर्वक कार्य कतार में डाला गया: %s", info.ID)
}

आप इस कार्यक्रम को कई बार चला सकते हैं:

$ go build -o client client.go 
$ ./client --redis-addr=
$ ./client --message=hi --redis-addr=
$ ./client --message=bye --redis-addr=

अब, यदि आप CLI या वेब UI के माध्यम से कतार की जांच करें, तो आप कतार में संकलित कार्यों को देखेंगे।

अगला, निम्नलिखित कोड के साथ एक सर्वर प्रोग्राम बनाएँ:

// server.go
package main

import (
	"context"
	"flag"
	"log"
	"strings"
	"time"

	"github.com/hibiken/asynq"
)

var (
	flagRedisAddr         = flag.String("redis-addr", "localhost:6379", "रेडिस सर्वर का पता")
	flagGroupGracePeriod  = flag.Duration("grace-period", 10*time.Second, "समूह के लिए क्षमा अवधि")
	flagGroupMaxDelay     = flag.Duration("max-delay", 30*time.Second, "समूह के लिए अधिकतम देरी")
	flagGroupMaxSize      = flag.Int("max-size", 20, "समूह के लिए अधिकतम आकार")
)

// सरल संचयन फ़ंक्शन।
// हर टास्क का संदेश को एक में मिलाता है, हर संदेश को एक पंक्ति लेता है।
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
	log.Printf("%q समूह से %d टास्क का संचयन किया गया", len(tasks), group)
	var b strings.Builder
	for _, t := range tasks {
		b.Write(t.Payload())
		b.WriteString("\n")
	}
	return asynq.NewTask("संचयित-टास्क", []byte(b.String()))
}

func handleAggregatedTask(ctx context.Context, task *asynq.Task) error {
	log.Print("हैंडलर ने संचित टास्क प्राप्त किया")
	log.Printf("संचित संदेश: %s", task.Payload())
	return nil
}

func main() {
	flag.Parse()

	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: *flagRedisAddr},
		asynq.Config{
			Queues:           map[string]int{"tutorial": 1},
			GroupAggregator:  asynq.GroupAggregatorFunc(aggregate),
			GroupGracePeriod: *flagGroupGracePeriod,
			GroupMaxDelay:    *flagGroupMaxDelay,
			GroupMaxSize:     *flagGroupMaxSize,
		},
	)

	mux := asynq.NewServeMux()
	mux.HandleFunc("संचित-टास्क", handleAggregatedTask)

	if err := srv.Run(mux); err != nil {
		log.Fatalf("सर्वर शुरू करने में विफल: %v", err)
	}
}

आप इस कार्यक्रम को चला सकते हैं और आउटपुट को देख सकते हैं:

$ go build -o server server.go
$ ./server --redis-addr=

आपको आउटपुट में देखना चाहिए कि सर्वर ने समूह में टास्क को संचयित किया है और प्रोसेसर ने संचयित टास्क को प्रसंस्कृत किया है। "grace-period", "max-delay", और "max-size" फ़्लैग्स को बदलने की कोशिश करें और देखें कि वे संचयन रणनीति पर कैसे प्रभाव डालते हैं।