شروع گائیڈ
اس ٹیوٹوریل میں ہم دو پروگرامز، client
اور workers
لکھیں گے۔
-
client.go
تاک کی اشیاء کو پس منظم طریقے سے پروسیس کرنے کے لئے پس منظم تھریڈز تکاذ پروسیس کرنے کے لئے پس منظم ٹاسک تیار کرے گا۔ -
workers.go
کلائنٹ نے بنائے گئے ٹاسکس پروسیس کرنے کے لئے متعدد متوازی ورکر تھریڈز شروع کرے گا۔
اس گائیڈ کا اندازہ ہے کہ آپ کو localhost:6379
پر ریڈس سرور چلا رہے ہیں۔ شروع کرنے سے پہلے، براہ کرم یہ یقینی بنائیں کہ ریڈس انسٹال کیا گیا ہے اور چل رہا ہے۔
چلیں، ہمارے دو اہم فائلیں پہلے تیار کریں۔
mkdir quickstart && cd quickstart
go mod init asynq-quickstart
mkdir client workers
touch client/client.go workers/workers.go
پھر، asynq
پیکیج کو انسٹال کریں۔
go get -u github.com/hibiken/asynq
کوڈ لکھنے سے پہلے، ہم کچھ کور ٹائپس جو ان دو پروگرامز میں استعمال ہونگے، کو دوبارہ دیکھ لیتے ہیں۔
ریڈس کنکشن اختیارات
Asynq ریڈس کو ایک پیغام بروکر کے طور پر استعمال کرتا ہے۔ client.go
اور workers.go
دونوں کو ریڈس سے منسلک ہونے کی ضرورت ہے پڑتی ہے۔ ہم RedisClientOpt
استعمال کر کے مقامی ریڈس سرور سے کنکٹ کرنے کا ٹھیکہ لگائیں گے۔
redisConnOpt := asynq.RedisClientOpt{
Addr: "localhost:6379",
//گزارش کے لئے پاسورڈ ضرورت نہیں ہونگے
Password: "mypassword",
// asynq کے لئے ایک دےڈیکیٹڈ ڈیٹا بیس نمبر استعمال کریں۔
// ڈیفالٹ طور پر، ریڈس 16 ڈیٹا بیس فراہم کرتا ہے (صفر تا پندرہ)۔
DB: 0,
}
ٹاسکس
asynq
میں، کام واحد ایک تائپ ٹاسک
میں چھپے ہیں، جو تصوری طور پر دو فیلڈز: Type
اور Payload
رکھتا ہے۔
// Type ایک اسٹرنگ ویلیو ہے جو ٹاسک کی قسم کو ظاہر کرتا ہے۔
func (t *Task) Type() string
// Payload تاسرات کو اجراء کرنے کے لئے درکار ڈیٹا ہے۔
func (t *Task) Payload() []byte
اب جب ہم نے کور ٹائپس دیکھ لیئے ہیں، چلیں اپنے پروگرامز کو لکھنے شروع کریں۔
کلائنٹ پروگرام
client.go
میں، ہم کچھ ٹاسکس بنائیں گے اور انہیں asynq.Client
کو استعمال کر کے ان کی قطاروں میں ڈالیں گے۔
ٹاسک بنانے کے لئے، آپ NewTask
فنکشن استعمال کر سکتے ہیں اور ٹاسک کی قسم اور پیلوڈ کو گزار سکتے ہیں۔
Enqueue
میتھڈ ایک ٹاسک اور کسی بھی تعداد کے اختیارات لیتا ہے۔ فیوچر پروسیسنگ کے لئے ProcessIn
یا ProcessAt
اختیارات استعمال کریں۔
// ای میل ٹاسک کے متعلق پیلوڈ۔
type EmailTaskPayload struct {
// ای میل ریسپانڈنٹ کی شناختی نمبر۔
UserID int
}
// client.go
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
// قسم اور پیلوڈ کے ساتھ ایک ٹاسک بنائیں۔
payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
if err != nil {
log.Fatal(err)
}
t1 := asynq.NewTask("email:welcome", payload)
t2 := asynq.NewTask("email:reminder", payload)
// ٹاسکس فوراً پروسیس ہوں۔
info, err := client.Enqueue(t1)
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] کامیابی سے ٹاسک قطاروں میں ڈالی گئی ہے: %+v", info)
// 24 گھنٹے بعد ٹاسکس ہونگے۔
info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] کامیابی سے ٹاسک قطاروں میں ڈالی گئی ہے: %+v", info)
}
یہی ہمارے کلائنٹ پروگرام کے لئے سب کچھ ہے۔
ورکرز پروگرام
workers.go
میں، ہم ایک asynq.Server
مثال بنائیں گے تاکہ ورکرز کو شروع کر سکیں۔
NewServer
فنکشن کو RedisConnOpt
اور Config
پیرامیٹرز کا استعمال ہوتا ہے۔
Config
کو سرور کے ٹاسک کی پروسیسنگ کرنے کے رویے کو ترتیب دینے کے لئے استعمال کیا جاتا ہے۔
آپ Config
ڈاکومینٹیشن دیکھ سکتے ہیں تاکہ آپ کو تمام دستیاب ترتیب کے آپشنز کے بارے میں معلومات حاصل ہوسکے۔
اس مثال کو سیدھا رکھنے کے لئے ہم صرف concurrency مخصوص کرتے ہیں۔
// workers.go
func main() {
srv := asynq.NewServe(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
// نوٹ: نیچے دیئے گئے حصے میں، ہم 'ہینڈلر' کیا ہے وہ دکھائیں گے۔
if err := srv.Run(handler); err != nil {
log.Fatal(err)
}
}
(*Server).Run
میتھڈ کا پیرامیٹر ایک انٹرفیس asynq.Handler
ہے، جس میں ایک میتھڈ ProcessTask
ہوتا ہے۔
type Handler interface {
// اگر ٹاسک کا کام درستی سے کیا گیا ہوتا ہے تو ProcessTask کو نل ریٹرن کرنا چاہئے۔
// اگر ProcessTask غیر نل ایرر ریٹرن کرتا ہے یا پھر پینک پیدا ہوتا ہے، تو ٹاسک بعد میں دوبارہ کوشش کی جائے گی۔
ProcessTask(context.Context, *Task) error
}
ہینڈلر کو نافذ کرنے کا سب سے آسان طریقہ یہ ہے کہ آپ ایک فنکشن کی تعریف کریں جس کا سینیچر ایسا ہو اور جب آپ اسے Run
میں پاس کریں تو آپ asynq.HandlerFunc
ایڈاپٹر ٹائپ کا استعمال کریں۔
func handler(ctx context.Context, t *asynq.Task) error {
switch t.Type() {
case "email:welcome":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Sending welcome email to user %d", p.UserID)
case "email:reminder":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Sending reminder email to user %d", p.UserID)
default:
return fmt.Errorf("Unexpected task type: %s", t.Type())
}
return nil
}
func main() {
srv := asynq.NewServe(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
// فنکشن کو ہینڈل کرنے کے لئے اسنق.HandlerFunc ایڈاپٹر کا استعمال کریں
if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
log.Fatal(err)
}
}
ہم اس ہینڈلر فنکشن کے لئے switch کیسز کو تسلسل سے جاری رکھ سکتے ہیں، لیکن ایک واقعی ایپلیکیشن میں، ہر کیس کے لئے منطق کو الگ فنکشن میں تعریف کرنا زیادہ موزون ہوتا ہے۔
ہمارے کوڈ کو ریفیکٹ کرنے کے لئے، ہمیں ServeMux
استعمال کرنا ہوگا تاکہ ہمارا ہینڈلر بن سکے۔ "net/http" پیکیج کی طرح، آپ ServeMux
کو Handle
یا HandleFunc
کو کال کرکے ایک ہینڈلر رجسٹر کرسکتے ہیں۔ ServeMux
Handler
انٹرفیس کو پورا کرتا ہے، لہاذا اسکو (*Server).Run
کو پاس کیا جا سکتا ہے۔
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
mux := asynq.NewServeMux()
mux.HandleFunc("email:welcome", sendWelcomeEmail)
mux.HandleFunc("email:reminder", sendReminderEmail)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Sending welcome email to user %d", p.UserID)
return nil
}
func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Sending reminder email to user %d", p.UserID)
return nil
}
اب جب ہم نے ہر قسم کے ٹاسک کے لئے ہینڈلنگ فنکشن نکال لیئے ہیں، تو کوڈ منظم نظر آرہا ہے۔ تاہم، کوڈ اب بھی تھوڑا زیادہ ضمنی ہے۔ ہمارے پاس ٹاسک کی قسموں اور پیرے لوڈ کی قسموں کے لئے ہمارے پاس یہ سٹرنگ مقامات ہیں، اور ہمیں انہیں ایک اورگینک پیکیج میں بند کرنا چاہئے۔ ہم کوڈ کو ریفیکٹ کرنے کے لئے اور یہ درج کرنے کے بعد، task کا ایک پیکیج لکھتے ہیں۔
mkdir task && touch task/task.go
package task
import (
"context"
"encoding/json"
"log"
"github.com/hibiken/asynq"
)
// List of task types.
const (
TypeWelcomeEmail = "email:welcome"
TypeReminderEmail = "email:reminder"
)
// Payload for any task related to emails.
type EmailTaskPayload struct {
UserID int // برقی خط استعمال کرنے والے کا شناخت
}
func NewWelcomeEmailTask(id int) (*asynq.Task, error) {
payload, err := json.Marshal(EmailTaskPayload{UserID: id})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeWelcomeEmail, payload), nil
}
func NewReminderEmailTask(id int) (*asynq.Task, error) {
payload, err := json.Marshal(EmailTaskPayload{UserID: id})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeReminderEmail, payload), nil
}
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] برقی خط بھیجنے کے لئے خوش آمدید ای میل اور واپسی ای میل کرنے کو چلانا ہے %d", p.UserID)
return nil
}
func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] یاد دہانی ای میل بھیجنے کے لئے کاروائی کا انتظار کر رہے ہیں %d", p.UserID)
return nil
}
اب ہم client.go
اور workers.go
میں اس پیکیج کو درآمد کرسکتے ہیں۔
```go
// client.go
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
t1, err := task.NewWelcomeEmailTask(42)
if err != nil {
log.Fatal(err)
}
t2, err := task.NewReminderEmailTask(42)
if err != nil {
log.Fatal(err)
}
// تسک کو فوراً قائم کریں۔
info, err := client.Enqueue(t1)
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] تسک کو کامیابی سے قائم کیا گیا: %+v", info)
// کام کے لئے تسک کو 24 گھنٹے بعد پراکرت کرنے کیلئے تسک کو قائم کریں۔
info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] تسک کو کامیابی سے قائم کیا گیا: %+v", info)
}
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
mux := asynq.NewServeMux()
mux.HandleFunc(task.TypeWelcomeEmail, task.HandleWelcomeEmailTask)
mux.HandleFunc(task.TypeReminderEmail, task.HandleReminderEmailTask)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
کوڈ اب بہتر نظر آتا ہے!
اب جب ہمارے پاس client
اور workers
تیار ہیں تو ہم ان دونوں پروگرامز کو چلانے کے لئے۔ چلتے ہیں client
پروگرام کو شروع کرکے کام کرنے والا ٹاسک بنانے کے لئے۔
go run client/client.go
اس سے دو تسک بنائے جائیں گے: فوراً پروسیسنگ کے لئے ایک اور 24 گھنٹے بعد کے لئے۔
تسکس کو انسپیکٹ کرنے کے لئے asynq
کمانڈ لائن انٹرفیس کا استعمال کریں۔
asynq dash
آپ کو ایک قائم حالت میں ایک تسک اور دوسرے شیڈول حالت میں ایک تسک دیکھنے کو ملنا چاہئے۔
نوٹ: ہر حالت کی معنی سمجھنے کے لئے، برائے مہربانی Task Lifecycle پر مراجعہ کریں۔
آخر میں، ہم workers
پروگرام کو شروع کریں گے تاکہ ٹاسکس کا سامنا ہوسکے۔
go run workers/workers.go
نوٹ: یہ پروگرام منسوخ ہونے تک اختتام نہیں کرے گا۔ بیک گراؤنڈ ورکرز کو سلامتی سے منسوخ کرنے کے بہترین مشورے کے لئے برائے مہربانی Signals Wiki page دیکھیں۔
آپ کو ٹرمینل میں کچھ متنی نتیجے دیکھنے کو ملنے چاہئیں جو تسکس کی کامیاب پروسیسنگ کی نشاندہی کرتے ہیں۔
آپ client
پروگرام کو دوبارہ رن کرکے دیکھ سکتے ہیں کہ ورکرز انہیں قبول کیسے کرتے ہیں اورپروسیس کیسے کرتے ہیں۔
تاسک کا مکمل طور پر کامیابی سے پروسیس ہونے کی بجائے، ایک ٹاسک ناکام رہنے کا عمل کرنے کے لئے 25 بار دوبارہ کوشش کرے گا۔ ہمارے ہینڈلر کو اپ ڈیٹ کرنے کے لئے ایک خراب حالت کا تشخیص کرنے کے لئے ایک خرابی واپسی کرنے کا عمل کریں۔
// tasks.go
```go
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p emailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] کوشش کی جا رہی ہے کہ صارف %d کو خوش آمدید ای میل بھیجا جائے...", p.UserID)
return fmt.Errorf("صارف کو ای میل بھیجنے میں ناکامی")
}
ہمارے ورکرز پروگرام کو دوبارہ شروع کریں اور ایک ٹاسک قطار میں ڈالیں۔
go run workers/workers.go
go run client/client.go
اگر آپ asynq dash
چلارہے ہیں تو ، آپ کو Retry حالت میں ایک ٹاسک دیکھنے کا امکان ہونا چاہئے (قطار کی تفصیلات ویو اور "retry" ٹیب کو ہائی لائٹ کرکے)
کسی ٹاسک کو دوبارہ کرنے کی حالت میں دیکھنے کے لئے ، آپ یہ بھی چلاسکتے ہیں:
asynq task ls --queue=default --state=retry
یہ تمام ٹاسکس کا فہرست فراہم کرے گا جو مستقبل میں دوبارہ کوشش کرنے کے لئے ہوں گے۔ اس میں ہر ٹاسک کے لئے اگلی اجراء کی متوقع وقت شامل ہوگی۔
جب ایک ٹاسک نے اپنی دوبارہ کوششات کو مکمل کرلیں ، تو وہ Archived حالت میں منتقل ہوجائے گا اور دوبارہ کوشش نہیں کی جائے گی (آپ ابھی بھی CLI یا WebUI ٹولز کا استعمال کرکے آرشیو ٹاسکس کو منظور کر سکتے ہیں)۔
اس ٹیوٹوریل کو ختم کرنے سے پہلے ، ہمارے ہینڈلر کو درست کریں۔
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p emailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] صارف %d کو خوش آمدید ای میل بھیج رہے ہیں", p.UserID)
return nil
}