คู่มือเริ่มต้น

ในบทชี้สอนนี้ เราจะเขียนโปรแกรมสองตัว คือ client และ workers.

  • client.go จะสร้างและกำหนดการทำงานของงานที่จะถูกประมวลผลอย่างไม่ทันเพื่อโดยกระทั่งหน้าที่ของงาน
  • workers.go จะเริ่มต้นสายงานพนักงานหลายตัวที่ทำหน้าที่จัดการงานที่สร้างขึ้นโดยไคลเอนต์

คู่มือนี้สมมติว่าคุณกำลังทำงานกับเซิร์ฟเวอร์ Redis ที่ localhost:6379 ก่อนที่เริ่มต้น โปรดตรวจสอบให้แน่ใจว่า Redis ได้ถูกติดตั้งและกำลังทำงาน

เริ่มจากการสร้างไฟล์หลักสองแฟ้มของเราก่อน

mkdir quickstart && cd quickstart
go mod init asynq-quickstart
mkdir client workers
touch client/client.go workers/workers.go

จากนั้น ติดตั้งแพ็กเกจ asynq

go get -u github.com/hibiken/asynq

ก่อนที่เราจะเริ่มเขียนโค้ด ให้เราทบทวนบางประเภทหลักที่จะถูกใช้ในโปรแกรมสองตัวนี้

ตัวเลือกการเชื่อมต่อกับ Redis

Asynq ใช้ Redis เป็นต้นสัญญา ทั้ง client.go และ workers.go ต้องการเชื่อมต่อกับ Redis เพื่อดำเนินการอ่านและเขียน เราจะใช้ RedisClientOpt เพื่อระบุการเชื่อมต่อกับเซิร์ฟเวอร์ Redis ที่กำลังทำงานอยู่ตามสถานที่

redisConnOpt := asynq.RedisClientOpt{
    Addr: "localhost:6379",
    // รหัสผ่านสามารถถูกข้ามได้หากไม่จำเป็น
    Password: "mypassword",
    // ใช้หมายเลขฐานข้อมูลที่จองไว้สำหรับ asynq
    // โดยค่าเริ่มต้น Redis มีฐานข้อมูล 16 ฐานข้อมูล (0 ถึง 15)
    DB: 0,
}

งาน

ใน asynq หน้าที่งานถูกครอบคลุมอยู่ในชนิดที่เรียกว่า Task ซึ่งมีสองฟิลด์ที่เป็นความคิดที่มีรูปแบบ: Type และ Payload

// Type เป็นค่าสตริงที่ระบุประเภทของงาน
func (t *Task) Type() string

// Payload เป็นข้อมูลที่จำเป็นสำหรับการดำเนินการงาน
func (t *Task) Payload() []byte

ตอนนี้ที่เราได้มองไปที่ประเภทหลัก เรามาเริ่มเขียนโปรแกรมกันเถอะ

โปรแกรมไคลเอนต์

ใน client.go เราจะสร้างบางงานและนำเข้าไว้โดยใช้ asynq.Client

เมื่อต้องการสร้างงาน คุณสามารถใช้ฟังก์ชัน NewTask และส่งประเภทและข้อมูลที่จำเป็นสำหรับงาน

เมท็อด Enqueue จะใช้งานงานและจำนวนตัวเลือกใด ๆ ใช้ ProcessIn หรือ ProcessAt เพื่อกำหนดการทำงานของงานสำหรับการประมวลผลในอนาคต

// Payload ที่เกี่ยวข้องกับงานอีเมล
type EmailTaskPayload struct {
    // รหัสผู้รับอีเมล
    UserID int
}

// client.go
func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

    // สร้างงานด้วยชื่อประเภทและข้อมูลที่จำเป็นสำหรับการดำเนินการของงาน
    payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
    if err != nil {
        log.Fatal(err)
    }
    t1 := asynq.NewTask("email:welcome", payload)

    t2 := asynq.NewTask("email:reminder", payload)

    // ดำเนินการงานทันที
    info, err := client.Enqueue(t1)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] นำเข้างานเรียบร้อย: %+v", info)

    // อัปเดตงานตามหลัง 24 ชั่วโมง
    info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] นำเข้างานเรียบร้อย: %+v", info)
}

นั่นคือทุกอย่างที่เราต้องการสำหรับโปรแกรมไคลเอนต์ของเราแล้ว

โปรแกรม Workers

ใน workers.go เราจะสร้างอินสแตนซ์ asynq.Server เพื่อเริ่มต้น workers

ฟังก์ชัน NewServer รับ RedisConnOpt และ Config เป็นพารามิเตอร์

Config ใช้เพื่อปรับแต่งพฤติกรรมการประมวลผลงานของเซิร์ฟเวอร์ คุณสามารถอ้างถึง เอกสาร Config เพื่อเรียนรู้เกี่ยวกับตัวเลือกการปรับแต่งทั้งหมดที่ใช้ได้

เพื่อความง่ายของตัวอย่างนี้ เราจะระบุเฉพาะการประสิทธิภาพ

// workers.go
func main() {
    srv := asynq.NewServe(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // หมายเหตุ: ในส่วนถัดไป เราจะแนะนำว่า `handler` คืออะไร
    if err := srv.Run(handler); err != nil {
        log.Fatal(err)
    }
}

พารามิเตอร์ของเมธอด (*Server).Run เป็นอินเทอร์เฟซ asynq.Handler ซึ่งมีเมธอด ProcessTask

type Handler interface {
    // หากงานถูกประมวลผลเรียบร้อยแล้ว ProcessTask ควรส่งกลับ nil
    // หาก ProcessTask ส่งกลับข้อผิดพลาดที่ไม่ใช่ nil หรือทำให้เกิดความสับสน งานจะทำงานซ้ำในภายหลัง
    ProcessTask(context.Context, *Task) error
}

วิธีที่ง่ายที่สุดในการสร้าง handler คือการกำหนดฟังก์ชันที่มีลักษณะเดียวกัน และใช้ชนิดอะแดปเตอร์ asynq.HandlerFunc เมื่อส่งต่อไปยัง Run

func handler(ctx context.Context, t *asynq.Task) error {
    switch t.Type() {
    case "email:welcome":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] ส่งอีเมลต้อนรับสู่ผู้ใช้ %d", p.UserID)

    case "email:reminder":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] ส่งอีเมลเตือนสู่ผู้ใช้ %d", p.UserID)

    default:
        return fmt.Errorf("ประเภทงานที่ไม่คาดคิด: %s", t.Type())
    }
    return nil
}

func main() {
    srv := asynq.NewServe(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // ใช้ชนิดอะแดปเตอร์ asynq.HandlerFunc ในการจัดการฟังก์ชัน
    if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
        log.Fatal(err)
    }
}

เราสามารถดำเนินการเพิ่ม switch cases สำหรับฟังก์ชันช่วยเหลือนี้ต่อไป แต่ในแอปพลิเคชันจริง จะสะดวกยิ่งกว่าถ้ากำหนดตรรกะสำหรับแต่ละกรณีล่ะเท่านั้น

เพื่อแก้ไขโค้ดของเรา เราจะใช้ ServeMux ในการสร้าง handler เราเพียงแค่สร้าง ServeMux จากแพ็คเกจ "net/http" คุณสามารถลงทะเบียน handler โดยเรียกใช้ Handle หรือ HandleFunc ServeMux ทำให้พอภูมิต่ออินเทอร์เฟซ Handler ดังนั้น สามารถส่งต่อไปยัง (*Server).Run

// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    mux := asynq.NewServeMux()
    mux.HandleFunc("email:welcome", sendWelcomeEmail)
    mux.HandleFunc("email:reminder", sendReminderEmail)

    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}

func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] ส่งอีเมลต้อนรับสู่ผู้ใช้ %d", p.UserID)
    return nil
}

func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] ส่งอีเมลเตือนสู่ผู้ใช้ %d", p.UserID)
    return nil
}

ตอนนี้ที่เราได้แยกฟังก์ชันการจัดการสำหรับแต่ละประเภทงาน โค้ดดูมีระเบียบขึ้น อย่างไรก็ตาม โค้ดยังมีความไม่โดดเด่น

เรามีค่าสตริงสำหรับประเภทงานและประเภท payload เราควรที่จะห่วงยืดเหมือนใจในแพกเกจเพียงแค่สร้างแพ็คเกจเรียกว่า task ทำเพียงแค่สร้างแพ็คเกจมีชื่อว่า task และไฟล์ที่เรียกว่า task.go

mkdir task && touch task/task.go
package task

import (
	"context"
	"encoding/json"
	"log"

	"github.com/hibiken/asynq"
)

// List of task types.
const (
	TypeWelcomeEmail  = "email:welcome"
	TypeReminderEmail = "email:reminder"
)

// Payload for any task related to emails.
type EmailTaskPayload struct {
	// ID of the email recipient.
	UserID int
}

func NewWelcomeEmailTask(id int) (*asynq.Task, error) {
	payload, err := json.Marshal(EmailTaskPayload{UserID: id})
	if err != nil {
		return nil, err
	}
	return asynq.NewTask(TypeWelcomeEmail, payload), nil
}

func NewReminderEmailTask(id int) (*asynq.Task, error) {
	payload, err := json.Marshal(EmailTaskPayload{UserID: id})
	if err != nil {
		return nil, err
	}
	return asynq.NewTask(TypeReminderEmail, payload), nil
}

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
	var p EmailTaskPayload
	if err := json.Unmarshal(t.Payload(), &p); err != nil {
		return err
	}
	log.Printf(" [*] ส่งอีเมลต้อนรับให้กับผู้ใช้ %d", p.UserID)
	return nil
}

func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
	var p EmailTaskPayload
	if err := json.Unmarshal(t.Payload(), &p); err != nil {
		return err
	}
	log.Printf(" [*] ส่งอีเมลเตือนตามให้กับผู้ใช้ %d", p.UserID)
	return nil
}

ตอนนี้เราสามารถ import แพ็คเกจนี้ใน client.go และ workers.go แล้ว

```go
// client.go
func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

    t1, err := task.NewWelcomeEmailTask(42)
    if err != nil {
        log.Fatal(err)
    }

    t2, err := task.NewReminderEmailTask(42)
    if err != nil {
        log.Fatal(err)
    }

    // นำ task ไปใส่คิวทันที
    info, err := client.Enqueue(t1)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] นำ task ไปใส่คิวสำเร็จ: %+v", info)

    // นำ task ไปใส่คิวเพื่อดำเนินการหลังจาก 24 ชั่วโมง
    info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatal(err)
    }
    log.Printf(" [*] นำ task ไปใส่คิวสำเร็จ: %+v", info)
}
// workers.go
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    mux := asynq.NewServeMux()
    mux.HandleFunc(task.TypeWelcomeEmail, task.HandleWelcomeEmailTask)
    mux.HandleFunc(task.TypeReminderEmail, task.HandleReminderEmailTask)

    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}

ตอนนี้โค้ดดูดีขึ้น!

ตอนนี้ที่เรามี client และ workers พร้อมแล้ว เราสามารถรันโปรแกรมเหล่านี้ได้ โดยเริ่มจากรันโปรแกรม client เพื่อสร้างและกำหนดงาน

go run client/client.go

นี่จะสร้างงานสองอัน: หนึ่งสำหรับดำเนินการทันที และอีกอันสำหรับดำเนินการหลังจาก 24 ชั่วโมง เราสามารถใช้ asynq command-line interface เพื่อตรวจสอบงาน

asynq dash

คุณควรจะเห็นงานหนึ่งอยู่ในสถานะ Enqueued และงานอีกอันอยู่ในสถานะ Scheduled

หมายเหตุ: สำหรับแบ่งสถานะแต่ละอย่าง กรุณาอ้างถึง Task Lifecycle.

สุดท้ายแล้ว ให้เริ่มโปรแกรม workers เพื่อดำเนินการงาน

go run workers/workers.go

หมายเหตุ: โปรแกรมนี้จะไม่หลุดจากการทำงานจนกว่าคุณจะส่งสัญญาณเพื่อสิ้นสุดการทำงาน สำหรับ best practices เกี่ยวกับการสิ้นสุดการทำงานของ background workers โปรดอ้างอิงที่ Signals Wiki page.

คุณควรเห็นผลลัพธ์ในการดำเนินการงานที่สำเร็จผ่านทางเทอร์มินอล

คุณสามารถทดสอบโปรแกรม client อีกครั้งเพื่อดูว่างานจะได้รับและดำเนินการผ่านไหม

การที่งานอาจไม่สามารถดำเนินการผ่านได้ในครั้งแรกไม่ได้เป็นสิ่งแปลกที่เลย โดยค่าเริ่มต้น งานที่ไม่สำเร็จจะถูกทำซ้ำ 25 ครั้ง ด้วย exponential backoff algorithm ให้เรามาอัปเดต handler เพื่อส่งคืน error เพื่อจำลองสถานการณ์ที่ไม่สำเร็จ

// tasks.go
```go
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p emailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] พยายามส่งอีเมลต้อนรับให้กับผู้ใช้ %d...", p.UserID)
    return fmt.Errorf("การส่งอีเมลถึงผู้ใช้ล้มเหลว")
}

ให้เริ่มต้นโปรแกรม workers และส่งงานให้กับทีมงานอีกครั้ง

go run workers/workers.go
go run client/client.go

ถ้าคุณกำลังใช้ asynq dash คุณจะเห็นงานที่อยู่ในสถานะ Retry (โดยการไปที่มุมมองรายละเอียดของคิวและเน้นที่แท็บ "retry")

หากต้องการตรวจสอบงานที่กำลังอยู่ในสถานะ retry คุณยังสามารถใช้คำสั่งนี้:

asynq task ls --queue=default --state=retry

นี้จะแสดงรายการของงานทั้งหมดที่จะลองใหม่ในอนาคต ผลลัพธ์รวมถึงเวลาที่คาดหวังในการเรียกใช้ครั้งต่อไปสำหรับแต่ละงาน

เมื่องานเสร็จสิ้นการลองใหม่ของมัน งานนั้นจะเปลี่ยนสถานะเป็น Archived และจะไม่มีการลองใหม่อีก (คุณยังสามารถเรียกใช้งานที่เก็บถาวรด้วยเครื่องมือ CLI หรือ WebUI)

ก่อนจบบทความนี้ เรามาแก้ไข handler กัน

func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    var p emailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] ส่งอีเมลต้อนรับให้ผู้ใช้ %d", p.UserID)
    return nil 
}