Обзор

Вы можете запустить Планировщик параллельно с Сервером, чтобы обрабатывать задачи периодически. Планировщик будет периодически добавлять задачи в очередь, которые затем будут выполнены доступными рабочими серверами в кластере.

Вам необходимо убедиться, что для каждого расписания работает только один Планировщик, чтобы избежать дублирования задач. Использование централизованного подхода означает, что синхронизация не требуется, и сервис может работать без использования блокировок.

Если вам нужно динамически добавлять и удалять периодические задачи, используйте PeriodicTaskManager, а не непосредственно Scheduler. Для получения более подробной информации см. эту вики.

Часовой пояс

По умолчанию периодические задачи используют часовой пояс UTC, но вы можете использовать SchedulerOpts для изменения используемого часового пояса.

// Например, используйте часовой пояс America/Los_Angeles вместо стандартного часового пояса UTC.
loc, err := time.LoadLocation("America/Los_Angeles")
if err != nil {
    panic(err)
}
scheduler := asynq.NewScheduler(
    redisConnOpt, 
    &asynq.SchedulerOpts{
        Location: loc,
    },
)

Регистрация задачи

Для периодического добавления задач в очередь вам необходимо зарегистрировать запись задачи в Планировщике.

scheduler := asynq.NewScheduler(redisConnOpt, nil)

task := asynq.NewTask("example_task", nil)

// Вы можете использовать строку спецификации cron для указания расписания.
entryID, err := scheduler.Register("* * * * *", task)
if err != nil {
    log.Fatal(err)
}
log.Printf("зарегистрирована запись: %q\n", entryID)

// Вы также можете использовать "@every " для указания интервалов.
entryID, err = scheduler.Register("@every 30s", task)
if err != nil {
    log.Fatal(err)
}
log.Printf("зарегистрирована запись: %q\n", entryID)

// Вы также можете передавать параметры.
entryID, err = scheduler.Register("@every 24h", task, asynq.Queue("myqueue"))
if err != nil {
    log.Fatal(err)
}
log.Printf("зарегистрирована запись: %q\n", entryID)

Запуск Планировщика

Для запуска Планировщика вызовите Run на Планировщике.

scheduler := asynq.NewScheduler(redisConnOpt, nil)

// ... Регистрация задач

if err := scheduler.Run(); err != nil {
    log.Fatal(err)
}

Вызов Run будет ожидать сигнал TERM или INT (например, Ctrl-C).

Обработка ошибок

Вы можете предоставить функцию обработчика для обработки ошибок, если Планировщик не может поставить задачи в очередь.

func handleEnqueueError(task *asynq.Task, opts []asynq.Option, err error) {
    // Ваша логика обработки ошибок
}

scheduler := asynq.NewScheduler(
    redisConnOpt, 
    &asynq.SchedulerOpts{
        EnqueueErrorHandler: handleEnqueueError,
    },
)

Проверка через CLI

У инструмента командной строки есть подкоманда с именем cron для проверки записей Планировщика.

Чтобы просмотреть все записи для текущего работающего Планировщика, вы можете выполнить следующую команду:

asynq cron ls

Эта команда выведет список, содержащий ID, спецификацию расписания, следующее время добавления в очередь и последнее время добавления в очередь для каждой записи.

Вы также можете выполнить следующую команду для просмотра истории каждой записи:

asynq cron history