راهنمای شروع کار
در این آموزش، ما دو برنامه به نامهای client
و workers
خواهیم نوشت.
- برنامه
client.go
وظیفهی ایجاد و برنامهریزی وظایف برای پردازش به صورت ناهمزمان توسط موضوعهای پسزمینهای کارگر دارد. - برنامه
workers.go
چندین موضوع کارگر موازی را برای پردازش وظایف ایجاد شده توسط مشتری راهاندازی میکند.
این راهنما فرض میکند که شما یک سرور Redis را در localhost:6379
اجرا میکنید. قبل از شروع، لطفا اطمینان حاصل کنید که Redis نصب و در حال اجرا است.
ابتدا بیایید دو فایل اصلی خود را ایجاد کنیم.
mkdir quickstart && cd quickstart
go mod init asynq-quickstart
mkdir client workers
touch client/client.go workers/workers.go
سپس، بستهی asynq
را نصب کنید.
go get -u github.com/hibiken/asynq
قبل از شروع نوشتن کد، بیایید چند نوع اصلی را مرور کنیم که در این دو برنامه استفاده میشوند.
گزینههای اتصال Redis
Asynq از Redis به عنوان یک مشترک پیام استفاده میکند. هر دو client.go
و workers.go
برای عملیات خواندن و نوشتن به Redis نیاز دارند. ما از RedisClientOpt
برای مشخص کردن اتصال به سرور Redis محلی استفاده خواهیم کرد.
redisConnOpt := asynq.RedisClientOpt{
Addr: "localhost:6379",
// اگر نیازی نیست، رمزعبور میتواند حذف شود
Password: "mypassword",
// از یک شماره پایگاه داده اختصاصی برای asynq استفاده کنید.
// به طور پیشفرض، Redis 16 پایگاه داده را ارائه میدهد (از 0 تا 15).
DB: 0,
}
وظایف
در asynq
، واحدهای کاری در یک نوع به نام Task
بستهبندی میشوند، که به طور مفهومی دو فیلد به نامهای Type
و Payload
را دارد.
// Type یک مقدار رشتهای است که نوع وظیفه را نشان میدهد.
func (t *Task) Type() string
// Payload داده مورد نیاز برای اجرای وظیفه است.
func (t *Task) Payload() []byte
حال که به انواع اصلی نگاهی انداختیم، بیایید شروع به نوشتن برنامههای خود کنیم.
برنامه مشتری
در client.go
، ما برخی از وظایف را ایجاد کرده و آنها را با استفاده از asynq.Client
قرار میدهیم.
برای ایجاد یک وظیفه، میتوانید از تابع NewTask
استفاده کرده و نوع و payload وظیفه را منتقل کنید.
متد Enqueue
یک وظیفه و هر تعداد گزینه را میپذیرد. از گزینههای ProcessIn
یا ProcessAt
برای زمانبندی وظایف برای پردازش در آینده استفاده کنید.
// Payload مربوط به وظایف ایمیل.
type EmailTaskPayload struct {
// شناسه گیرنده ایمیل.
UserID int
}
// client.go
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
// یک وظیفه با نوع و payload ایجاد کنید.
payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
if err != nil {
log.Fatal(err)
}
t1 := asynq.NewTask("email:welcome", payload)
t2 := asynq.NewTask("email:reminder", payload)
// وظایف را فوراً پردازش کنید.
info, err := client.Enqueue(t1)
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] وظیفه با موفقیت درونیابی شد: %+v", info)
// وظایف را پس از 24 ساعت پردازش کنید.
info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] وظیفه با موفقیت درونیابی شد: %+v", info)
}
تمام کاری که برای برنامه مشتری ما نیاز است، انجام شده است.
برنامهی کارگران
در workers.go
، ما یک نمونه از asynq.Server
ایجاد خواهیم کرد تا کارگران را شروع کنیم.
تابع NewServer
، RedisConnOpt
و Config
را به عنوان پارامتر میگیرد.
Config
برای تنظیم رفتار پردازش وظیفههای سرور استفاده میشود.
می توانید به مستندات Config
مراجعه کنید تا در مورد تمام گزینههای پیکربندی موجود اطلاعات کسب کنید.
به عنوان مثال، در این مثال، ما فقط همروندی را مشخص میکنیم.
// workers.go
func main() {
srv := asynq.NewServe(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
// توجه: در بخش زیر، ما معرفی میکنیم که `handler` چیست.
if err := srv.Run(handler); err != nil {
log.Fatal(err)
}
}
پارامتر متد (*Server).Run
یک رابط asynq.Handler
است که یک متد ProcessTask
دارد.
type Handler interface {
// اگر وظیفه با موفقیت پردازش شود، ProcessTask باید کیک نامشخص بازگرداند.
// اگر ProcessTask یک خطا غیرنامشخص یا باعث یک بحران شود، وظیفه بعداً تلاش مجدد خواهد شد.
ProcessTask(context.Context, *Task) error
}
سادهترین راه برای پیادهسازی یک handler، تعریف یک تابع با همان امضاء است و استفاده از نوع آداپتور asynq.HandlerFunc
هنگام انتقال آن به Run
است.
func handler(ctx context.Context, t *asynq.Task) error {
switch t.Type() {
case "email:welcome":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] ارسال ایمیل خوشآمد گویی به کاربر %d", p.UserID)
case "email:reminder":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] ارسال یادآوری ایمیل به کاربر %d", p.UserID)
default:
return fmt.Errorf("نوع وظیفه غیرمنتظره است: %s", t.Type())
}
return nil
}
func main() {
srv := asynq.NewServe(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
// استفاده از آداپتور asynq.HandlerFunc برای پردازش تابع
if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
log.Fatal(err)
}
}
میتوانیم ادامه دهیم و موارد switch را برای این تابع handler اضافه کنیم، اما در یک برنامهی واقعی، راحتتر است که منطق مربوط به هر مورد را در یک تابع جداگانه تعریف کنیم.
برای بازطراحی کد ما، بیایید از ServeMux
استفاده کنیم تا handler خود را ایجاد کنیم. دقیقاً مانند ServeMux
از پکیج "net/http"
، میتوانید یک handler را با فراخوانی Handle
یا HandleFunc
ثبت کنید. ServeMux
رابط Handler
را برآورده میکند، بنابراین میتوان آن را به (*Server).Run
ارسال کرد.
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
mux := asynq.NewServeMux()
mux.HandleFunc("email:welcome", sendWelcomeEmail)
mux.HandleFunc("email:reminder", sendReminderEmail)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] ارسال ایمیل خوشآمد گویی به کاربر %d", p.UserID)
return nil
}
func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] ارسال یادآوری ایمیل به کاربر %d", p.UserID)
return nil
}
اکنون که ما توابع پردازش برای هر نوع وظیفه استخراج کردهایم، کد بیشتر منظم به نظر میرسد. با این حال، کد هنوز کمی ضمنی است. ما این مقادیر رشتهای برای انواع وظیفه و انواع بار را داریم و باید آنها را در یک بسته سازماندهی شده ببندیم. بیایید کد خود را بازطراحی کنیم و یک بسته برای بستهبندی ایجاد و پردازش وظیفه بنویسیم. ما به سادگی یک بسته به نام task
ایجاد میکنیم.
mkdir task && touch task/task.go
package task
import (
"context"
"encoding/json"
"log"
"github.com/hibiken/asynq"
)
// لیست انواع وظایف
const (
TypeWelcomeEmail = "ایمیل:خوشآمدگویی"
TypeReminderEmail = "ایمیل:یادآوری"
)
// داده ورودی برای هر وظیفه مربوط به ایمیل
type EmailTaskPayload struct {
// شناسه گیرنده ایمیل
UserID int
}
func NewWelcomeEmailTask(id int) (*asynq.Task, error) {
payload, err := json.Marshal(EmailTaskPayload{UserID: id})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeWelcomeEmail, payload), nil
}
func NewReminderEmailTask(id int) (*asynq.Task, error) {
payload, err := json.Marshal(EmailTaskPayload{UserID: id})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeReminderEmail, payload), nil
}
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] در حال ارسال ایمیل خوشآمدگویی به کاربر %d", p.UserID)
return nil
}
func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] در حال ارسال ایمیل یادآوری به کاربر %d", p.UserID)
return nil
}
اکنون میتوانیم این بسته را در client.go
و workers.go
وارد کنیم.
```go
// client.go
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
t1, err := task.NewWelcomeEmailTask(42)
if err != nil {
log.Fatal(err)
}
t2, err := task.NewReminderEmailTask(42)
if err != nil {
log.Fatal(err)
}
// وظیفه را فوراً در صف قرار دهید.
info, err := client.Enqueue(t1)
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] وظیفه با موفقیت به صف اضافه شد: %+v", info)
// وظیفه را برای پردازش پس از ۲۴ ساعت در صف قرار دهید.
info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] وظیفه با موفقیت به صف اضافه شد: %+v", info)
}
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
mux := asynq.NewServeMux()
mux.HandleFunc(task.TypeWelcomeEmail, task.HandleWelcomeEmailTask)
mux.HandleFunc(task.TypeReminderEmail, task.HandleReminderEmailTask)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
اکنون کد بهتر به نظر میرسد!
اکنون که client
و workers
آماده هستند، میتوانیم این دو برنامه را اجرا کنیم. بیایید با اجرای برنامه client
وظایف را ایجاد و زمانبندی کنیم.
go run client/client.go
این دو وظیفه ایجاد خواهد کرد: یکی برای پردازش فوری و دیگری برای پردازش پس از ۲۴ ساعت.
بیایید از رابط خط فرمان asynq
برای بررسی وظایف استفاده کنیم.
asynq dash
باید بتوانید یک وظیفه در وضعیت قرارداده شده و دیگری در وضعیت زمانبندی شده را مشاهده کنید.
توجه: برای درک معنای هر وضعیت، لطفاً به چرخه عمر یک وظیفه مراجعه کنید.
در آخر، بیایید برنامه workers
را برای پردازش وظایف راهاندازی کنیم.
go run workers/workers.go
توجه: این برنامه تا زمانی که یک سیگنال برای خاتمه آن فرستاده نشود، خارج نخواهد شد. برای بهترین روشها برای خاتمه ایمن کارگرها در پسزمینه، لطفاً به صفحه ویکی سیگنالها مراجعه کنید.
باید بتوانید خروجیهای متنی مربوط به پردازش موفق وظایف را در ترمینال مشاهده کنید.
میتوانید دوباره برنامه client
را اجرا کنید تا ببینید چگونه کارگرها وظایف را پذیرش و پردازش میکنند.
اجرای اولیه وظیفه برای پردازش موفق همیشه اتفاق نمیافتد. به صورت پیشفرض، وظایف ناموفق به صورت ۲۵ بار با کاهشی نمایی تلاش میشوند. بیایید برنامه ما را بهروزرسانی کنیم تا یک خطا بازگشت داده و وضعیت ناموفق را شبیهسازی کنیم.
// tasks.go
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p emailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] تلاش برای ارسال یک ایمیل خوشآمدگویی به کاربر %d...", p.UserID)
return fmt.Errorf("ارسال ایمیل به کاربر ناموفق بود")
}
بیایید برنامههای کارگر خود را مجدداهمراهاندازی کنیم و یک وظیفه به صف اضافه کنیم.
go run workers/workers.go
go run client/client.go
اگر شما asynq dash
را اجرا میکنید، باید یک وظیفه در وضعیت Retry (با ناوبری به نمای جزئیات صف و برجسته کردن تب "retry") ببینید.
برای بررسی وظایفی که در وضعیت تکرار هستند، میتوانید همچنین اجرا کنید:
asynq task ls --queue=default --state=retry
این کار همه وظایفی را که در آینده تکرار خواهند شد لیست میکند. خروجی شامل زمان مورد انتظار اجرای بعدی هر وظیفه است.
هنگامی که یک وظیفه تلاشهای تکرار خود را به سرانجام رسانده باشد، به وضعیت Archived منتقل میشود و دیگر دوباره تلاش برای تکرار نمیشود (شما همچنان میتوانید با استفاده از ابزارهای CLI یا WebUI وظایف بایگانی شده را به صورت دستی اجرا کنید).
قبل از پایان این آموزش، بیایید دستگیرهمان را اصلاح کنیم.
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p emailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] ارسال یک ایمیل خوشآمدگویی به کاربر %d", p.UserID)
return nil
}