Aperçu

Vous pouvez exécuter un Scheduler de manière concurrente avec le Serveur pour gérer les tâches périodiquement. Le Scheduler ajoutera périodiquement des tâches à la file d'attente, qui seront ensuite exécutées par les serveurs de travail disponibles dans le cluster.

Vous devez vous assurer qu'un seul Scheduler s'exécute pour chaque planning afin d'éviter les tâches en double. Utiliser une approche centralisée signifie que la synchronisation n'est pas nécessaire, et que le service peut fonctionner sans utiliser de verrous.

Si vous devez ajouter et supprimer dynamiquement des tâches périodiques, utilisez PeriodicTaskManager au lieu d'utiliser directement le Scheduler. Voir cette page wiki pour des informations détaillées.

Fuseau horaire

Par défaut, les tâches périodiques utilisent l'heure UTC, mais vous pouvez utiliser SchedulerOpts pour changer le fuseau horaire.

// Par exemple, utilisez le fuseau horaire America/Los_Angeles au lieu du fuseau horaire UTC par défaut.
loc, err := time.LoadLocation("America/Los_Angeles")
if err != nil {
    panic(err)
}
scheduler := asynq.NewScheduler(
    redisConnOpt, 
    &asynq.SchedulerOpts{
        Location: loc,
    },
)

Enregistrement des tâches

Pour ajouter périodiquement des tâches à la file d'attente, vous devez enregistrer un enregistrement de tâche avec le Scheduler.

scheduler := asynq.NewScheduler(redisConnOpt, nil)

task := asynq.NewTask("example_task", nil)

// Vous pouvez utiliser une chaîne de spécification cron pour définir le planning.
entryID, err := scheduler.Register("* * * * *", task)
if err != nil {
    log.Fatal(err)
}
log.Printf("enregistrement d'une entrée : %q\n", entryID)

// Vous pouvez également utiliser "@every " pour spécifier les intervalles.
entryID, err = scheduler.Register("@every 30s", task)
if err != nil {
    log.Fatal(err)
}
log.Printf("enregistrement d'une entrée : %q\n", entryID)

// Vous pouvez également passer des options.
entryID, err = scheduler.Register("@every 24h", task, asynq.Queue("myqueue"))
if err != nil {
    log.Fatal(err)
}
log.Printf("enregistrement d'une entrée : %q\n", entryID)

Exécution du Scheduler

Pour démarrer le Scheduler, appelez Run sur le Scheduler.

scheduler := asynq.NewScheduler(redisConnOpt, nil)

// ... Enregistrer des tâches

if err := scheduler.Run(); err != nil {
    log.Fatal(err)
}

Appeler Run attendra le signal TERM ou INT (par exemple, Ctrl-C).

Gestion des erreurs

Vous pouvez fournir une fonction de gestion des erreurs pour gérer les erreurs si le Scheduler échoue à mettre des tâches en file d'attente.

func handleEnqueueError(task *asynq.Task, opts []asynq.Option, err error) {
    // Votre logique de gestion des erreurs
}

scheduler := asynq.NewScheduler(
    redisConnOpt, 
    &asynq.SchedulerOpts{
        EnqueueErrorHandler: handleEnqueueError,
    },
)

Vérification via CLI

Le CLI a une sous-commande nommée cron pour vérifier les enregistrements du Scheduler.

Pour voir tous les enregistrements du Scheduler actuellement en cours d'exécution, vous pouvez exécuter la commande suivante :

asynq cron ls

Cette commande affichera une liste contenant l'identifiant, la spécification du planning, l'heure de mise en file d'attente suivante et l'heure de la dernière mise en file d'attente pour chaque enregistrement.

Vous pouvez également exécuter la commande suivante pour voir l'historique de chaque enregistrement :

asynq cron history