คู่มือเริ่มต้น
ในบทชี้สอนนี้ เราจะเขียนโปรแกรมสองตัว คือ client
และ workers
.
-
client.go
จะสร้างและกำหนดการทำงานของงานที่จะถูกประมวลผลอย่างไม่ทันเพื่อโดยกระทั่งหน้าที่ของงาน -
workers.go
จะเริ่มต้นสายงานพนักงานหลายตัวที่ทำหน้าที่จัดการงานที่สร้างขึ้นโดยไคลเอนต์
คู่มือนี้สมมติว่าคุณกำลังทำงานกับเซิร์ฟเวอร์ Redis ที่ localhost:6379
ก่อนที่เริ่มต้น โปรดตรวจสอบให้แน่ใจว่า Redis ได้ถูกติดตั้งและกำลังทำงาน
เริ่มจากการสร้างไฟล์หลักสองแฟ้มของเราก่อน
mkdir quickstart && cd quickstart
go mod init asynq-quickstart
mkdir client workers
touch client/client.go workers/workers.go
จากนั้น ติดตั้งแพ็กเกจ asynq
go get -u github.com/hibiken/asynq
ก่อนที่เราจะเริ่มเขียนโค้ด ให้เราทบทวนบางประเภทหลักที่จะถูกใช้ในโปรแกรมสองตัวนี้
ตัวเลือกการเชื่อมต่อกับ Redis
Asynq ใช้ Redis เป็นต้นสัญญา ทั้ง client.go
และ workers.go
ต้องการเชื่อมต่อกับ Redis เพื่อดำเนินการอ่านและเขียน เราจะใช้ RedisClientOpt
เพื่อระบุการเชื่อมต่อกับเซิร์ฟเวอร์ Redis ที่กำลังทำงานอยู่ตามสถานที่
redisConnOpt := asynq.RedisClientOpt{
Addr: "localhost:6379",
// รหัสผ่านสามารถถูกข้ามได้หากไม่จำเป็น
Password: "mypassword",
// ใช้หมายเลขฐานข้อมูลที่จองไว้สำหรับ asynq
// โดยค่าเริ่มต้น Redis มีฐานข้อมูล 16 ฐานข้อมูล (0 ถึง 15)
DB: 0,
}
งาน
ใน asynq
หน้าที่งานถูกครอบคลุมอยู่ในชนิดที่เรียกว่า Task
ซึ่งมีสองฟิลด์ที่เป็นความคิดที่มีรูปแบบ: Type
และ Payload
// Type เป็นค่าสตริงที่ระบุประเภทของงาน
func (t *Task) Type() string
// Payload เป็นข้อมูลที่จำเป็นสำหรับการดำเนินการงาน
func (t *Task) Payload() []byte
ตอนนี้ที่เราได้มองไปที่ประเภทหลัก เรามาเริ่มเขียนโปรแกรมกันเถอะ
โปรแกรมไคลเอนต์
ใน client.go
เราจะสร้างบางงานและนำเข้าไว้โดยใช้ asynq.Client
เมื่อต้องการสร้างงาน คุณสามารถใช้ฟังก์ชัน NewTask
และส่งประเภทและข้อมูลที่จำเป็นสำหรับงาน
เมท็อด Enqueue
จะใช้งานงานและจำนวนตัวเลือกใด ๆ ใช้ ProcessIn
หรือ ProcessAt
เพื่อกำหนดการทำงานของงานสำหรับการประมวลผลในอนาคต
// Payload ที่เกี่ยวข้องกับงานอีเมล
type EmailTaskPayload struct {
// รหัสผู้รับอีเมล
UserID int
}
// client.go
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
// สร้างงานด้วยชื่อประเภทและข้อมูลที่จำเป็นสำหรับการดำเนินการของงาน
payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
if err != nil {
log.Fatal(err)
}
t1 := asynq.NewTask("email:welcome", payload)
t2 := asynq.NewTask("email:reminder", payload)
// ดำเนินการงานทันที
info, err := client.Enqueue(t1)
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] นำเข้างานเรียบร้อย: %+v", info)
// อัปเดตงานตามหลัง 24 ชั่วโมง
info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] นำเข้างานเรียบร้อย: %+v", info)
}
นั่นคือทุกอย่างที่เราต้องการสำหรับโปรแกรมไคลเอนต์ของเราแล้ว
โปรแกรม Workers
ใน workers.go
เราจะสร้างอินสแตนซ์ asynq.Server
เพื่อเริ่มต้น workers
ฟังก์ชัน NewServer
รับ RedisConnOpt
และ Config
เป็นพารามิเตอร์
Config
ใช้เพื่อปรับแต่งพฤติกรรมการประมวลผลงานของเซิร์ฟเวอร์
คุณสามารถอ้างถึง เอกสาร Config
เพื่อเรียนรู้เกี่ยวกับตัวเลือกการปรับแต่งทั้งหมดที่ใช้ได้
เพื่อความง่ายของตัวอย่างนี้ เราจะระบุเฉพาะการประสิทธิภาพ
// workers.go
func main() {
srv := asynq.NewServe(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
// หมายเหตุ: ในส่วนถัดไป เราจะแนะนำว่า `handler` คืออะไร
if err := srv.Run(handler); err != nil {
log.Fatal(err)
}
}
พารามิเตอร์ของเมธอด (*Server).Run
เป็นอินเทอร์เฟซ asynq.Handler
ซึ่งมีเมธอด ProcessTask
type Handler interface {
// หากงานถูกประมวลผลเรียบร้อยแล้ว ProcessTask ควรส่งกลับ nil
// หาก ProcessTask ส่งกลับข้อผิดพลาดที่ไม่ใช่ nil หรือทำให้เกิดความสับสน งานจะทำงานซ้ำในภายหลัง
ProcessTask(context.Context, *Task) error
}
วิธีที่ง่ายที่สุดในการสร้าง handler คือการกำหนดฟังก์ชันที่มีลักษณะเดียวกัน และใช้ชนิดอะแดปเตอร์ asynq.HandlerFunc
เมื่อส่งต่อไปยัง Run
func handler(ctx context.Context, t *asynq.Task) error {
switch t.Type() {
case "email:welcome":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] ส่งอีเมลต้อนรับสู่ผู้ใช้ %d", p.UserID)
case "email:reminder":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] ส่งอีเมลเตือนสู่ผู้ใช้ %d", p.UserID)
default:
return fmt.Errorf("ประเภทงานที่ไม่คาดคิด: %s", t.Type())
}
return nil
}
func main() {
srv := asynq.NewServe(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
// ใช้ชนิดอะแดปเตอร์ asynq.HandlerFunc ในการจัดการฟังก์ชัน
if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
log.Fatal(err)
}
}
เราสามารถดำเนินการเพิ่ม switch cases สำหรับฟังก์ชันช่วยเหลือนี้ต่อไป แต่ในแอปพลิเคชันจริง จะสะดวกยิ่งกว่าถ้ากำหนดตรรกะสำหรับแต่ละกรณีล่ะเท่านั้น
เพื่อแก้ไขโค้ดของเรา เราจะใช้ ServeMux
ในการสร้าง handler เราเพียงแค่สร้าง ServeMux
จากแพ็คเกจ "net/http"
คุณสามารถลงทะเบียน handler โดยเรียกใช้ Handle
หรือ HandleFunc
ServeMux
ทำให้พอภูมิต่ออินเทอร์เฟซ Handler
ดังนั้น สามารถส่งต่อไปยัง (*Server).Run
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
mux := asynq.NewServeMux()
mux.HandleFunc("email:welcome", sendWelcomeEmail)
mux.HandleFunc("email:reminder", sendReminderEmail)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] ส่งอีเมลต้อนรับสู่ผู้ใช้ %d", p.UserID)
return nil
}
func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] ส่งอีเมลเตือนสู่ผู้ใช้ %d", p.UserID)
return nil
}
ตอนนี้ที่เราได้แยกฟังก์ชันการจัดการสำหรับแต่ละประเภทงาน โค้ดดูมีระเบียบขึ้น อย่างไรก็ตาม โค้ดยังมีความไม่โดดเด่น
เรามีค่าสตริงสำหรับประเภทงานและประเภท payload เราควรที่จะห่วงยืดเหมือนใจในแพกเกจเพียงแค่สร้างแพ็คเกจเรียกว่า task
ทำเพียงแค่สร้างแพ็คเกจมีชื่อว่า task
และไฟล์ที่เรียกว่า task.go
mkdir task && touch task/task.go
package task
import (
"context"
"encoding/json"
"log"
"github.com/hibiken/asynq"
)
// List of task types.
const (
TypeWelcomeEmail = "email:welcome"
TypeReminderEmail = "email:reminder"
)
// Payload for any task related to emails.
type EmailTaskPayload struct {
// ID of the email recipient.
UserID int
}
func NewWelcomeEmailTask(id int) (*asynq.Task, error) {
payload, err := json.Marshal(EmailTaskPayload{UserID: id})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeWelcomeEmail, payload), nil
}
func NewReminderEmailTask(id int) (*asynq.Task, error) {
payload, err := json.Marshal(EmailTaskPayload{UserID: id})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeReminderEmail, payload), nil
}
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] ส่งอีเมลต้อนรับให้กับผู้ใช้ %d", p.UserID)
return nil
}
func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] ส่งอีเมลเตือนตามให้กับผู้ใช้ %d", p.UserID)
return nil
}
ตอนนี้เราสามารถ import แพ็คเกจนี้ใน client.go
และ workers.go
แล้ว
```go
// client.go
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
t1, err := task.NewWelcomeEmailTask(42)
if err != nil {
log.Fatal(err)
}
t2, err := task.NewReminderEmailTask(42)
if err != nil {
log.Fatal(err)
}
// นำ task ไปใส่คิวทันที
info, err := client.Enqueue(t1)
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] นำ task ไปใส่คิวสำเร็จ: %+v", info)
// นำ task ไปใส่คิวเพื่อดำเนินการหลังจาก 24 ชั่วโมง
info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] นำ task ไปใส่คิวสำเร็จ: %+v", info)
}
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
mux := asynq.NewServeMux()
mux.HandleFunc(task.TypeWelcomeEmail, task.HandleWelcomeEmailTask)
mux.HandleFunc(task.TypeReminderEmail, task.HandleReminderEmailTask)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
ตอนนี้โค้ดดูดีขึ้น!
ตอนนี้ที่เรามี client
และ workers
พร้อมแล้ว เราสามารถรันโปรแกรมเหล่านี้ได้ โดยเริ่มจากรันโปรแกรม client
เพื่อสร้างและกำหนดงาน
go run client/client.go
นี่จะสร้างงานสองอัน: หนึ่งสำหรับดำเนินการทันที และอีกอันสำหรับดำเนินการหลังจาก 24 ชั่วโมง
เราสามารถใช้ asynq
command-line interface เพื่อตรวจสอบงาน
asynq dash
คุณควรจะเห็นงานหนึ่งอยู่ในสถานะ Enqueued และงานอีกอันอยู่ในสถานะ Scheduled
หมายเหตุ: สำหรับแบ่งสถานะแต่ละอย่าง กรุณาอ้างถึง Task Lifecycle.
สุดท้ายแล้ว ให้เริ่มโปรแกรม workers
เพื่อดำเนินการงาน
go run workers/workers.go
หมายเหตุ: โปรแกรมนี้จะไม่หลุดจากการทำงานจนกว่าคุณจะส่งสัญญาณเพื่อสิ้นสุดการทำงาน สำหรับ best practices เกี่ยวกับการสิ้นสุดการทำงานของ background workers โปรดอ้างอิงที่ Signals Wiki page.
คุณควรเห็นผลลัพธ์ในการดำเนินการงานที่สำเร็จผ่านทางเทอร์มินอล
คุณสามารถทดสอบโปรแกรม client
อีกครั้งเพื่อดูว่างานจะได้รับและดำเนินการผ่านไหม
การที่งานอาจไม่สามารถดำเนินการผ่านได้ในครั้งแรกไม่ได้เป็นสิ่งแปลกที่เลย โดยค่าเริ่มต้น งานที่ไม่สำเร็จจะถูกทำซ้ำ 25 ครั้ง ด้วย exponential backoff algorithm ให้เรามาอัปเดต handler เพื่อส่งคืน error เพื่อจำลองสถานการณ์ที่ไม่สำเร็จ
// tasks.go
```go
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p emailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] พยายามส่งอีเมลต้อนรับให้กับผู้ใช้ %d...", p.UserID)
return fmt.Errorf("การส่งอีเมลถึงผู้ใช้ล้มเหลว")
}
ให้เริ่มต้นโปรแกรม workers และส่งงานให้กับทีมงานอีกครั้ง
go run workers/workers.go
go run client/client.go
ถ้าคุณกำลังใช้ asynq dash
คุณจะเห็นงานที่อยู่ในสถานะ Retry (โดยการไปที่มุมมองรายละเอียดของคิวและเน้นที่แท็บ "retry")
หากต้องการตรวจสอบงานที่กำลังอยู่ในสถานะ retry คุณยังสามารถใช้คำสั่งนี้:
asynq task ls --queue=default --state=retry
นี้จะแสดงรายการของงานทั้งหมดที่จะลองใหม่ในอนาคต ผลลัพธ์รวมถึงเวลาที่คาดหวังในการเรียกใช้ครั้งต่อไปสำหรับแต่ละงาน
เมื่องานเสร็จสิ้นการลองใหม่ของมัน งานนั้นจะเปลี่ยนสถานะเป็น Archived และจะไม่มีการลองใหม่อีก (คุณยังสามารถเรียกใช้งานที่เก็บถาวรด้วยเครื่องมือ CLI หรือ WebUI)
ก่อนจบบทความนี้ เรามาแก้ไข handler กัน
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p emailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] ส่งอีเมลต้อนรับให้ผู้ใช้ %d", p.UserID)
return nil
}