प्रारंभिक गाइड

इस ट्यूटोरियल में, हम दो प्रोग्राम लिखेंगे, client और workers.

  • client.go बैकग्राउंड कार्यकर्ता थ्रेड द्वारा असिंक्रोनस रूप से प्रसंस्कृत कार्यों को बनाएगा और अनुसूचीत करेगा।
  • workers.go कई समभावित कार्यकर्ता थ्रेड्स को शुरू करेगा ताकि वे क्लाइंट द्वारा बनाए गए कार्यों को संभाल सकें।

यह गाइड यह मानता है कि आप localhost: 6379 पर एक रेडिस सर्वर को चला रहे हैं। शुरू होने से पहले, कृपया सुनिश्चित करें कि रेडिस इंस्टॉल किया गया है और चल रहा है।

चलो पहले हम अपने दो मुख्य फ़ाइलें बनाते हैं।

mkdir quickstart && cd quickstart
go mod init asynq-quickstart
mkdir client workers
touch client/client.go workers/workers.go

फिर, asynq पैकेज को इंस्टॉल करें।

go get -u github.com/hibiken/asynq

कोड लिखने से पहले, चलिए कुछ मूल टाइप जाँचते हैं जो इन दो प्रोग्रामों में उपयोग किए जाएंगे।

रेडिस कनेक्शन विकल्प

Asynq रेडिस को एक संदेश ब्रोकर के रूप में उपयोग करता है। client.go और workers.go दोनों रेडिस से संपर्क करने के लिए स्वागत और लिखने के ऑपरेशन्स के लिए आवश्यकता है। हम लोकल रेडिस सर्वर के संदर्भ में कनेक्शन निर्दिष्ट करने के लिए RedisClientOpt का उपयोग करेंगे।

redisConnOpt := asynq.RedisClientOpt{
    Addr: "localhost:6379",
    // Password can be omitted if not needed
    Password: "mypassword",
    // Use a dedicated database number for asynq.
    // By default, Redis provides 16 databases (0 to 15).
    DB: 0,
}

कार्य

asynq में, कार्य इकाइयाँ Task नामक एक टाइप में संगठित होती हैं, जिसमें सांकेतिक रूप से दो क्षेत्र होते हैं: Type और Payload

// Type is a string value that indicates the type of the task.
func (t *Task) Type() string

// Payload is the data required for task execution.
func (t *Task) Payload() []byte

अब जब हमने मूल टाइप्स को देख लिया है, तो चलिए हमारे प्रोग्राम लिखना शुरू करते हैं।

क्लाइंट प्रोग्राम

client.go में, हम कुछ कार्यों को बनाएंगे और asynq.Client का उपयोग करके उन्हें अनुसूचीत करेंगे।

किसी कार्य को बनाने के लिए, आप NewTask फ़ंक्शन का उपयोग कर सकते हैं और कार्य के प्रकार और पेरामीटर को पास कर सकते हैं।

Enqueue मेथड कोई कार्य और किसी भी संभावित विकल्प को लेता है। किसी भविष्य के प्रसंस्करण के लिए टास्क को अनुसूचित करने के लिए ProcessIn या ProcessAt विकल्प का उपयोग करें।

// ईमेल कार्यों संबंधित पेरामीटर।
type EmailTaskPayload struct {
    // ईमेल प्राप्तकर्ता का आईडी।
    UserID int
}

// client.go
func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

    // प्रकार नाम और पेयलोड के साथ एक कार्य बनाएँ।
    payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
    if err != nil {
        log.Fatal(err)
    }
    t1 := asynq.NewTask("email:welcome", payload)

    t2 := asynq.NewTask("email:reminder", payload)

    // कार्यों को तुरंत प्रसंस्कृत करें।
    info, err := client.Enqueue(t1)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] सफलतापूर्वक कार्य को संभाला गया: %+v", info)

    // 24 घंटे के बाद कार्यों को संभाला जाएगा।
    info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] सफलतापूर्वक कार्य को संभाला गया: %+v", info)
}

यह हमारे क्लाइंट प्रोग्राम के लिए हर चीज़ की ज़रूरत है।

कर्मचारी कार्यक्रम

workers.go में, हमें कर्मचारियों को शुरू करने के लिए एक asynq.Server उदाहरण बनाना होगा।

NewServer फ़ंक्शन पैरामीटर के रूप में RedisConnOpt और Config लेता है।

Config का उपयोग सर्वर के कार्य प्रसंस्करण व्यवहार को समायोजित करने के लिए होता है। आप Config दस्तावेज़ को देख सकते हैं ताकि आपको सभी उपलब्ध विन्यास विकल्पों के बारे में पता लग सके।

सरलता के लिए, इस उदाहरण में, हम केवल समरसता को निर्दिष्ट करते हैं।

// workers.go
func main() {
    srv := asynq.NewServe(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // नोट: निम्नलिखित खंड में, हम बताएंगे कि `handler` क्या है।
    if err := srv.Run(handler); err != nil {
        log.Fatal(err)
    }
}

(*Server).Run विधि का पैरामीटर एक इंटरफेस asynq.Handler है, जिसमें एक ProcessTask विधि होती है।

प्रकार है {
    // यदि कार्य को सफलतापूर्वक प्रसंस्कृत किया गया है, तो ProcessTask को निल लौटाना चाहिए।
    // यदि ProcessTask एक गैर-निल त्रुटि लौटाती है या अप्रत्याशित होता है, तो कार्य को बाद में पुनरावृत्ति की जाएगी।
    ProcessTask(context.Context, *Task) error
}

हैंडलर लागू करने का सबसे सरल तरीका एक विधि को समान हस्ताक्षर वाले एक फ़ंक्शन की परिभाषा करना है और जब इसे Run को में भेजा जाए, तो asynq.HandlerFunc एडाप्टर प्रकार का उपयोग करना है।

func handler(ctx context.Context, t *asynq.Task) error {
    switch t.Type() {
    case "email:welcome":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] उपयोक्ता %d को स्वागत ईमेल भेजना", p.UserID)

    case "email:reminder":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] उपयोक्ता %d को अनुस्मारक ईमेल भेजना", p.UserID)

    default:
        return fmt.Errorf("अप्रत्याशित कार्य प्रकार: %s", t.Type())
    }
    return nil
}

func main() {
    srv := asynq.NewServe(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // कार्य प्रक्रिया के अधीन स्थाननिर्धारित करने के लिए asynq.HandlerFunc एडाप्टर का उपयोग करें
    if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
        log.Fatal(err)
    }
}

हम इस हैंडलर फ़ंक्शन के लिए स्विच केसों को जारी रख सकते हैं, लेकिन एक वास्तविक एप्लिकेशन में, प्रत्येक मामले के लिए लॉजिक को अलग से विधि में परिभाषित करना अधिक सुविधाजनक होगा।

हमारे कोड को फिरसे संरचित करने के लिए, हमें ServeMux का उपयोग करना होगा हमारे हैंडलर को बनाने के लिए। जैसा कि "net/http" पैकेज से ServeMux में किया जाता है, आप Handle या HandleFunc को बुलाकर एक हैंडलर रजिस्टर कर सकते हैं। ServeMux Handler इंटरफेस को पूरा करता है, तो इसे (*Server).Run को के लिए पास किया जा सकता है।

// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    mux := asynq.NewServeMux()
    mux.HandleFunc("email:welcome", sendWelcomeEmail)
    mux.HandleFunc("email:reminder", sendReminderEmail)

    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}

func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] उपयोक्ता %d को स्वागत ईमेल भेजना", p.UserID)
    return nil
}

func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] उपयोक्ता %d को अनुस्मारक ईमेल भेजना", p.UserID)
    return nil
}

अब जब हमने प्रत्येक प्रकार के कार्यों के लिए हैंडलिंग फ़ंक्शन को निकाल लिया है, तो कोड और अधिक संगठित दिखता है। तथापि, कोड अभी भी थोड़ा ज्यादा निहित है। हमारे पास कार्य प्रकारों और परिष्कृत प्रकारों के लिए इन स्ट्रिंग मान हैं, और हमें उन्हें एक संगठित पैकेज में समावेषित करना चाहिए। हमारे कोड को संशोधित करने और कार्य निर्माण और हैंडलिंग को एक पैकेज में आवरित करने के लिए, आओ हमारे कोड को संशोधित करें और एक task पैकेज लिखें।

mkdir task && touch task/task.go
package task

import (
	"context"
	"encoding/json"
	"log"

	"github.com/hibiken/asynq"
)

// संदेशों के लिए कार्य के सूची।
const (
	TypeWelcomeEmail  = "email:welcome"
	TypeReminderEmail = "email:reminder"
)

// ईमेल संबंधित किसी भी कार्य के लिए आयात।
type EmailTaskPayload struct {
	// ईमेल प्राप्तकर्ता की ID।
	UserID int
}

func NewWelcomeEmailTask(id int) (*asynq.Task, error) {
	payload, err := json.Marshal(EmailTaskPayload{UserID: id})
	if err != nil {
		return nil, err
	}
	return asynq.NewTask(TypeWelcomeEmail, payload), nil
}

func NewReminderEmailTask(id int) (*asynq.Task, error) {
	payload, err := json.Marshal(EmailTaskPayload{UserID: id})
	if err != nil {
		return nil, err
	}
	return asynq.NewTask(TypeReminderEmail, payload), nil
}

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
	var p EmailTaskPayload
	if err := json.Unmarshal(t.Payload(), &p); err != nil {
		return err
	}
	log.Printf(" [*] उपयोगकर्ता %d को स्वागत ईमेल भेज रहा हूँ", p.UserID)
	return nil
}

func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
	var p EmailTaskPayload
	if err := json.Unmarshal(t.Payload(), &p); err != nil {
		return err
	}
	log.Printf(" [*] उपयोगकर्ता %d को याद दिलाने वाला ईमेल भेज रहा हूँ", p.UserID)
	return nil
}

अब हम client.go और workers.go में इस पैकेज को आयात कर सकते हैं।

```go
// client.go
func main() {
	client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

	t1, err := task.NewWelcomeEmailTask(42)
	if err != nil {
		log.Fatal(err)
	}

	t2, err := task.NewReminderEmailTask(42)
	if err != nil {
		log.Fatal(err)
	}

	// कार्य को तुरंत enqueue करें।
	info, err := client.Enqueue(t1)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf(" [*] कार्य सफलतापूर्वक enqueue किया गया: %+v", info)

	// कार्य को 24 घंटे के बाद प्रसंस्करण के लिए enqueue करें।
	info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
	if err != nil {
		log.Fatal(err)
	}
	log.Printf(" [*] कार्य सफलतापूर्वक enqueue किया गया: %+v", info)
}
// workers.go
func main() {
	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: "localhost:6379"},
		asynq.Config{Concurrency: 10},
	)

	mux := asynq.NewServeMux()
	mux.HandleFunc(task.TypeWelcomeEmail, task.HandleWelcomeEmailTask)
	mux.HandleFunc(task.TypeReminderEmail, task.HandleReminderEmailTask)

	if err := srv.Run(mux); err != nil {
		log.Fatal(err)
	}
}

कोड अब बेहतर दिखता है!

अब जैसे ही हमारे पास client और workers तैयार हैं, हम इन दोनों कार्यों को चला सकते हैं। आइए पहले client कार्यक्रम को चलाकर कार्य सृष्टि और अनुसूचित करें।

go run client/client.go

इससे दो कार्यों का निर्माण होगा: तुरंत प्रसंस्करण के लिए एक और 24 घंटे के बाद प्रसंस्करण के लिए एक।

हम asynq कमांड-लाइन इंटरफेस का उपयोग करके कार्यों की जांच कर सकते हैं।

asynq dash

आपको Enqueued स्थिति में एक कार्य और Scheduled स्थिति में एक और कार्य दिखाई देना चाहिए।

ध्यान दें: प्रत्येक स्थिति का अर्थ समझने के लिए, कृपया कार्य जीवनचक्र पर जाएं।

अंत में, हम workers कार्यक्रम को आरंभ करने के लिए चलाते हैं।

go run workers/workers.go

नोट: यह कार्यक्रम यह नहीं बंद होगा जब तक आप इसे समाप्त करने के लिए संकेत न भेजें। पृष्ठ-२ पर छोटाकार सुरक्षित तरीके से पृष्ठ के पीछे कार्यकर्ता को समाप्त करने के लिए सर्वश्रेष्ठ कार्यक्रियाओं के लिए, प्रतीक Wiki पृष्ठ का उपयोग करें।

आपको टर्मिनल में कुछ पाठिक का उत्पादन देखने को मिलना चाहिए, जो कार्यों की सफल प्रसंस्करण का सफलतापूर्वक सूचित कर रहा है।

आप client कार्यक्रम को फिर से चला सकते हैं और देख सकते हैं कि कार्यकर्ता कैसे उन्हें स्वीकार करते हैं और प्रसंस्करण करते हैं।

एक कार्य को प्रीसंगनग्रीत होकर सफलतापूर्वक प्रसंस्करण नहीं करना असाधारण नहीं है। डिफॉल्ट रूप से, असफल कार्यों को तापमान बैकऑफ के साथ 25 बार प्रायास किया जाएगा। एक असफल स्थिति की अनुकरण करने के लिए हमारे हैंडलर को अपडेट करने के लिए एक त्रुटि लौटाने के लिए अपडेट करें।

// tasks.go
```go
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p emailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] कोशिश कर रहे हैं कि उपयोगकर्ता %d को वेलकम ईमेल भेजें...", p.UserID)
    return fmt.Errorf("उपयोगकर्ता को ईमेल भेजने में विफल")
}

हमारे कार्यकर्ता प्रोग्राम को पुनः आरंभ करें और एक टास्क को enqueue करें।

go run workers/workers.go
go run client/client.go

यदि आप asynq dash चला रहे हैं, तो आपको Retry स्थिति में एक टास्क दिखाई देना चाहिए (कतार विवरण दृश्य पर जाकर "पुनः प्रयास" टैब को हाइलाइट करके)।

पुनः प्रयास स्थिति में कौन-कौन से टास्क हैं, यह जांचने के लिए, आप निम्नलिखित भी चला सकते हैं:

asynq task ls --queue=default --state=retry

इससे भविष्य में पुनः प्रयास के लिए सभी टास्क की सूची आएगी। उम्मीदित समय के साथ-साथ निर्दिष्ट टास्क के अगले क्रियान्वयन का आउटपुट शामिल होगा।

जैसे ही एक टास्क ने अपने पुनः प्रयासों को पूर्ण कर लिया होगा, वह Archived स्थिति में चला जाएगा और उसे फिर से पुनः प्रयास नहीं किया जाएगा (आप CLI या WebUI उपकरणों का उपयोग करके अभी भी संग्रहीत टास्क को मैन्युअल रूप से चला सकते हैं)।

इस ट्यूटोरियल को समाप्त करने से पहले, हमारे हैंडलर को ठीक कर लें।

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p emailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] उपयोगकर्ता %d को वेलकम ईमेल भेजना", p.UserID)
    return nil 
}