Aperçu
Vous pouvez exécuter un Scheduler
de manière concurrente avec le Serveur
pour gérer les tâches périodiquement. Le Scheduler ajoutera périodiquement des tâches à la file d'attente, qui seront ensuite exécutées par les serveurs de travail disponibles dans le cluster.
Vous devez vous assurer qu'un seul Scheduler s'exécute pour chaque planning afin d'éviter les tâches en double. Utiliser une approche centralisée signifie que la synchronisation n'est pas nécessaire, et que le service peut fonctionner sans utiliser de verrous.
Si vous devez ajouter et supprimer dynamiquement des tâches périodiques, utilisez PeriodicTaskManager
au lieu d'utiliser directement le Scheduler
. Voir cette page wiki pour des informations détaillées.
Fuseau horaire
Par défaut, les tâches périodiques utilisent l'heure UTC, mais vous pouvez utiliser SchedulerOpts
pour changer le fuseau horaire.
// Par exemple, utilisez le fuseau horaire America/Los_Angeles au lieu du fuseau horaire UTC par défaut.
loc, err := time.LoadLocation("America/Los_Angeles")
if err != nil {
panic(err)
}
scheduler := asynq.NewScheduler(
redisConnOpt,
&asynq.SchedulerOpts{
Location: loc,
},
)
Enregistrement des tâches
Pour ajouter périodiquement des tâches à la file d'attente, vous devez enregistrer un enregistrement de tâche avec le Scheduler.
scheduler := asynq.NewScheduler(redisConnOpt, nil)
task := asynq.NewTask("example_task", nil)
// Vous pouvez utiliser une chaîne de spécification cron pour définir le planning.
entryID, err := scheduler.Register("* * * * *", task)
if err != nil {
log.Fatal(err)
}
log.Printf("enregistrement d'une entrée : %q\n", entryID)
// Vous pouvez également utiliser "@every " pour spécifier les intervalles.
entryID, err = scheduler.Register("@every 30s", task)
if err != nil {
log.Fatal(err)
}
log.Printf("enregistrement d'une entrée : %q\n", entryID)
// Vous pouvez également passer des options.
entryID, err = scheduler.Register("@every 24h", task, asynq.Queue("myqueue"))
if err != nil {
log.Fatal(err)
}
log.Printf("enregistrement d'une entrée : %q\n", entryID)
Exécution du Scheduler
Pour démarrer le Scheduler, appelez Run
sur le Scheduler.
scheduler := asynq.NewScheduler(redisConnOpt, nil)
// ... Enregistrer des tâches
if err := scheduler.Run(); err != nil {
log.Fatal(err)
}
Appeler Run
attendra le signal TERM ou INT (par exemple, Ctrl-C).
Gestion des erreurs
Vous pouvez fournir une fonction de gestion des erreurs pour gérer les erreurs si le Scheduler échoue à mettre des tâches en file d'attente.
func handleEnqueueError(task *asynq.Task, opts []asynq.Option, err error) {
// Votre logique de gestion des erreurs
}
scheduler := asynq.NewScheduler(
redisConnOpt,
&asynq.SchedulerOpts{
EnqueueErrorHandler: handleEnqueueError,
},
)
Vérification via CLI
Le CLI a une sous-commande nommée cron
pour vérifier les enregistrements du Scheduler.
Pour voir tous les enregistrements du Scheduler actuellement en cours d'exécution, vous pouvez exécuter la commande suivante :
asynq cron ls
Cette commande affichera une liste contenant l'identifiant, la spécification du planning, l'heure de mise en file d'attente suivante et l'heure de la dernière mise en file d'attente pour chaque enregistrement.
Vous pouvez également exécuter la commande suivante pour voir l'historique de chaque enregistrement :
asynq cron history