หน้านี้จะแนะนำคุณลักษณะการรวมงานของ Asynq

ภาพรวม

การรวมงานช่วยให้คุณสามารถจัดคิวงานหลายๆ งานต่อเนื่องกัน แทนที่จะส่งมันไปทีละอันให้กับ "Handler" คุณลักษณะนี้ช่วยให้คุณสามารถรวมการกระทำหลายๆ อันต่อเนื่องกันเข้าไปอย่างเดียว เพื่อประหยัดค่าใช้จ่าย ทำให้แคชมีประสิทธิภาพ หรือแบทช์การแจ้งเตือน

หลักการทำงาน

เพื่อใช้คุณลักษณะการรวมงาน คุณต้องจัดคิวงานด้วย ชื่อกลุ่ม เดียวกันลงในคิวเดียวกัน งานที่จัดคิวด้วยคู่ (queue, group) เดียวกันจะถูกรวมเข้าไปทำงานเป็นงานเดียวโดย GroupAggregator ที่คุณกำหนด และงานที่ถูกรวมเข้าไปจะถูกส่งให้ Handler

เมื่อสร้างงานที่ถูกรวมเข้าไป เซิร์ฟเวอร์ Asynq จะรองานเพิ่มเติมจนกว่า ช่วงเวลาผ่อนผัน ที่กำหนดไว้จะหมด ทุกครั้งที่จัดคิวงานใหม่ด้วยคู่ (queue, group) เดียวกัน ช่วงเวลาผ่อนผันจะถูกอัพเดต

ช่วงเวลาผ่อนผันมีขีดจำกัดที่สามารถกำหนดเอง: คุณสามารถตั้งค่า เวลาล่าช้าสูงสุดในการรวมงาน หลังจากนั้น เซิร์ฟเวอร์ Asynq จะละไปกำหนดช่วงเวลาผ่อนผันที่เหลือและรวมงาน

คุณ c่าย ตั้งค่า จำนวนงานสูงสุด ที่สามารถรวมได้ในครั้งเดียวกัน ถ้าครบจำนวนแล้ว เซิร์ฟเวอร์ Asynq จะรวมงานทันที

หมายเหตุ: การตั้งเวลางานและการรวมงานเป็นคุณลักษณะที่ขัดแย้งกัน ซึ่งการตั้งเวลาจะมีลำดับความสำคัญมากกว่าการรวมงาน

ตัวอย่างการใช้งานอย่างรวดเร็ว

ทางฝั่งของไคลเอ็นต์ ให้ใช้ Queue และ Group ในการจัดลำดับงานลงในกลุ่มเดียวกัน

// จัดลำดับงานสามงานลงในกลุ่มเดียวกัน
client.Enqueue(task1, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task2, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task3, asynq.Queue("notifications"), asynq.Group("user1:email"))

ทางฝั่งเซิร์ฟเวอร์ ให้เตรียม GroupAggregator เพื่อให้งานถูกเรียกเข้ากลุ่มได้ คุณสามารถปรับแต่งกลไกการรวมโดยกำหนดค่า GroupGracePeriod GroupMaxDelay และ GroupMaxSize

// ฟังก์ชันนี้ใช้ในการรวมงานหลายงานเข้าไว้ในงานเดียว
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
    // ... ตรรกะของคุณในการรวมงานที่ระบุและคืนค่างานที่รวมไว้
    // ... หากจำเป็น ให้ใช้ NewTask(typename, payload, opts...) เพื่อสร้างงานใหม่และกำหนดอ็อพชัน
    // ... (หมายเหตุ) ตัวเลือก Queue จะถูกละเอียด และงานที่รวมไว้จะถูกจัดลำดับไปที่คิวเดียวกับกลุ่ม
}

srv := asynq.NewServer(
           redisConnOpt,
           asynq.Config{
               GroupAggregator:  asynq.GroupAggregatorFunc(aggregate),
               GroupMaxDelay:    10 * time.Minute,
               GroupGracePeriod: 2 * time.Minute,
               GroupMaxSize:     20,
               Queues: map[string]int{"notifications": 1},
           },
       )

บทช่วยสอน

ในส่วนนี้ เราจะเสนอโปรแกรมง่ายๆ เพื่อสาธิตการใช้คุณสมบัติการรวม.

ตัวแรก สร้างโปรแกรมไคลเอ็นต์ด้วยโค้ดต่อไปนี้:

// client.go
package main

import (
        "flag"
        "log"

        "github.com/hibiken/asynq"
)

var (
       flagRedisAddr = flag.String("redis-addr", "localhost:6379", "ที่อยู่เซิร์ฟเวอร์ Redis")
       flagMessage = flag.String("message", "hello", "ข้อความที่จะถูกพิมพ์เมื่อกระบวนการงาน")
)

func main() {
        flag.Parse()

        c := asynq.NewClient(asynq.RedisClientOpt{Addr: *flagRedisAddr})
        defer c.Close()

        task := asynq.NewTask("aggregation-tutorial", []byte(*flagMessage))
        info, err := c.Enqueue(task, asynq.Queue("tutorial"), asynq.Group("example-group"))
        if err != nil {
                log.Fatalf("ไม่สามารถเตรียมการงาน: %v", err)
        }
        log.Printf("การเตรียมงานเรียบร้อย: %s", info.ID)
}

คุณสามารถเรียกใช้โปรแกรมนี้หลายครั้ง:

$ go build -o client client.go 
$ ./client --redis-addr=
$ ./client --message=hi --redis-addr=
$ ./client --message=bye --redis-addr=

ตอนนี้ หากคุณตรวจสอบคิวผ่าน CLI หรือ Web UI คุณจะเห็นงาน ถูกรวมเข้าด้วยกัน ในคิว

ต่อมา สร้างโปรแกรมเซิร์ฟเวอร์ด้วยโค้ดต่อไปนี้:

// server.go
package main

import (
	"context"
	"flag"
	"log"
	"strings"
	"time"

	"github.com/hibiken/asynq"
)

var (
	flagRedisAddr         = flag.String("redis-addr", "localhost:6379", "ที่อยู่ของเซิร์ฟเวอร์ Redis")
	flagGroupGracePeriod  = flag.Duration("grace-period", 10*time.Second, "ระยะเวลาอนุญาตสำหรับกลุ่ม")
	flagGroupMaxDelay     = flag.Duration("max-delay", 30*time.Second, "ค่าล่าช้าสุงสุดสำหรับกลุ่ม")
	flagGroupMaxSize      = flag.Int("max-size", 20, "ขนาดสูงสุดสำหรับกลุ่ม")
)

// ฟังก์ชันการรวมข้อความอย่างง่าย
// รวมข้อความที่ได้จากงานทั้งหมดเข้าด้วยกันด้วย โดยที่แต่ละข้อความใช้เส้นทางหนึ่ง
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
	log.Printf("รวม %d งานจากกลุ่ม %q", len(tasks), group)
	var b strings.Builder
	for _, t := range tasks {
		b.Write(t.Payload())
		b.WriteString("\n")
	}
	return asynq.NewTask("aggregated-task", []byte(b.String()))
}

func handleAggregatedTask(ctx context.Context, task *asynq.Task) error {
	log.Print("ผู้รับผิดชอบได้รับงานที่รวมกัน")
	log.Printf("ข้อความที่รวมกัน: %s", task.Payload())
	return nil
}

func main() {
	flag.Parse()

	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: *flagRedisAddr},
		asynq.Config{
			Queues:           map[string]int{"tutorial": 1},
			GroupAggregator:  asynq.GroupAggregatorFunc(aggregate),
			GroupGracePeriod: *flagGroupGracePeriod,
			GroupMaxDelay:    *flagGroupMaxDelay,
			GroupMaxSize:     *flagGroupMaxSize,
		},
	)

	mux := asynq.NewServeMux()
	mux.HandleFunc("aggregated-task", handleAggregatedTask)

	if err := srv.Run(mux); err != nil {
		log.Fatalf("ไม่สามารถเริ่มเซิร์ฟเวอร์: %v", err)
	}
}

คุณสามารถรันโปรแกรมนี้และสังเกตผลลัพธ์ได้ดังนี้:

$ go build -o server server.go
$ ./server --redis-addr=

คุณควรสามารถเห็นในผลลัพธ์ว่าเซิร์ฟเวอร์ได้รวมงานในกลุ่มและตัวประมวลผลได้ประมวลผลงานที่ถูกรวมกันแล้ว ลองเปลี่ยน --grace-period, --max-delay, และ --max-size ในโปรแกรมข้างต้นเพื่อดูว่ามันมีผลกระทบต่อกลยุบการรวมหรือไม่