شروع گائیڈ

اس ٹیوٹوریل میں ہم دو پروگرامز، client اور workers لکھیں گے۔

  • client.go تاک کی اشیاء کو پس منظم طریقے سے پروسیس کرنے کے لئے پس منظم تھریڈز تکاذ پروسیس کرنے کے لئے پس منظم ٹاسک تیار کرے گا۔
  • workers.go کلائنٹ نے بنائے گئے ٹاسکس پروسیس کرنے کے لئے متعدد متوازی ورکر تھریڈز شروع کرے گا۔

اس گائیڈ کا اندازہ ہے کہ آپ کو localhost:6379 پر ریڈس سرور چلا رہے ہیں۔ شروع کرنے سے پہلے، براہ کرم یہ یقینی بنائیں کہ ریڈس انسٹال کیا گیا ہے اور چل رہا ہے۔

چلیں، ہمارے دو اہم فائلیں پہلے تیار کریں۔

mkdir quickstart && cd quickstart
go mod init asynq-quickstart
mkdir client workers
touch client/client.go workers/workers.go

پھر، asynq پیکیج کو انسٹال کریں۔

go get -u github.com/hibiken/asynq

کوڈ لکھنے سے پہلے، ہم کچھ کور ٹائپس جو ان دو پروگرامز میں استعمال ہونگے، کو دوبارہ دیکھ لیتے ہیں۔

ریڈس کنکشن اختیارات

Asynq ریڈس کو ایک پیغام بروکر کے طور پر استعمال کرتا ہے۔ client.go اور workers.go دونوں کو ریڈس سے منسلک ہونے کی ضرورت ہے پڑتی ہے۔ ہم RedisClientOpt استعمال کر کے مقامی ریڈس سرور سے کنکٹ کرنے کا ٹھیکہ لگائیں گے۔

redisConnOpt := asynq.RedisClientOpt{
    Addr: "localhost:6379",
    //گزارش کے لئے پاسورڈ ضرورت نہیں ہونگے
    Password: "mypassword",
    // asynq کے لئے ایک دےڈیکیٹڈ ڈیٹا بیس نمبر استعمال کریں۔
    // ڈیفالٹ طور پر، ریڈس 16 ڈیٹا بیس فراہم کرتا ہے (صفر تا پندرہ)۔
    DB: 0,
}

ٹاسکس

asynq میں، کام واحد ایک تائپ ٹاسک میں چھپے ہیں، جو تصوری طور پر دو فیلڈز: Type اور Payload رکھتا ہے۔

// Type ایک اسٹرنگ ویلیو ہے جو ٹاسک کی قسم کو ظاہر کرتا ہے۔
func (t *Task) Type() string

// Payload تاسرات کو اجراء کرنے کے لئے درکار ڈیٹا ہے۔
func (t *Task) Payload() []byte

اب جب ہم نے کور ٹائپس دیکھ لیئے ہیں، چلیں اپنے پروگرامز کو لکھنے شروع کریں۔

کلائنٹ پروگرام

client.go میں، ہم کچھ ٹاسکس بنائیں گے اور انہیں asynq.Client کو استعمال کر کے ان کی قطاروں میں ڈالیں گے۔

ٹاسک بنانے کے لئے، آپ NewTask فنکشن استعمال کر سکتے ہیں اور ٹاسک کی قسم اور پیلوڈ کو گزار سکتے ہیں۔

Enqueue میتھڈ ایک ٹاسک اور کسی بھی تعداد کے اختیارات لیتا ہے۔ فیوچر پروسیسنگ کے لئے ProcessIn یا ProcessAt اختیارات استعمال کریں۔

// ای میل ٹاسک کے متعلق پیلوڈ۔
type EmailTaskPayload struct {
    // ای میل ریسپانڈنٹ کی شناختی نمبر۔
    UserID int
}

// client.go
func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

    // قسم اور پیلوڈ کے ساتھ ایک ٹاسک بنائیں۔
    payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
    if err != nil {
        log.Fatal(err)
    }
    t1 := asynq.NewTask("email:welcome", payload)

    t2 := asynq.NewTask("email:reminder", payload)

    // ٹاسکس فوراً پروسیس ہوں۔
    info, err := client.Enqueue(t1)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] کامیابی سے ٹاسک قطاروں میں ڈالی گئی ہے: %+v", info)

    // 24 گھنٹے بعد ٹاسکس ہونگے۔
    info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] کامیابی سے ٹاسک قطاروں میں ڈالی گئی ہے: %+v", info)
}

یہی ہمارے کلائنٹ پروگرام کے لئے سب کچھ ہے۔

ورکرز پروگرام

workers.go میں، ہم ایک asynq.Server مثال بنائیں گے تاکہ ورکرز کو شروع کر سکیں۔

NewServer فنکشن کو RedisConnOpt اور Config پیرامیٹرز کا استعمال ہوتا ہے۔

Config کو سرور کے ٹاسک کی پروسیسنگ کرنے کے رویے کو ترتیب دینے کے لئے استعمال کیا جاتا ہے۔ آپ Config ڈاکومینٹیشن دیکھ سکتے ہیں تاکہ آپ کو تمام دستیاب ترتیب کے آپشنز کے بارے میں معلومات حاصل ہوسکے۔

اس مثال کو سیدھا رکھنے کے لئے ہم صرف concurrency مخصوص کرتے ہیں۔

// workers.go
func main() {
    srv := asynq.NewServe(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // نوٹ: نیچے دیئے گئے حصے میں، ہم 'ہینڈلر' کیا ہے وہ دکھائیں گے۔
    if err := srv.Run(handler); err != nil {
        log.Fatal(err)
    }
}

(*Server).Run میتھڈ کا پیرامیٹر ایک انٹرفیس asynq.Handler ہے، جس میں ایک میتھڈ ProcessTask ہوتا ہے۔

type Handler interface {
    // اگر ٹاسک کا کام درستی سے کیا گیا ہوتا ہے تو ProcessTask کو نل ریٹرن کرنا چاہئے۔
    // اگر ProcessTask غیر نل ایرر ریٹرن کرتا ہے یا پھر پینک پیدا ہوتا ہے، تو ٹاسک بعد میں دوبارہ کوشش کی جائے گی۔
    ProcessTask(context.Context, *Task) error
}

ہینڈلر کو نافذ کرنے کا سب سے آسان طریقہ یہ ہے کہ آپ ایک فنکشن کی تعریف کریں جس کا سینیچر ایسا ہو اور جب آپ اسے Run میں پاس کریں تو آپ asynq.HandlerFunc ایڈاپٹر ٹائپ کا استعمال کریں۔

func handler(ctx context.Context, t *asynq.Task) error {
    switch t.Type() {
    case "email:welcome":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] Sending welcome email to user %d", p.UserID)

    case "email:reminder":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] Sending reminder email to user %d", p.UserID)

    default:
        return fmt.Errorf("Unexpected task type: %s", t.Type())
    }
    return nil
}

func main() {
    srv := asynq.NewServe(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // فنکشن کو ہینڈل کرنے کے لئے اسنق.HandlerFunc ایڈاپٹر کا استعمال کریں
    if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
        log.Fatal(err)
    }
}

ہم اس ہینڈلر فنکشن کے لئے switch کیسز کو تسلسل سے جاری رکھ سکتے ہیں، لیکن ایک واقعی ایپلیکیشن میں، ہر کیس کے لئے منطق کو الگ فنکشن میں تعریف کرنا زیادہ موزون ہوتا ہے۔

ہمارے کوڈ کو ریفیکٹ کرنے کے لئے، ہمیں ServeMux استعمال کرنا ہوگا تاکہ ہمارا ہینڈلر بن سکے۔ "net/http" پیکیج کی طرح، آپ ServeMux کو Handle یا HandleFunc کو کال کرکے ایک ہینڈلر رجسٹر کرسکتے ہیں۔ ServeMux Handler انٹرفیس کو پورا کرتا ہے، لہاذا اسکو (*Server).Run کو پاس کیا جا سکتا ہے۔

// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    mux := asynq.NewServeMux()
    mux.HandleFunc("email:welcome", sendWelcomeEmail)
    mux.HandleFunc("email:reminder", sendReminderEmail)

    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}

func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Sending welcome email to user %d", p.UserID)
    return nil
}

func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Sending reminder email to user %d", p.UserID)
    return nil
}

اب جب ہم نے ہر قسم کے ٹاسک کے لئے ہینڈلنگ فنکشن نکال لیئے ہیں، تو کوڈ منظم نظر آرہا ہے۔ تاہم، کوڈ اب بھی تھوڑا زیادہ ضمنی ہے۔ ہمارے پاس ٹاسک کی قسموں اور پیرے لوڈ کی قسموں کے لئے ہمارے پاس یہ سٹرنگ مقامات ہیں، اور ہمیں انہیں ایک اورگینک پیکیج میں بند کرنا چاہئے۔ ہم کوڈ کو ریفیکٹ کرنے کے لئے اور یہ درج کرنے کے بعد، task کا ایک پیکیج لکھتے ہیں۔

mkdir task && touch task/task.go
package task

import (
	"context"
	"encoding/json"
	"log"

	"github.com/hibiken/asynq"
)

// List of task types.
const (
	TypeWelcomeEmail  = "email:welcome" 
	TypeReminderEmail = "email:reminder"
)

// Payload for any task related to emails.
type EmailTaskPayload struct {
	UserID int // برقی خط استعمال کرنے والے کا شناخت
}

func NewWelcomeEmailTask(id int) (*asynq.Task, error) {
	payload, err := json.Marshal(EmailTaskPayload{UserID: id})
	if err != nil {
		return nil, err
	}
	return asynq.NewTask(TypeWelcomeEmail, payload), nil
}

func NewReminderEmailTask(id int) (*asynq.Task, error) {
	payload, err := json.Marshal(EmailTaskPayload{UserID: id})
	if err != nil {
		return nil, err
	}
	return asynq.NewTask(TypeReminderEmail, payload), nil
}

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
	var p EmailTaskPayload
	if err := json.Unmarshal(t.Payload(), &p); err != nil {
		return err
	}
	log.Printf(" [*] برقی خط بھیجنے کے لئے خوش آمدید ای میل اور واپسی ای میل کرنے کو چلانا ہے %d", p.UserID)
	return nil
}

func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
	var p EmailTaskPayload
	if err := json.Unmarshal(t.Payload(), &p); err != nil {
		return err
	}
	log.Printf(" [*] یاد دہانی ای میل بھیجنے کے لئے کاروائی کا انتظار کر رہے ہیں %d", p.UserID)
	return nil
}

اب ہم client.go اور workers.go میں اس پیکیج کو درآمد کرسکتے ہیں۔

```go
// client.go
func main() {
	client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

	t1, err := task.NewWelcomeEmailTask(42)
	if err != nil {
		log.Fatal(err)
	}

	t2, err := task.NewReminderEmailTask(42)
	if err != nil {
		log.Fatal(err)
	}

	// تسک کو فوراً قائم کریں۔
	info, err := client.Enqueue(t1)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf(" [*] تسک کو کامیابی سے قائم کیا گیا: %+v", info)

	// کام کے لئے تسک کو 24 گھنٹے بعد پراکرت کرنے کیلئے تسک کو قائم کریں۔
	info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
	if err != nil {
		log.Fatal(err)
	}
	log.Printf(" [*] تسک کو کامیابی سے قائم کیا گیا: %+v", info)
}
// workers.go
func main() {
	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: "localhost:6379"},
		asynq.Config{Concurrency: 10},
	)

	mux := asynq.NewServeMux()
	mux.HandleFunc(task.TypeWelcomeEmail, task.HandleWelcomeEmailTask)
	mux.HandleFunc(task.TypeReminderEmail, task.HandleReminderEmailTask)

	if err := srv.Run(mux); err != nil {
		log.Fatal(err)
	}
}

کوڈ اب بہتر نظر آتا ہے!

اب جب ہمارے پاس client اور workers تیار ہیں تو ہم ان دونوں پروگرامز کو چلانے کے لئے۔ چلتے ہیں client پروگرام کو شروع کرکے کام کرنے والا ٹاسک بنانے کے لئے۔

go run client/client.go

اس سے دو تسک بنائے جائیں گے: فوراً پروسیسنگ کے لئے ایک اور 24 گھنٹے بعد کے لئے۔

تسکس کو انسپیکٹ کرنے کے لئے asynq کمانڈ لائن انٹرفیس کا استعمال کریں۔

asynq dash

آپ کو ایک قائم حالت میں ایک تسک اور دوسرے شیڈول حالت میں ایک تسک دیکھنے کو ملنا چاہئے۔

نوٹ: ہر حالت کی معنی سمجھنے کے لئے، برائے مہربانی Task Lifecycle پر مراجعہ کریں۔

آخر میں، ہم workers پروگرام کو شروع کریں گے تاکہ ٹاسکس کا سامنا ہوسکے۔

go run workers/workers.go

نوٹ: یہ پروگرام منسوخ ہونے تک اختتام نہیں کرے گا۔ بیک گراؤنڈ ورکرز کو سلامتی سے منسوخ کرنے کے بہترین مشورے کے لئے برائے مہربانی Signals Wiki page دیکھیں۔

آپ کو ٹرمینل میں کچھ متنی نتیجے دیکھنے کو ملنے چاہئیں جو تسکس کی کامیاب پروسیسنگ کی نشاندہی کرتے ہیں۔

آپ client پروگرام کو دوبارہ رن کرکے دیکھ سکتے ہیں کہ ورکرز انہیں قبول کیسے کرتے ہیں اورپروسیس کیسے کرتے ہیں۔

تاسک کا مکمل طور پر کامیابی سے پروسیس ہونے کی بجائے، ایک ٹاسک ناکام رہنے کا عمل کرنے کے لئے 25 بار دوبارہ کوشش کرے گا۔ ہمارے ہینڈلر کو اپ ڈیٹ کرنے کے لئے ایک خراب حالت کا تشخیص کرنے کے لئے ایک خرابی واپسی کرنے کا عمل کریں۔

// tasks.go
```go
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p emailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] کوشش کی جا رہی ہے کہ صارف %d کو خوش آمدید ای میل بھیجا جائے...", p.UserID)
    return fmt.Errorf("صارف کو ای میل بھیجنے میں ناکامی")
}

ہمارے ورکرز پروگرام کو دوبارہ شروع کریں اور ایک ٹاسک قطار میں ڈالیں۔

go run workers/workers.go
go run client/client.go

اگر آپ asynq dash چلارہے ہیں تو ، آپ کو Retry حالت میں ایک ٹاسک دیکھنے کا امکان ہونا چاہئے (قطار کی تفصیلات ویو اور "retry" ٹیب کو ہائی لائٹ کرکے)

کسی ٹاسک کو دوبارہ کرنے کی حالت میں دیکھنے کے لئے ، آپ یہ بھی چلاسکتے ہیں:

asynq task ls --queue=default --state=retry

یہ تمام ٹاسکس کا فہرست فراہم کرے گا جو مستقبل میں دوبارہ کوشش کرنے کے لئے ہوں گے۔ اس میں ہر ٹاسک کے لئے اگلی اجراء کی متوقع وقت شامل ہوگی۔

جب ایک ٹاسک نے اپنی دوبارہ کوششات کو مکمل کرلیں ، تو وہ Archived حالت میں منتقل ہوجائے گا اور دوبارہ کوشش نہیں کی جائے گی (آپ ابھی بھی CLI یا WebUI ٹولز کا استعمال کرکے آرشیو ٹاسکس کو منظور کر سکتے ہیں)۔

اس ٹیوٹوریل کو ختم کرنے سے پہلے ، ہمارے ہینڈلر کو درست کریں۔

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p emailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] صارف %d کو خوش آمدید ای میل بھیج رہے ہیں", p.UserID)
    return nil 
}