หน้านี้จะแนะนำคุณลักษณะการรวมงานของ Asynq
ภาพรวม
การรวมงานช่วยให้คุณสามารถจัดคิวงานหลายๆ งานต่อเนื่องกัน แทนที่จะส่งมันไปทีละอันให้กับ "Handler" คุณลักษณะนี้ช่วยให้คุณสามารถรวมการกระทำหลายๆ อันต่อเนื่องกันเข้าไปอย่างเดียว เพื่อประหยัดค่าใช้จ่าย ทำให้แคชมีประสิทธิภาพ หรือแบทช์การแจ้งเตือน
หลักการทำงาน
เพื่อใช้คุณลักษณะการรวมงาน คุณต้องจัดคิวงานด้วย ชื่อกลุ่ม เดียวกันลงในคิวเดียวกัน งานที่จัดคิวด้วยคู่ (queue, group)
เดียวกันจะถูกรวมเข้าไปทำงานเป็นงานเดียวโดย GroupAggregator
ที่คุณกำหนด และงานที่ถูกรวมเข้าไปจะถูกส่งให้ Handler
เมื่อสร้างงานที่ถูกรวมเข้าไป เซิร์ฟเวอร์ Asynq จะรองานเพิ่มเติมจนกว่า ช่วงเวลาผ่อนผัน ที่กำหนดไว้จะหมด ทุกครั้งที่จัดคิวงานใหม่ด้วยคู่ (queue, group)
เดียวกัน ช่วงเวลาผ่อนผันจะถูกอัพเดต
ช่วงเวลาผ่อนผันมีขีดจำกัดที่สามารถกำหนดเอง: คุณสามารถตั้งค่า เวลาล่าช้าสูงสุดในการรวมงาน หลังจากนั้น เซิร์ฟเวอร์ Asynq จะละไปกำหนดช่วงเวลาผ่อนผันที่เหลือและรวมงาน
คุณ c่าย ตั้งค่า จำนวนงานสูงสุด ที่สามารถรวมได้ในครั้งเดียวกัน ถ้าครบจำนวนแล้ว เซิร์ฟเวอร์ Asynq จะรวมงานทันที
หมายเหตุ: การตั้งเวลางานและการรวมงานเป็นคุณลักษณะที่ขัดแย้งกัน ซึ่งการตั้งเวลาจะมีลำดับความสำคัญมากกว่าการรวมงาน
ตัวอย่างการใช้งานอย่างรวดเร็ว
ทางฝั่งของไคลเอ็นต์ ให้ใช้ Queue
และ Group
ในการจัดลำดับงานลงในกลุ่มเดียวกัน
// จัดลำดับงานสามงานลงในกลุ่มเดียวกัน
client.Enqueue(task1, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task2, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task3, asynq.Queue("notifications"), asynq.Group("user1:email"))
ทางฝั่งเซิร์ฟเวอร์ ให้เตรียม GroupAggregator
เพื่อให้งานถูกเรียกเข้ากลุ่มได้ คุณสามารถปรับแต่งกลไกการรวมโดยกำหนดค่า GroupGracePeriod
GroupMaxDelay
และ GroupMaxSize
// ฟังก์ชันนี้ใช้ในการรวมงานหลายงานเข้าไว้ในงานเดียว
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
// ... ตรรกะของคุณในการรวมงานที่ระบุและคืนค่างานที่รวมไว้
// ... หากจำเป็น ให้ใช้ NewTask(typename, payload, opts...) เพื่อสร้างงานใหม่และกำหนดอ็อพชัน
// ... (หมายเหตุ) ตัวเลือก Queue จะถูกละเอียด และงานที่รวมไว้จะถูกจัดลำดับไปที่คิวเดียวกับกลุ่ม
}
srv := asynq.NewServer(
redisConnOpt,
asynq.Config{
GroupAggregator: asynq.GroupAggregatorFunc(aggregate),
GroupMaxDelay: 10 * time.Minute,
GroupGracePeriod: 2 * time.Minute,
GroupMaxSize: 20,
Queues: map[string]int{"notifications": 1},
},
)
บทช่วยสอน
ในส่วนนี้ เราจะเสนอโปรแกรมง่ายๆ เพื่อสาธิตการใช้คุณสมบัติการรวม.
ตัวแรก สร้างโปรแกรมไคลเอ็นต์ด้วยโค้ดต่อไปนี้:
// client.go
package main
import (
"flag"
"log"
"github.com/hibiken/asynq"
)
var (
flagRedisAddr = flag.String("redis-addr", "localhost:6379", "ที่อยู่เซิร์ฟเวอร์ Redis")
flagMessage = flag.String("message", "hello", "ข้อความที่จะถูกพิมพ์เมื่อกระบวนการงาน")
)
func main() {
flag.Parse()
c := asynq.NewClient(asynq.RedisClientOpt{Addr: *flagRedisAddr})
defer c.Close()
task := asynq.NewTask("aggregation-tutorial", []byte(*flagMessage))
info, err := c.Enqueue(task, asynq.Queue("tutorial"), asynq.Group("example-group"))
if err != nil {
log.Fatalf("ไม่สามารถเตรียมการงาน: %v", err)
}
log.Printf("การเตรียมงานเรียบร้อย: %s", info.ID)
}
คุณสามารถเรียกใช้โปรแกรมนี้หลายครั้ง:
$ go build -o client client.go
$ ./client --redis-addr=
$ ./client --message=hi --redis-addr=
$ ./client --message=bye --redis-addr=
ตอนนี้ หากคุณตรวจสอบคิวผ่าน CLI หรือ Web UI คุณจะเห็นงาน ถูกรวมเข้าด้วยกัน ในคิว
ต่อมา สร้างโปรแกรมเซิร์ฟเวอร์ด้วยโค้ดต่อไปนี้:
// server.go
package main
import (
"context"
"flag"
"log"
"strings"
"time"
"github.com/hibiken/asynq"
)
var (
flagRedisAddr = flag.String("redis-addr", "localhost:6379", "ที่อยู่ของเซิร์ฟเวอร์ Redis")
flagGroupGracePeriod = flag.Duration("grace-period", 10*time.Second, "ระยะเวลาอนุญาตสำหรับกลุ่ม")
flagGroupMaxDelay = flag.Duration("max-delay", 30*time.Second, "ค่าล่าช้าสุงสุดสำหรับกลุ่ม")
flagGroupMaxSize = flag.Int("max-size", 20, "ขนาดสูงสุดสำหรับกลุ่ม")
)
// ฟังก์ชันการรวมข้อความอย่างง่าย
// รวมข้อความที่ได้จากงานทั้งหมดเข้าด้วยกันด้วย โดยที่แต่ละข้อความใช้เส้นทางหนึ่ง
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
log.Printf("รวม %d งานจากกลุ่ม %q", len(tasks), group)
var b strings.Builder
for _, t := range tasks {
b.Write(t.Payload())
b.WriteString("\n")
}
return asynq.NewTask("aggregated-task", []byte(b.String()))
}
func handleAggregatedTask(ctx context.Context, task *asynq.Task) error {
log.Print("ผู้รับผิดชอบได้รับงานที่รวมกัน")
log.Printf("ข้อความที่รวมกัน: %s", task.Payload())
return nil
}
func main() {
flag.Parse()
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: *flagRedisAddr},
asynq.Config{
Queues: map[string]int{"tutorial": 1},
GroupAggregator: asynq.GroupAggregatorFunc(aggregate),
GroupGracePeriod: *flagGroupGracePeriod,
GroupMaxDelay: *flagGroupMaxDelay,
GroupMaxSize: *flagGroupMaxSize,
},
)
mux := asynq.NewServeMux()
mux.HandleFunc("aggregated-task", handleAggregatedTask)
if err := srv.Run(mux); err != nil {
log.Fatalf("ไม่สามารถเริ่มเซิร์ฟเวอร์: %v", err)
}
}
คุณสามารถรันโปรแกรมนี้และสังเกตผลลัพธ์ได้ดังนี้:
$ go build -o server server.go
$ ./server --redis-addr=
คุณควรสามารถเห็นในผลลัพธ์ว่าเซิร์ฟเวอร์ได้รวมงานในกลุ่มและตัวประมวลผลได้ประมวลผลงานที่ถูกรวมกันแล้ว ลองเปลี่ยน --grace-period
, --max-delay
, และ --max-size
ในโปรแกรมข้างต้นเพื่อดูว่ามันมีผลกระทบต่อกลยุบการรวมหรือไม่