دليل البدء

في هذا البرنامج التعليمي، سنقوم بكتابة برنامجين، العميل والعاملون.

  • client.go سيقوم بإنشاء وجدولة المهام لتتم معالجتها بشكل غير متزامن من قبل خيوط العمل الخلفية.
  • workers.go سيبدأ العديد من خيوط العامل المتزامنة للتعامل مع المهام التي تم إنشاؤها بواسطة العميل.

يفترض هذا الدليل أنك تقوم بتشغيل خادم Redis على localhost:6379. قبل البدء، يرجى التأكد من تثبيت Redis وتشغيله.

دعنا نقوم أولاً بإنشاء ملفاتنا الرئيسية.

mkdir quickstart && cd quickstart
go mod init asynq-quickstart
mkdir client workers
touch client/client.go workers/workers.go

ثم، قم بتثبيت حزمة asynq.

go get -u github.com/hibiken/asynq

قبل أن نبدأ في كتابة الكود، دعنا نراجع بعض الأنواع الأساسية التي ستُستخدم في هذين البرنامجين.

خيارات اتصال Redis

تستخدم Asynq Redis كسمسار رسائلي. كل من client.go و workers.go تحتاج إلى الاتصال بـ Redis لعمليات القراءة والكتابة. سنستخدم RedisClientOpt لتحديد الاتصال بخادم Redis الذي يعمل محليًا.

redisConnOpt := asynq.RedisClientOpt{
    Addr: "localhost:6379",
    // يمكن تجاهل كلمة المرور إذا لم تكن مطلوبة
    Password: "mypassword",
    // استخدام رقم قاعدة بيانات مخصص ل Asynq.
    // بشكل افتراضي، يوفر Redis 16 قواعد بيانات (0 إلى 15).
    DB: 0,
}

المهام

في asynq، يتم تجميع وحدات العمل في نوع يُسمى Task، والذي يحتوي بشكل مفهومي على حقلين: Type وPayload.

// Type هو قيمة نصية تشير إلى نوع المهمة.
func (t *Task) Type() string

// Payload هو البيانات المطلوبة لتنفيذ المهمة.
func (t *Task) Payload() []byte

الآن بعد أن نظرنا في الأنواع الأساسية، دعنا نبدأ في كتابة برامجنا.

برنامج العميل

في client.go، سنقوم بإنشاء بعض المهام ووضعها في قائمة الانتظار باستخدام asynq.Client.

لإنشاء مهمة، يمكنك استخدام دالة NewTask وتمرير نوع وحمولة المهمة.

تقوم الطريقة Enqueue بأخذ مهمة وأي عدد من الخيارات. استخدم الخيارات ProcessIn أو ProcessAt لجدولة المهام للمعالجة المستقبلية.

// الحمولة المتعلقة بمهام البريد الإلكتروني.
type EmailTaskPayload struct {
    // معرف مستلم البريد الإلكتروني.
    UserID int
}

// client.go
func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

    // إنشاء مهمة بنوع وحمولة.
    payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
    if err != nil {
        log.Fatal(err)
    }
    t1 := asynq.NewTask("email:welcome", payload)

    t2 := asynq.NewTask("email:reminder", payload)

    // معالجة المهام فوراً.
    info, err := client.Enqueue(t1)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] تم وضع المهمة في قائمة الانتظار بنجاح: %+v", info)

    // معالجة المهام بعد 24 ساعة.
    info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] تم وضع المهمة في قائمة الانتظار بنجاح: %+v", info)
}

هذا كل ما نحتاجه لبرنامج العميل.

برنامج العمال

في workers.go، سنقوم بإنشاء مثيل asynq.Server لبدء العمال.

تأخذ الدالة NewServer معلمتين وهما RedisConnOpt و Config.

يستخدم Config لضبط سلوك معالجة المهمات الخادم. يمكنك الرجوع إلى التوثيق لـConfig لمعرفة جميع الخيارات المتاحة للتكوين.

للبساطة، في هذا المثال، نحدد فقط التوافر.

// workers.go
func main() {
    srv := asynq.NewServe(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // ملاحظة: في الجزء التالي، سنقوم بتقديم ما هو `handler`.
    if err := srv.Run(handler); err != nil {
        log.Fatal(err)
    }
}

معامل الدالة (*Server).Run هو واجهة asynq.Handler، التي تحتوي على دالة ProcessTask.

type Handler interface {
    // إذا تمت معالجة المهمة بنجاح، يجب على ProcessTask إرجاع nil.
    // إذا قامت ProcessTask بإرجاع خطأ غير nil أو تسببت في حالة طارئة، سيتم إعادة محاولة المهمة لاحقًا.
    ProcessTask(context.Context, *Task) error
}

أبسط طريقة لتنفيذ معالج هو تعريف دالة بنفس التوقيع واستخدام نوع المحول asynq.HandlerFunc عند تمريرها إلى Run.

func handler(ctx context.Context, t *asynq.Task) error {
    switch t.Type() {
    case "email:welcome":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] Sending welcome email to user %d", p.UserID)

    case "email:reminder":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] Sending reminder email to user %d", p.UserID)

    default:
        return fmt.Errorf("نوع المهمة غير متوقع: %s", t.Type())
    }
    return nil
}

func main() {
    srv := asynq.NewServe(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // استخدام محول asynq.HandlerFunc لمعالجة الدالة
    if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
        log.Fatal(err)
    }
}

يمكننا الاستمرار في إضافة حالات التبديل لهذه الدالة المعالجة، ولكن في تطبيق فعلي، سيكون من الأكثر ملاءمة تحديد منطق كل حالة في دالة منفصلة.

لإعادة تنظيم كودنا، دعنا نستخدم ServeMux لإنشاء معالجنا. تماماً كـ ServeMux في حزمة "net/http"، يمكنك تسجيل معالج بالاتصال بـ HandleأوHandleFunc. تفي ServeMuxبمتطلبات الواجهةHandler، لذا يمكن تمريرها إلى (*Server).Run`.

// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    mux := asynq.NewServeMux()
    mux.HandleFunc("email:welcome", sendWelcomeEmail)
    mux.HandleFunc("email:reminder", sendReminderEmail)

    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}

func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Sending welcome email to user %d", p.UserID)
    return nil
}

func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Sending reminder email to user %d", p.UserID)
    return nil
}

الآن بعد استخراج دوال المعالجة لكل نوع من المهام، يبدو الكود أكثر تنظيماً. ومع ذلك، الكود لا يزال ضمنيًا قليلاً. لدينا قيم سلسلة لأنواع المهام وأنواع الحمولة، ويجب تقديمها بحزمة عضوية. لنعيد تنظيم كودنا ونكتب حزمةً لتقديم المهام والتعامل معها. نبسطًا، سنقوم بإنشاء حزمة تُدعى task.

mkdir task && touch task/task.go
package task

import (
    "context"
    "encoding/json"
    "log"

    "github.com/hibiken/asynq"
)

// List of task types.
const (
    TypeWelcomeEmail  = "email:welcome"
    TypeReminderEmail = "email:reminder"
)

// Payload for any task related to emails.
type EmailTaskPayload struct {
    // ID of the email recipient.
    UserID int
}

func NewWelcomeEmailTask(id int) (*asynq.Task, error) {
    payload, err := json.Marshal(EmailTaskPayload{UserID: id})
    if err != nil {
        return nil, err
    }
    return asynq.NewTask(TypeWelcomeEmail, payload), nil
}

func NewReminderEmailTask(id int) (*asynq.Task, error) {
    payload, err := json.Marshal(EmailTaskPayload{UserID: id})
    if err != nil {
        return nil, err
    }
    return asynq.NewTask(TypeReminderEmail, payload), nil
}

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] إرسال بريد ترحيبي إلى المستخدم %d", p.UserID)
    return nil
}

func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] إرسال بريد تذكير إلى المستخدم %d", p.UserID)
    return nil
}
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p emailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] محاولة إرسال بريد ترحيبي للمستخدم %d...", p.UserID)
    return fmt.Errorf("فشل في إرسال البريد الإلكتروني إلى المستخدم")
}

دعنا نعيد تشغيل برنامج العمال ونضيف مهمة إلى الصف.

go run workers/workers.go
go run client/client.go

إذا كنت تقوم بتشغيل asynq dash ، يجب أن ترى مهمة في حالة إعادة المحاولة (من خلال الانتقال إلى عرض تفاصيل الصف وتحديد علامة التبويب "إعادة المحاولة").

للتحقق من المهام التي تعاد فيها المحاولة ، يمكنك أيضًا تشغيل:

asynq task ls --queue=default --state=retry

سيتم سرد جميع المهام التي ستتم إعادة محاولتها في المستقبل. يشمل الإخراج الوقت المتوقع للتنفيذ التالي لكل مهمة.

بمجرد أن تنفد مهمة من محاولات إعادة المحاولة لها ، ستتحول إلى حالة مؤرشفة ولن تُعاد محاولتها مرة أخرى (مع القدرة على تشغيل المهام المؤرشفة يدويًا باستخدام أدوات CLI أو WebUI).

قبل الختام ، دعنا نقوم بإصلاح المعالج الخاص بنا.

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p emailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] إرسال بريد ترحيبي للمستخدم %d", p.UserID)
    return nil 
}