Trang này giới thiệu tính năng tổng hợp nhiệm vụ của Asynq.

Tổng quan

Tính năng tổng hợp nhiệm vụ cho phép bạn xếp hàng nhiều nhiệm vụ liên tiếp, thay vì đưa chúng một cách từng cái một đến một "Xử lý viên". Tính năng này cho phép bạn gom nhóm nhiều hoạt động liên tiếp thành một, tiết kiệm chi phí, tối ưu hóa caching, hoặc gửi thông báo theo lô.

Nguyên lý hoạt động

Để sử dụng tính năng tổng hợp nhiệm vụ, bạn cần xếp hàng nhiệm vụ với cùng tên nhóm vào cùng một hàng đợi. Các nhiệm vụ được xếp hàng bằng cặp (hàng đợi, nhóm) sẽ được tổng hợp thành một nhiệm vụ bởi GroupAggregator mà bạn cung cấp, và nhiệm vụ tổng hợp sẽ được chuyển đến bộ xử lý.

Khi tạo một nhiệm vụ tổng hợp, máy chủ Asynq sẽ chờ đợi nhiều nhiệm vụ hơn cho đến khi thời gian ân hạn có thể thiết lập hết hạn. Mỗi khi một nhiệm vụ mới được xếp hàng với cùng (hàng đợi, nhóm), thời gian ân hạn sẽ được cập nhật.

Thời gian ân hạn có một giới hạn cao có thể thiết lập: bạn có thể đặt thời gian trễ tổng hợp tối đa, sau đó máy chủ Asynq sẽ bỏ qua thời gian ân hạn còn lại và tổng hợp các nhiệm vụ.

Bạn cũng có thể đặt số lượng tối đa của nhiệm vụ có thể được tổng hợp cùng nhau. Nếu số lượng này được đạt, máy chủ Asynq sẽ ngay lập tức tổng hợp các nhiệm vụ.

Lưu ý: Lập lịch nhiệm vụ và tổng hợp là các tính năng xung đột, với lịch trình lập lịch được ưu tiên hơn tổng hợp.

Ví dụ Nhanh

Ở phía máy khách, sử dụng các tùy chọn QueueGroup để đưa các nhiệm vụ vào cùng một nhóm.

// Đưa ba nhiệm vụ vào cùng một nhóm.
client.Enqueue(task1, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task2, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task3, asynq.Queue("notifications"), asynq.Group("user1:email"))

Ở phía máy chủ, cung cấp một GroupAggregator để kích hoạt việc tổng hợp nhiệm vụ. Bạn có thể tùy chỉnh chiến lược tổng hợp bằng cách cấu hình GroupGracePeriod, GroupMaxDelay, và GroupMaxSize.

// Hàm này được sử dụng để tổng hợp nhiềm vụ thành một nhiệm vụ.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
    // ... Logic của bạn để tổng hợp các nhiệm vụ đã cho và trả về nhiệm vụ đã tổng hợp.
    // ... Nếu cần thiết, sử dụng NewTask(typename, payload, opts...) để tạo một nhiệm vụ mới và đặt các tùy chọn.
    // ... (Lưu ý) Tùy chọn Queue sẽ bị bỏ qua, và nhiệm vụ đã tổng hợp sẽ luôn được đưa vào hàng đợi cùng nhóm.
}

srv := asynq.NewServer(
           redisConnOpt,
           asynq.Config{
               GroupAggregator:  asynq.GroupAggregatorFunc(aggregate),
               GroupMaxDelay:    10 * time.Minute,
               GroupGracePeriod: 2 * time.Minute,
               GroupMaxSize:     20,
               Queues: map[string]int{"notifications": 1},
           },
       )

Hướng dẫn

Trong phần này, chúng tôi cung cấp một chương trình đơn giản để thể hiện việc sử dụng tính năng tổng hợp.

Đầu tiên, tạo một chương trình máy khách với mã sau đây:

// client.go
package main

import (
        "flag"
        "log"

        "github.com/hibiken/asynq"
)

var (
       flagRedisAddr = flag.String("redis-addr", "localhost:6379", "Địa chỉ máy chủ Redis")
       flagMessage = flag.String("message", "xin chào", "Tin nhắn sẽ được in khi xử lý nhiệm vụ")
)

func main() {
        flag.Parse()

        c := asynq.NewClient(asynq.RedisClientOpt{Addr: *flagRedisAddr})
        defer c.Close()

        task := asynq.NewTask("aggregation-tutorial", []byte(*flagMessage))
        info, err := c.Enqueue(task, asynq.Queue("tutorial"), asynq.Group("example-group"))
        if err != nil {
                log.Fatalf("Không thể đưa nhiệm vụ vào hàng đợi: %v", err)
        }
        log.Printf("Đã đưa nhiệm vụ vào hàng đợi thành công: %s", info.ID)
}

Bạn có thể chạy chương trình này nhiều lần:

$ go build -o client client.go 
$ ./client --redis-addr=
$ ./client --message=hi --redis-addr=
$ ./client --message=bye --redis-addr=

Bây giờ, nếu bạn kiểm tra hàng đợi thông qua CLI hoặc Giao diện Người dùng Web, bạn sẽ thấy các nhiệm vụ được tổng hợp trong hàng đợi.

Tiếp theo, tạo một chương trình máy chủ với mã sau đây:

// server.go
package main

import (
	"context"
	"flag"
	"log"
	"strings"
	"time"

	"github.com/hibiken/asynq"
)

var (
	flagRedisAddr         = flag.String("redis-addr", "localhost:6379", "Địa chỉ của máy chủ Redis")
	flagGroupGracePeriod  = flag.Duration("grace-period", 10*time.Second, "Thời gian dành cho các nhóm")
	flagGroupMaxDelay     = flag.Duration("max-delay", 30*time.Second, "Độ trễ tối đa cho các nhóm")
	flagGroupMaxSize      = flag.Int("max-size", 20, "Kích thước tối đa cho các nhóm")
)

// Hàm tổng hợp đơn giản.
// Kết hợp các tin nhắn của tất cả các nhiệm vụ thành một, với mỗi tin nhắn chiếm một dòng.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
	log.Printf("Đã tổng hợp %d nhiệm vụ từ nhóm %q", len(tasks), group)
	var b strings.Builder
	for _, t := range tasks {
		b.Write(t.Payload())
		b.WriteString("\n")
	}
	return asynq.NewTask("aggregated-task", []byte(b.String()))
}

func handleAggregatedTask(ctx context.Context, task *asynq.Task) error {
	log.Print("Người xử lý đã nhận nhiệm vụ đã tổng hợp")
	log.Printf("Tin nhắn đã tổng hợp: %s", task.Payload())
	return nil
}

func main() {
	flag.Parse()

	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: *flagRedisAddr},
		asynq.Config{
			Queues:           map[string]int{"tutorial": 1},
			GroupAggregator:  asynq.GroupAggregatorFunc(aggregate),
			GroupGracePeriod: *flagGroupGracePeriod,
			GroupMaxDelay:    *flagGroupMaxDelay,
			GroupMaxSize:     *flagGroupMaxSize,
		},
	)

	mux := asynq.NewServeMux()
	mux.HandleFunc("aggregated-task", handleAggregatedTask)

	if err := srv.Run(mux); err != nil {
		log.Fatalf("Không thể khởi động máy chủ: %v", err)
	}
}

Bạn có thể chạy chương trình này và quan sát kết quả đầu ra:

$ go build -o server server.go
$ ./server --redis-addr=

Bạn sẽ thấy trong đầu ra rằng máy chủ đã tổng hợp các nhiệm vụ trong nhóm và người xử lý đã xử lý các nhiệm vụ đã tổng hợp. Hãy thử thay đổi các cờ --grace-period, --max-delay, và --max-size trong chương trình trên để xem chúng ảnh hưởng như thế nào đến chiến lược tổng hợp.