प्रारंभिक गाइड
इस ट्यूटोरियल में, हम दो प्रोग्राम लिखेंगे, client
और workers
.
-
client.go
बैकग्राउंड कार्यकर्ता थ्रेड द्वारा असिंक्रोनस रूप से प्रसंस्कृत कार्यों को बनाएगा और अनुसूचीत करेगा। -
workers.go
कई समभावित कार्यकर्ता थ्रेड्स को शुरू करेगा ताकि वे क्लाइंट द्वारा बनाए गए कार्यों को संभाल सकें।
यह गाइड यह मानता है कि आप localhost: 6379
पर एक रेडिस सर्वर को चला रहे हैं। शुरू होने से पहले, कृपया सुनिश्चित करें कि रेडिस इंस्टॉल किया गया है और चल रहा है।
चलो पहले हम अपने दो मुख्य फ़ाइलें बनाते हैं।
mkdir quickstart && cd quickstart
go mod init asynq-quickstart
mkdir client workers
touch client/client.go workers/workers.go
फिर, asynq
पैकेज को इंस्टॉल करें।
go get -u github.com/hibiken/asynq
कोड लिखने से पहले, चलिए कुछ मूल टाइप जाँचते हैं जो इन दो प्रोग्रामों में उपयोग किए जाएंगे।
रेडिस कनेक्शन विकल्प
Asynq रेडिस को एक संदेश ब्रोकर के रूप में उपयोग करता है। client.go
और workers.go
दोनों रेडिस से संपर्क करने के लिए स्वागत और लिखने के ऑपरेशन्स के लिए आवश्यकता है। हम लोकल रेडिस सर्वर के संदर्भ में कनेक्शन निर्दिष्ट करने के लिए RedisClientOpt
का उपयोग करेंगे।
redisConnOpt := asynq.RedisClientOpt{
Addr: "localhost:6379",
// Password can be omitted if not needed
Password: "mypassword",
// Use a dedicated database number for asynq.
// By default, Redis provides 16 databases (0 to 15).
DB: 0,
}
कार्य
asynq
में, कार्य इकाइयाँ Task
नामक एक टाइप में संगठित होती हैं, जिसमें सांकेतिक रूप से दो क्षेत्र होते हैं: Type
और Payload
।
// Type is a string value that indicates the type of the task.
func (t *Task) Type() string
// Payload is the data required for task execution.
func (t *Task) Payload() []byte
अब जब हमने मूल टाइप्स को देख लिया है, तो चलिए हमारे प्रोग्राम लिखना शुरू करते हैं।
क्लाइंट प्रोग्राम
client.go
में, हम कुछ कार्यों को बनाएंगे और asynq.Client
का उपयोग करके उन्हें अनुसूचीत करेंगे।
किसी कार्य को बनाने के लिए, आप NewTask
फ़ंक्शन का उपयोग कर सकते हैं और कार्य के प्रकार और पेरामीटर को पास कर सकते हैं।
Enqueue
मेथड कोई कार्य और किसी भी संभावित विकल्प को लेता है। किसी भविष्य के प्रसंस्करण के लिए टास्क को अनुसूचित करने के लिए ProcessIn
या ProcessAt
विकल्प का उपयोग करें।
// ईमेल कार्यों संबंधित पेरामीटर।
type EmailTaskPayload struct {
// ईमेल प्राप्तकर्ता का आईडी।
UserID int
}
// client.go
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
// प्रकार नाम और पेयलोड के साथ एक कार्य बनाएँ।
payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
if err != nil {
log.Fatal(err)
}
t1 := asynq.NewTask("email:welcome", payload)
t2 := asynq.NewTask("email:reminder", payload)
// कार्यों को तुरंत प्रसंस्कृत करें।
info, err := client.Enqueue(t1)
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] सफलतापूर्वक कार्य को संभाला गया: %+v", info)
// 24 घंटे के बाद कार्यों को संभाला जाएगा।
info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] सफलतापूर्वक कार्य को संभाला गया: %+v", info)
}
यह हमारे क्लाइंट प्रोग्राम के लिए हर चीज़ की ज़रूरत है।
कर्मचारी कार्यक्रम
workers.go
में, हमें कर्मचारियों को शुरू करने के लिए एक asynq.Server
उदाहरण बनाना होगा।
NewServer
फ़ंक्शन पैरामीटर के रूप में RedisConnOpt
और Config
लेता है।
Config
का उपयोग सर्वर के कार्य प्रसंस्करण व्यवहार को समायोजित करने के लिए होता है।
आप Config
दस्तावेज़ को देख सकते हैं ताकि आपको सभी उपलब्ध विन्यास विकल्पों के बारे में पता लग सके।
सरलता के लिए, इस उदाहरण में, हम केवल समरसता को निर्दिष्ट करते हैं।
// workers.go
func main() {
srv := asynq.NewServe(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
// नोट: निम्नलिखित खंड में, हम बताएंगे कि `handler` क्या है।
if err := srv.Run(handler); err != nil {
log.Fatal(err)
}
}
(*Server).Run
विधि का पैरामीटर एक इंटरफेस asynq.Handler
है, जिसमें एक ProcessTask
विधि होती है।
प्रकार है {
// यदि कार्य को सफलतापूर्वक प्रसंस्कृत किया गया है, तो ProcessTask को निल लौटाना चाहिए।
// यदि ProcessTask एक गैर-निल त्रुटि लौटाती है या अप्रत्याशित होता है, तो कार्य को बाद में पुनरावृत्ति की जाएगी।
ProcessTask(context.Context, *Task) error
}
हैंडलर लागू करने का सबसे सरल तरीका एक विधि को समान हस्ताक्षर वाले एक फ़ंक्शन की परिभाषा करना है और जब इसे Run
को में भेजा जाए, तो asynq.HandlerFunc
एडाप्टर प्रकार का उपयोग करना है।
func handler(ctx context.Context, t *asynq.Task) error {
switch t.Type() {
case "email:welcome":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] उपयोक्ता %d को स्वागत ईमेल भेजना", p.UserID)
case "email:reminder":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] उपयोक्ता %d को अनुस्मारक ईमेल भेजना", p.UserID)
default:
return fmt.Errorf("अप्रत्याशित कार्य प्रकार: %s", t.Type())
}
return nil
}
func main() {
srv := asynq.NewServe(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
// कार्य प्रक्रिया के अधीन स्थाननिर्धारित करने के लिए asynq.HandlerFunc एडाप्टर का उपयोग करें
if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
log.Fatal(err)
}
}
हम इस हैंडलर फ़ंक्शन के लिए स्विच केसों को जारी रख सकते हैं, लेकिन एक वास्तविक एप्लिकेशन में, प्रत्येक मामले के लिए लॉजिक को अलग से विधि में परिभाषित करना अधिक सुविधाजनक होगा।
हमारे कोड को फिरसे संरचित करने के लिए, हमें ServeMux
का उपयोग करना होगा हमारे हैंडलर को बनाने के लिए। जैसा कि "net/http"
पैकेज से ServeMux
में किया जाता है, आप Handle
या HandleFunc
को बुलाकर एक हैंडलर रजिस्टर कर सकते हैं। ServeMux
Handler
इंटरफेस को पूरा करता है, तो इसे (*Server).Run
को के लिए पास किया जा सकता है।
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
mux := asynq.NewServeMux()
mux.HandleFunc("email:welcome", sendWelcomeEmail)
mux.HandleFunc("email:reminder", sendReminderEmail)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] उपयोक्ता %d को स्वागत ईमेल भेजना", p.UserID)
return nil
}
func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] उपयोक्ता %d को अनुस्मारक ईमेल भेजना", p.UserID)
return nil
}
अब जब हमने प्रत्येक प्रकार के कार्यों के लिए हैंडलिंग फ़ंक्शन को निकाल लिया है, तो कोड और अधिक संगठित दिखता है। तथापि, कोड अभी भी थोड़ा ज्यादा निहित है। हमारे पास कार्य प्रकारों और परिष्कृत प्रकारों के लिए इन स्ट्रिंग मान हैं, और हमें उन्हें एक संगठित पैकेज में समावेषित करना चाहिए। हमारे कोड को संशोधित करने और कार्य निर्माण और हैंडलिंग को एक पैकेज में आवरित करने के लिए, आओ हमारे कोड को संशोधित करें और एक task
पैकेज लिखें।
mkdir task && touch task/task.go
package task
import (
"context"
"encoding/json"
"log"
"github.com/hibiken/asynq"
)
// संदेशों के लिए कार्य के सूची।
const (
TypeWelcomeEmail = "email:welcome"
TypeReminderEmail = "email:reminder"
)
// ईमेल संबंधित किसी भी कार्य के लिए आयात।
type EmailTaskPayload struct {
// ईमेल प्राप्तकर्ता की ID।
UserID int
}
func NewWelcomeEmailTask(id int) (*asynq.Task, error) {
payload, err := json.Marshal(EmailTaskPayload{UserID: id})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeWelcomeEmail, payload), nil
}
func NewReminderEmailTask(id int) (*asynq.Task, error) {
payload, err := json.Marshal(EmailTaskPayload{UserID: id})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeReminderEmail, payload), nil
}
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] उपयोगकर्ता %d को स्वागत ईमेल भेज रहा हूँ", p.UserID)
return nil
}
func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] उपयोगकर्ता %d को याद दिलाने वाला ईमेल भेज रहा हूँ", p.UserID)
return nil
}
अब हम client.go
और workers.go
में इस पैकेज को आयात कर सकते हैं।
```go
// client.go
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
t1, err := task.NewWelcomeEmailTask(42)
if err != nil {
log.Fatal(err)
}
t2, err := task.NewReminderEmailTask(42)
if err != nil {
log.Fatal(err)
}
// कार्य को तुरंत enqueue करें।
info, err := client.Enqueue(t1)
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] कार्य सफलतापूर्वक enqueue किया गया: %+v", info)
// कार्य को 24 घंटे के बाद प्रसंस्करण के लिए enqueue करें।
info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] कार्य सफलतापूर्वक enqueue किया गया: %+v", info)
}
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
mux := asynq.NewServeMux()
mux.HandleFunc(task.TypeWelcomeEmail, task.HandleWelcomeEmailTask)
mux.HandleFunc(task.TypeReminderEmail, task.HandleReminderEmailTask)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
कोड अब बेहतर दिखता है!
अब जैसे ही हमारे पास client
और workers
तैयार हैं, हम इन दोनों कार्यों को चला सकते हैं। आइए पहले client
कार्यक्रम को चलाकर कार्य सृष्टि और अनुसूचित करें।
go run client/client.go
इससे दो कार्यों का निर्माण होगा: तुरंत प्रसंस्करण के लिए एक और 24 घंटे के बाद प्रसंस्करण के लिए एक।
हम asynq
कमांड-लाइन इंटरफेस का उपयोग करके कार्यों की जांच कर सकते हैं।
asynq dash
आपको Enqueued स्थिति में एक कार्य और Scheduled स्थिति में एक और कार्य दिखाई देना चाहिए।
ध्यान दें: प्रत्येक स्थिति का अर्थ समझने के लिए, कृपया कार्य जीवनचक्र पर जाएं।
अंत में, हम workers
कार्यक्रम को आरंभ करने के लिए चलाते हैं।
go run workers/workers.go
नोट: यह कार्यक्रम यह नहीं बंद होगा जब तक आप इसे समाप्त करने के लिए संकेत न भेजें। पृष्ठ-२ पर छोटाकार सुरक्षित तरीके से पृष्ठ के पीछे कार्यकर्ता को समाप्त करने के लिए सर्वश्रेष्ठ कार्यक्रियाओं के लिए, प्रतीक Wiki पृष्ठ का उपयोग करें।
आपको टर्मिनल में कुछ पाठिक का उत्पादन देखने को मिलना चाहिए, जो कार्यों की सफल प्रसंस्करण का सफलतापूर्वक सूचित कर रहा है।
आप client
कार्यक्रम को फिर से चला सकते हैं और देख सकते हैं कि कार्यकर्ता कैसे उन्हें स्वीकार करते हैं और प्रसंस्करण करते हैं।
एक कार्य को प्रीसंगनग्रीत होकर सफलतापूर्वक प्रसंस्करण नहीं करना असाधारण नहीं है। डिफॉल्ट रूप से, असफल कार्यों को तापमान बैकऑफ के साथ 25 बार प्रायास किया जाएगा। एक असफल स्थिति की अनुकरण करने के लिए हमारे हैंडलर को अपडेट करने के लिए एक त्रुटि लौटाने के लिए अपडेट करें।
// tasks.go
```go
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p emailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] कोशिश कर रहे हैं कि उपयोगकर्ता %d को वेलकम ईमेल भेजें...", p.UserID)
return fmt.Errorf("उपयोगकर्ता को ईमेल भेजने में विफल")
}
हमारे कार्यकर्ता प्रोग्राम को पुनः आरंभ करें और एक टास्क को enqueue करें।
go run workers/workers.go
go run client/client.go
यदि आप asynq dash
चला रहे हैं, तो आपको Retry स्थिति में एक टास्क दिखाई देना चाहिए (कतार विवरण दृश्य पर जाकर "पुनः प्रयास" टैब को हाइलाइट करके)।
पुनः प्रयास स्थिति में कौन-कौन से टास्क हैं, यह जांचने के लिए, आप निम्नलिखित भी चला सकते हैं:
asynq task ls --queue=default --state=retry
इससे भविष्य में पुनः प्रयास के लिए सभी टास्क की सूची आएगी। उम्मीदित समय के साथ-साथ निर्दिष्ट टास्क के अगले क्रियान्वयन का आउटपुट शामिल होगा।
जैसे ही एक टास्क ने अपने पुनः प्रयासों को पूर्ण कर लिया होगा, वह Archived स्थिति में चला जाएगा और उसे फिर से पुनः प्रयास नहीं किया जाएगा (आप CLI या WebUI उपकरणों का उपयोग करके अभी भी संग्रहीत टास्क को मैन्युअल रूप से चला सकते हैं)।
इस ट्यूटोरियल को समाप्त करने से पहले, हमारे हैंडलर को ठीक कर लें।
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p emailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] उपयोगकर्ता %d को वेलकम ईमेल भेजना", p.UserID)
return nil
}