यह पेज एसिंक की टास्क एग्रीगेशन फीचर को पेश करता है।
अवलोकन
टास्क एग्रीगेशन आपको एक नहीं, बल्कि एक के बाद अन्य टास्कों को "हैंडलर" को पास करने के बजाय अनुक्रमणिका में डालने की अनुमति देता है। यह सुविधा आपको एक से अधिक क्रमिक कार्रवाइयों को एक में संकलित करने द्वारा लागत बचाने, कैशिंग को अनुकूलित करने, या बैच सूचना बनाने की संभावना प्रदान करता है।
कार्यक्रम
टास्क एग्रीगेशन फीचर का उपयोग करने के लिए, आपको एक ही समूह नाम के साथ टास्क कतार में डालने की आवश्यकता होती है। एक ही (कतार, समूह)
जोड़ी का उपयोग करके कतार में टास्क कतार के द्वारा एक टास्क में संकलित कर दी जाएगी, और संकलित किए गए टास्क को हैंडलर को पारित कर दिया जाएगा।
एक संकलित टास्क बनाते समय, एसिंक सर्वर और टास्क के लिए अधिक टास्क की प्रतीक्षा करेगा, जब तक योग्य ग्रेस अवधि समाप्त नहीं होती। हर बार जब एक नया टास्क एक ही (कतार, समूह)
के साथ कतार में डाला जाता है, तो ग्रेस अवधि अपड
त्वरित उदाहरण
ग्राहक पक्ष पर, समान समूह में कार्यों को कतार में डालने के लिए Queue
और Group
विकल्प का उपयोग करें।
// समान समूह में तीन कार्यों को कतार में डालें।
client.Enqueue(task1, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task2, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task3, asynq.Queue("notifications"), asynq.Group("user1:email"))
सर्वर पक्ष पर, कार्य संकलन को सक्रिय करने के लिए GroupAggregator
प्रदान करें। आप GroupGracePeriod
, GroupMaxDelay
, और GroupMaxSize
को कॉन्फ़िगर करके संकलन रणनीति को अनुकूलित कर सकते हैं।
// यह फ़ंक्शन एकाधिक कार्यों को एक कार्य में सम्मिलित करने के लिए उपयोग किया जाता है।
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
// ... दिए गए कार्यों को सम्मिलित करने और सम्मिलित कार्य लौटाने के आपके तर्क को।
// ... आवश्यक हो तो, NewTask(typename, payload, opts...) का उपयोग करके नया कार्य बनाएँ और विकल्प सेट करें।
// ... (ध्यान दें) कतार विकल्प को अनदेखा कर दिया जाएगा, और सम्मिलित कार्य हमेशा समान कतार में ही कतार में डाला जाएगा।
}
srv := asynq.NewServer(
redisConnOpt,
asynq.Config{
GroupAggregator: asynq.GroupAggregatorFunc(aggregate),
GroupMaxDelay: 10 * time.Minute,
GroupGracePeriod: 2 * time.Minute,
GroupMaxSize: 20,
Queues: map[string]int{"notifications": 1},
},
)
ट्यूटोरियल
इस खंड में, हम संकलन सुविधा का उपयोग दिखाने के लिए एक सरल कार्यक्रम प्रदान करते हैं।
सबसे पहले, निम्नलिखित कोड के साथ एक ग्राहक प्रोग्राम बनाएँ:
// client.go
package main
import (
"flag"
"log"
"github.com/hibiken/asynq"
)
var (
flagRedisAddr = flag.String("redis-addr", "localhost:6379", "रेडिस सर्वर पता")
flagMessage = flag.String("message", "नमस्ते", "कार्य को प्रसंस्करण करते समय छपाया जाने वाला संदेश")
)
func main() {
flag.Parse()
c := asynq.NewClient(asynq.RedisClientOpt{Addr: *flagRedisAddr})
defer c.Close()
task := asynq.NewTask("aggregation-tutorial", []byte(*flagMessage))
info, err := c.Enqueue(task, asynq.Queue("tutorial"), asynq.Group("example-group"))
if err != nil {
log.Fatalf("कार्य को कतार में डालने में विफल: %v", err)
}
log.Printf("सफलतापूर्वक कार्य कतार में डाला गया: %s", info.ID)
}
आप इस कार्यक्रम को कई बार चला सकते हैं:
$ go build -o client client.go
$ ./client --redis-addr=
$ ./client --message=hi --redis-addr=
$ ./client --message=bye --redis-addr=
अब, यदि आप CLI या वेब UI के माध्यम से कतार की जांच करें, तो आप कतार में संकलित कार्यों को देखेंगे।
अगला, निम्नलिखित कोड के साथ एक सर्वर प्रोग्राम बनाएँ:
// server.go
package main
import (
"context"
"flag"
"log"
"strings"
"time"
"github.com/hibiken/asynq"
)
var (
flagRedisAddr = flag.String("redis-addr", "localhost:6379", "रेडिस सर्वर का पता")
flagGroupGracePeriod = flag.Duration("grace-period", 10*time.Second, "समूह के लिए क्षमा अवधि")
flagGroupMaxDelay = flag.Duration("max-delay", 30*time.Second, "समूह के लिए अधिकतम देरी")
flagGroupMaxSize = flag.Int("max-size", 20, "समूह के लिए अधिकतम आकार")
)
// सरल संचयन फ़ंक्शन।
// हर टास्क का संदेश को एक में मिलाता है, हर संदेश को एक पंक्ति लेता है।
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
log.Printf("%q समूह से %d टास्क का संचयन किया गया", len(tasks), group)
var b strings.Builder
for _, t := range tasks {
b.Write(t.Payload())
b.WriteString("\n")
}
return asynq.NewTask("संचयित-टास्क", []byte(b.String()))
}
func handleAggregatedTask(ctx context.Context, task *asynq.Task) error {
log.Print("हैंडलर ने संचित टास्क प्राप्त किया")
log.Printf("संचित संदेश: %s", task.Payload())
return nil
}
func main() {
flag.Parse()
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: *flagRedisAddr},
asynq.Config{
Queues: map[string]int{"tutorial": 1},
GroupAggregator: asynq.GroupAggregatorFunc(aggregate),
GroupGracePeriod: *flagGroupGracePeriod,
GroupMaxDelay: *flagGroupMaxDelay,
GroupMaxSize: *flagGroupMaxSize,
},
)
mux := asynq.NewServeMux()
mux.HandleFunc("संचित-टास्क", handleAggregatedTask)
if err := srv.Run(mux); err != nil {
log.Fatalf("सर्वर शुरू करने में विफल: %v", err)
}
}
आप इस कार्यक्रम को चला सकते हैं और आउटपुट को देख सकते हैं:
$ go build -o server server.go
$ ./server --redis-addr=
आपको आउटपुट में देखना चाहिए कि सर्वर ने समूह में टास्क को संचयित किया है और प्रोसेसर ने संचयित टास्क को प्रसंस्कृत किया है। "grace-period", "max-delay", और "max-size" फ़्लैग्स को बदलने की कोशिश करें और देखें कि वे संचयन रणनीति पर कैसे प्रभाव डालते हैं।