راهنمای شروع کار

در این آموزش، ما دو برنامه به نام‌های client و workers خواهیم نوشت.

  • برنامه client.go وظیفه‌ی ایجاد و برنامه‌ریزی وظایف برای پردازش به صورت ناهمزمان توسط موضوع‌های پس‌زمینه‌ای کارگر دارد.
  • برنامه workers.go چندین موضوع کارگر موازی را برای پردازش وظایف ایجاد شده توسط مشتری راه‌اندازی می‌کند.

این راهنما فرض می‌کند که شما یک سرور Redis را در localhost:6379 اجرا می‌کنید. قبل از شروع، لطفا اطمینان حاصل کنید که Redis نصب و در حال اجرا است.

ابتدا بیایید دو فایل اصلی خود را ایجاد کنیم.

mkdir quickstart && cd quickstart
go mod init asynq-quickstart
mkdir client workers
touch client/client.go workers/workers.go

سپس، بسته‌ی asynq را نصب کنید.

go get -u github.com/hibiken/asynq

قبل از شروع نوشتن کد، بیایید چند نوع اصلی را مرور کنیم که در این دو برنامه استفاده می‌شوند.

گزینه‌های اتصال Redis

Asynq از Redis به عنوان یک مشترک پیام استفاده می‌کند. هر دو client.go و workers.go برای عملیات خواندن و نوشتن به Redis نیاز دارند. ما از RedisClientOpt برای مشخص کردن اتصال به سرور Redis محلی استفاده خواهیم کرد.

redisConnOpt := asynq.RedisClientOpt{
    Addr: "localhost:6379",
    // اگر نیازی نیست، رمزعبور می‌تواند حذف شود
    Password: "mypassword",
    // از یک شماره پایگاه داده اختصاصی برای asynq استفاده کنید.
    // به طور پیش‌فرض، Redis 16 پایگاه داده را ارائه می‌دهد (از 0 تا 15).
    DB: 0,
}

وظایف

در asynq، واحدهای کاری در یک نوع به نام Task بسته‌بندی می‌شوند، که به طور مفهومی دو فیلد به نام‌های Type و Payload را دارد.

// Type یک مقدار رشته‌ای است که نوع وظیفه را نشان می‌دهد.
func (t *Task) Type() string

// Payload داده مورد نیاز برای اجرای وظیفه است.
func (t *Task) Payload() []byte

حال که به انواع اصلی نگاهی انداختیم، بیایید شروع به نوشتن برنامه‌های خود کنیم.

برنامه مشتری

در client.go، ما برخی از وظایف را ایجاد کرده و آن‌ها را با استفاده از asynq.Client قرار می‌دهیم.

برای ایجاد یک وظیفه، می‌توانید از تابع NewTask استفاده کرده و نوع و payload وظیفه را منتقل کنید.

متد Enqueue یک وظیفه و هر تعداد گزینه را می‌پذیرد. از گزینه‌های ProcessIn یا ProcessAt برای زمان‌بندی وظایف برای پردازش در آینده استفاده کنید.

// Payload مربوط به وظایف ایمیل.
type EmailTaskPayload struct {
    // شناسه گیرنده ایمیل.
    UserID int
}

// client.go
func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

    // یک وظیفه با نوع و payload ایجاد کنید.
    payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
    if err != nil {
        log.Fatal(err)
    }
    t1 := asynq.NewTask("email:welcome", payload)

    t2 := asynq.NewTask("email:reminder", payload)

    // وظایف را فوراً پردازش کنید.
    info, err := client.Enqueue(t1)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] وظیفه با موفقیت درون‌یابی شد: %+v", info)

    // وظایف را پس از 24 ساعت پردازش کنید.
    info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] وظیفه با موفقیت درون‌یابی شد: %+v", info)
}

تمام کاری که برای برنامه مشتری ما نیاز است، انجام شده است.

برنامه‌ی کارگران

در workers.go، ما یک نمونه از asynq.Server ایجاد خواهیم کرد تا کارگران را شروع کنیم.

تابع NewServer، RedisConnOpt و Config را به عنوان پارامتر می‌گیرد.

Config برای تنظیم رفتار پردازش وظیفه‌های سرور استفاده می‌شود. می توانید به مستندات Config مراجعه کنید تا در مورد تمام گزینه‌های پیکربندی موجود اطلاعات کسب کنید.

به عنوان مثال، در این مثال، ما فقط همروندی را مشخص می‌کنیم.

// workers.go
func main() {
    srv := asynq.NewServe(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // توجه: در بخش زیر، ما معرفی می‌کنیم که `handler` چیست.
    if err := srv.Run(handler); err != nil {
        log.Fatal(err)
    }
}

پارامتر متد (*Server).Run یک رابط asynq.Handler است که یک متد ProcessTask دارد.

type Handler interface {
    // اگر وظیفه با موفقیت پردازش شود، ProcessTask باید کیک نامشخص بازگرداند.
    // اگر ProcessTask یک خطا غیرنامشخص یا باعث یک بحران شود، وظیفه بعداً تلاش مجدد خواهد شد.
    ProcessTask(context.Context, *Task) error
}

ساده‌ترین راه برای پیاده‌سازی یک handler، تعریف یک تابع با همان امضاء است و استفاده از نوع آداپتور asynq.HandlerFunc هنگام انتقال آن به Run است.

func handler(ctx context.Context, t *asynq.Task) error {
    switch t.Type() {
    case "email:welcome":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] ارسال ایمیل خوش‌آمد گویی به کاربر %d", p.UserID)

    case "email:reminder":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] ارسال یادآوری ایمیل به کاربر %d", p.UserID)

    default:
        return fmt.Errorf("نوع وظیفه غیرمنتظره است: %s", t.Type())
    }
    return nil
}

func main() {
    srv := asynq.NewServe(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // استفاده از آداپتور asynq.HandlerFunc برای پردازش تابع
    if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
        log.Fatal(err)
    }
}

می‌توانیم ادامه دهیم و موارد switch را برای این تابع handler اضافه کنیم، اما در یک برنامه‌ی واقعی، راحت‌تر است که منطق مربوط به هر مورد را در یک تابع جداگانه تعریف کنیم.

برای بازطراحی کد ما، بیایید از ServeMux استفاده کنیم تا handler خود را ایجاد کنیم. دقیقاً مانند ServeMux از پکیج "net/http"، می‌توانید یک handler را با فراخوانی Handle یا HandleFunc ثبت کنید. ServeMux رابط Handler را برآورده می‌کند، بنابراین می‌توان آن را به (*Server).Run ارسال کرد.

// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    mux := asynq.NewServeMux()
    mux.HandleFunc("email:welcome", sendWelcomeEmail)
    mux.HandleFunc("email:reminder", sendReminderEmail)

    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}

func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] ارسال ایمیل خوش‌آمد گویی به کاربر %d", p.UserID)
    return nil
}

func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] ارسال یادآوری ایمیل به کاربر %d", p.UserID)
    return nil
}

اکنون که ما توابع پردازش برای هر نوع وظیفه استخراج کرده‌ایم، کد بیشتر منظم به نظر می‌رسد. با این حال، کد هنوز کمی ضمنی است. ما این مقادیر رشته‌ای برای انواع وظیفه و انواع بار را داریم و باید آنها را در یک بسته سازماندهی شده ببندیم. بیایید کد خود را بازطراحی کنیم و یک بسته برای بسته‌بندی ایجاد و پردازش وظیفه بنویسیم. ما به سادگی یک بسته به نام task ایجاد می‌کنیم.

mkdir task && touch task/task.go
package task

import (
    "context"
    "encoding/json"
    "log"

    "github.com/hibiken/asynq"
)

// لیست انواع وظایف
const (
    TypeWelcomeEmail  = "ایمیل:خوش‌آمدگویی"
    TypeReminderEmail = "ایمیل:یادآوری"
)

// داده ورودی برای هر وظیفه مربوط به ایمیل
type EmailTaskPayload struct {
    // شناسه گیرنده ایمیل
    UserID int
}

func NewWelcomeEmailTask(id int) (*asynq.Task, error) {
    payload, err := json.Marshal(EmailTaskPayload{UserID: id})
    if err != nil {
        return nil, err
    }
    return asynq.NewTask(TypeWelcomeEmail, payload), nil
}

func NewReminderEmailTask(id int) (*asynq.Task, error) {
    payload, err := json.Marshal(EmailTaskPayload{UserID: id})
    if err != nil {
        return nil, err
    }
    return asynq.NewTask(TypeReminderEmail, payload), nil
}

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] در حال ارسال ایمیل خوش‌آمدگویی به کاربر %d", p.UserID)
    return nil
}

func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] در حال ارسال ایمیل یادآوری به کاربر %d", p.UserID)
    return nil
}

اکنون می‌توانیم این بسته را در client.go و workers.go وارد کنیم.

```go
// client.go
func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

    t1, err := task.NewWelcomeEmailTask(42)
    if err != nil {
        log.Fatal(err)
    }

    t2, err := task.NewReminderEmailTask(42)
    if err != nil {
        log.Fatal(err)
    }

    // وظیفه را فوراً در صف قرار دهید.
    info, err := client.Enqueue(t1)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] وظیفه با موفقیت به صف اضافه شد: %+v", info)

    // وظیفه را برای پردازش پس از ۲۴ ساعت در صف قرار دهید.
    info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] وظیفه با موفقیت به صف اضافه شد: %+v", info)
}
// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    mux := asynq.NewServeMux()
    mux.HandleFunc(task.TypeWelcomeEmail, task.HandleWelcomeEmailTask)
    mux.HandleFunc(task.TypeReminderEmail, task.HandleReminderEmailTask)

    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}

اکنون کد بهتر به نظر می‌رسد!

اکنون که client و workers آماده هستند، می‌توانیم این دو برنامه را اجرا کنیم. بیایید با اجرای برنامه client وظایف را ایجاد و زمانبندی کنیم.

go run client/client.go

این دو وظیفه ایجاد خواهد کرد: یکی برای پردازش فوری و دیگری برای پردازش پس از ۲۴ ساعت.

بیایید از رابط خط فرمان asynq برای بررسی وظایف استفاده کنیم.

asynq dash

باید بتوانید یک وظیفه در وضعیت قرارداده شده و دیگری در وضعیت زمان‌بندی شده را مشاهده کنید.

توجه: برای درک معنای هر وضعیت، لطفاً به چرخه عمر یک وظیفه مراجعه کنید.

در آخر، بیایید برنامه workers را برای پردازش وظایف راه‌اندازی کنیم.

go run workers/workers.go

توجه: این برنامه تا زمانی که یک سیگنال برای خاتمه آن فرستاده نشود، خارج نخواهد شد. برای بهترین روش‌ها برای خاتمه ایمن کارگرها در پس‌زمینه، لطفاً به صفحه ویکی سیگنال‌ها مراجعه کنید.

باید بتوانید خروجی‌های متنی مربوط به پردازش موفق وظایف را در ترمینال مشاهده کنید.

می‌توانید دوباره برنامه client را اجرا کنید تا ببینید چگونه کارگرها وظایف را پذیرش و پردازش می‌کنند.

اجرای اولیه وظیفه برای پردازش موفق همیشه اتفاق نمی‌افتد. به صورت پیش‌فرض، وظایف ناموفق به صورت ۲۵ بار با کاهشی نمایی تلاش می‌شوند. بیایید برنامه ما را به‌روزرسانی کنیم تا یک خطا بازگشت داده و وضعیت ناموفق را شبیه‌سازی کنیم.

// tasks.go
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p emailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] تلاش برای ارسال یک ایمیل خوش‌آمدگویی به کاربر %d...", p.UserID)
    return fmt.Errorf("ارسال ایمیل به کاربر ناموفق بود")
}

بیایید برنامه‌های کارگر خود را مجدداهم‌راه‌اندازی کنیم و یک وظیفه به صف اضافه کنیم.

go run workers/workers.go
go run client/client.go

اگر شما asynq dash را اجرا می‌کنید، باید یک وظیفه در وضعیت Retry (با ناوبری به نمای جزئیات صف و برجسته کردن تب "retry") ببینید.

برای بررسی وظایفی که در وضعیت تکرار هستند، می‌توانید همچنین اجرا کنید:

asynq task ls --queue=default --state=retry

این کار همه وظایفی را که در آینده تکرار خواهند شد لیست می‌کند. خروجی شامل زمان مورد انتظار اجرای بعدی هر وظیفه است.

هنگامی که یک وظیفه تلاش‌های تکرار خود را به سرانجام رسانده باشد، به وضعیت Archived منتقل می‌شود و دیگر دوباره تلاش برای تکرار نمی‌شود (شما همچنان می‌توانید با استفاده از ابزارهای CLI یا WebUI وظایف بایگانی شده را به صورت دستی اجرا کنید).

قبل از پایان این آموزش، بیایید دستگیره‌مان را اصلاح کنیم.

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p emailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] ارسال یک ایمیل خوش‌آمدگویی به کاربر %d", p.UserID)
    return nil 
}