Tổng quan
Bạn có thể chạy một Scheduler
song song với Server
để xử lý các nhiệm vụ theo chu kỳ. Scheduler sẽ định kỳ thêm các nhiệm vụ vào hàng đợi, sau đó sẽ được thực thi bởi các máy chủ worker khả dụng trong cluster.
Bạn cần đảm bảo rằng chỉ có một Scheduler đang chạy cho mỗi lịch trình để tránh các nhiệm vụ trùng lặp. Sử dụng phương pháp tập trung có nghĩa là đồng bộ hóa không cần thiết, và dịch vụ có thể chạy mà không cần sử dụng khóa.
Nếu bạn cần động thêm và loại bỏ nhiệm vụ theo chu kỳ, hãy sử dụng PeriodicTaskManager
thay vì sử dụng Scheduler
trực tiếp. Xem trang wiki này để biết thông tin chi tiết hơn.
Múi giờ
Mặc định, các nhiệm vụ theo chu kỳ sử dụng múi giờ UTC, nhưng bạn có thể sử dụng SchedulerOpts
để thay đổi múi giờ được sử dụng.
// Ví dụ: sử dụng múi giờ America/Los_Angeles thay vì mặc định là múi giờ UTC.
loc, err := time.LoadLocation("America/Los_Angeles")
if err != nil {
panic(err)
}
scheduler := asynq.NewScheduler(
redisConnOpt,
&asynq.SchedulerOpts{
Location: loc,
},
)
Đăng ký Nhiệm vụ
Để định kỳ thêm các nhiệm vụ vào hàng đợi, bạn cần đăng ký một bản ghi nhiệm vụ với Scheduler.
scheduler := asynq.NewScheduler(redisConnOpt, nil)
task := asynq.NewTask("example_task", nil)
// Bạn có thể sử dụng một chuỗi đặc tả cron để chỉ định lịch trình.
entryID, err := scheduler.Register("* * * * *", task)
if err != nil {
log.Fatal(err)
}
log.Printf("đã đăng ký một bản ghi: %q\n", entryID)
// Bạn cũng có thể sử dụng "@every" để chỉ định khoảng thời gian.
entryID, err = scheduler.Register("@every 30s", task)
if err != nil {
log.Fatal(err)
}
log.Printf("đã đăng ký một bản ghi: %q\n", entryID)
// Bạn cũng có thể truyền các tùy chọn.
entryID, err = scheduler.Register("@every 24h", task, asynq.Queue("myqueue"))
if err != nil {
log.Fatal(err)
}
log.Printf("đã đăng ký một bản ghi: %q\n", entryID)
Chạy Scheduler
Để bắt đầu Scheduler, gọi Run
trên Scheduler.
scheduler := asynq.NewScheduler(redisConnOpt, nil)
// ... Đăng ký nhiệm vụ
if err := scheduler.Run(); err != nil {
log.Fatal(err)
}
Gọi Run
sẽ chờ tín hiệu TERM hoặc INT (ví dụ: Ctrl-C).
Xử lý Lỗi
Bạn có thể cung cấp một hàm xử lý để xử lý lỗi nếu Scheduler không thể đưa các nhiệm vụ vào hàng đợi.
func handleEnqueueError(task *asynq.Task, opts []asynq.Option, err error) {
// Logic xử lý lỗi của bạn
}
scheduler := asynq.NewScheduler(
redisConnOpt,
&asynq.SchedulerOpts{
EnqueueErrorHandler: handleEnqueueError,
},
)
Kiểm tra thông qua CLI
CLI có một lệnh phụ tên là cron
để kiểm tra các bản ghi Scheduler.
Để xem tất cả các bản ghi cho Scheduler đang chạy hiện tại, bạn có thể chạy lệnh sau:
asynq cron ls
Lệnh này sẽ đầu ra một danh sách chứa ID, đặc tả lịch trình, thời gian đưa vào hàng đợi tiếp theo và thời gian đưa vào hàng đợi cuối cùng cho mỗi bản ghi.
Bạn cũng có thể chạy lệnh sau để xem lịch sử của mỗi bản ghi:
asynq cron history