Overview

You can run a Scheduler alongside the Server to process tasks periodically. The Scheduler enqueues tasks on a schedule, and available worker servers in the cluster then execute them.

Make sure that only one Scheduler is running for each schedule; otherwise the same task is enqueued more than once. With this centralized approach, schedules do not need to be synchronized across processes, and the service can run without locks.

If you need to dynamically add and remove periodic tasks, use PeriodicTaskManager instead of directly using the Scheduler. See this wiki for more detailed information.

Time Zone

By default, the Scheduler interprets schedules in UTC. To use a different time zone, set Location in SchedulerOpts.

// For example, use the America/Los_Angeles time zone instead of the default UTC time zone.
loc, err := time.LoadLocation("America/Los_Angeles")
if err != nil {
    panic(err)
}
scheduler := asynq.NewScheduler(
    redisConnOpt, 
    &asynq.SchedulerOpts{
        Location: loc,
    },
)

Task Registration

To periodically add tasks to the queue, you need to register an entry with the Scheduler.

scheduler := asynq.NewScheduler(redisConnOpt, nil)

task := asynq.NewTask("example_task", nil)

// You can use a cron specification string to specify the schedule.
entryID, err := scheduler.Register("* * * * *", task)
if err != nil {
    log.Fatal(err)
}
log.Printf("registered an entry: %q\n", entryID)

// You can also use "@every <duration>" to specify intervals.
entryID, err = scheduler.Register("@every 30s", task)
if err != nil {
    log.Fatal(err)
}
log.Printf("registered an entry: %q\n", entryID)

// You can also pass options.
entryID, err = scheduler.Register("@every 24h", task, asynq.Queue("myqueue"))
if err != nil {
    log.Fatal(err)
}
log.Printf("registered an entry: %q\n", entryID)

Running the Scheduler

To start the Scheduler, call Run on the Scheduler.

scheduler := asynq.NewScheduler(redisConnOpt, nil)

// ... Register tasks

if err := scheduler.Run(); err != nil {
    log.Fatal(err)
}

Run blocks until the process receives a TERM or INT signal (e.g., Ctrl-C).

Error Handling

To handle cases where the Scheduler fails to enqueue a task, provide a handler function through SchedulerOpts.

func handleEnqueueError(task *asynq.Task, opts []asynq.Option, err error) {
    // Your error handling logic
}

scheduler := asynq.NewScheduler(
    redisConnOpt, 
    &asynq.SchedulerOpts{
        EnqueueErrorHandler: handleEnqueueError,
    },
)

Checking via CLI

The CLI has a subcommand named cron for inspecting Scheduler entries.

To view all entries for the currently running Scheduler, run the following command:

asynq cron ls

This command prints a list containing the ID, schedule specification, next enqueue time, and last enqueue time for each entry.

To view the enqueue history of an entry, pass its ID to the history subcommand:

asynq cron history <entryID>