Обзор
Вы можете запустить Планировщик
параллельно с Сервером
, чтобы обрабатывать задачи периодически. Планировщик будет периодически добавлять задачи в очередь, которые затем будут выполнены доступными рабочими серверами в кластере.
Вам необходимо убедиться, что для каждого расписания работает только один Планировщик, чтобы избежать дублирования задач. Использование централизованного подхода означает, что синхронизация не требуется, и сервис может работать без использования блокировок.
Если вам нужно динамически добавлять и удалять периодические задачи, используйте PeriodicTaskManager
, а не непосредственно Scheduler
. Для получения более подробной информации см. эту вики.
Часовой пояс
По умолчанию периодические задачи используют часовой пояс UTC, но вы можете использовать SchedulerOpts
для изменения используемого часового пояса.
// Например, используйте часовой пояс America/Los_Angeles вместо стандартного часового пояса UTC.
loc, err := time.LoadLocation("America/Los_Angeles")
if err != nil {
panic(err)
}
scheduler := asynq.NewScheduler(
redisConnOpt,
&asynq.SchedulerOpts{
Location: loc,
},
)
Регистрация задачи
Для периодического добавления задач в очередь вам необходимо зарегистрировать запись задачи в Планировщике.
scheduler := asynq.NewScheduler(redisConnOpt, nil)
task := asynq.NewTask("example_task", nil)
// Вы можете использовать строку спецификации cron для указания расписания.
entryID, err := scheduler.Register("* * * * *", task)
if err != nil {
log.Fatal(err)
}
log.Printf("зарегистрирована запись: %q\n", entryID)
// Вы также можете использовать "@every " для указания интервалов.
entryID, err = scheduler.Register("@every 30s", task)
if err != nil {
log.Fatal(err)
}
log.Printf("зарегистрирована запись: %q\n", entryID)
// Вы также можете передавать параметры.
entryID, err = scheduler.Register("@every 24h", task, asynq.Queue("myqueue"))
if err != nil {
log.Fatal(err)
}
log.Printf("зарегистрирована запись: %q\n", entryID)
Запуск Планировщика
Для запуска Планировщика вызовите Run
на Планировщике.
scheduler := asynq.NewScheduler(redisConnOpt, nil)
// ... Регистрация задач
if err := scheduler.Run(); err != nil {
log.Fatal(err)
}
Вызов Run
будет ожидать сигнал TERM или INT (например, Ctrl-C).
Обработка ошибок
Вы можете предоставить функцию обработчика для обработки ошибок, если Планировщик не может поставить задачи в очередь.
func handleEnqueueError(task *asynq.Task, opts []asynq.Option, err error) {
// Ваша логика обработки ошибок
}
scheduler := asynq.NewScheduler(
redisConnOpt,
&asynq.SchedulerOpts{
EnqueueErrorHandler: handleEnqueueError,
},
)
Проверка через CLI
У инструмента командной строки есть подкоманда с именем cron
для проверки записей Планировщика.
Чтобы просмотреть все записи для текущего работающего Планировщика, вы можете выполнить следующую команду:
asynq cron ls
Эта команда выведет список, содержащий ID, спецификацию расписания, следующее время добавления в очередь и последнее время добавления в очередь для каждой записи.
Вы также можете выполнить следующую команду для просмотра истории каждой записи:
asynq cron history