Übersicht
Sie können einen Scheduler
gleichzeitig mit dem Server
ausführen, um Aufgaben periodisch zu verarbeiten. Der Scheduler fügt periodisch Aufgaben zur Warteschlange hinzu, die dann von verfügbaren Worker-Servern im Cluster ausgeführt werden.
Sie müssen sicherstellen, dass pro Zeitplan nur ein Scheduler läuft, um doppelte Aufgaben zu vermeiden. Die Verwendung eines zentralisierten Ansatzes bedeutet, dass keine Synchronisierung erforderlich ist und der Dienst ohne Verwendung von Sperrvorgängen ausgeführt werden kann.
Wenn Sie periodische Aufgaben dynamisch hinzufügen und entfernen müssen, verwenden Sie stattdessen PeriodicTaskManager
anstelle von Scheduler
. Weitere ausführliche Informationen finden Sie in diesem Wiki.
Zeitzone
Standardmäßig verwenden periodische Aufgaben die UTC-Zeit, aber Sie können SchedulerOpts
verwenden, um die verwendete Zeitzone zu ändern.
// Verwenden Sie beispielsweise die Zeitzone America/Los_Angeles anstelle der Standard-UTC-Zeitzone.
loc, err := time.LoadLocation("America/Los_Angeles")
if err != nil {
panic(err)
}
scheduler := asynq.NewScheduler(
redisConnOpt,
&asynq.SchedulerOpts{
Location: loc,
},
)
Aufgabenregistrierung
Um periodisch Aufgaben zur Warteschlange hinzuzufügen, müssen Sie einen Aufgaben-Datensatz beim Scheduler registrieren.
scheduler := asynq.NewScheduler(redisConnOpt, nil)
task := asynq.NewTask("example_task", nil)
// Sie können einen Cron-Spezifikationsstring verwenden, um den Zeitplan anzugeben.
entryID, err := scheduler.Register("* * * * *", task)
if err != nil {
log.Fatal(err)
}
log.Printf("Eintrag registriert: %q\n", entryID)
// Sie können auch "@every " verwenden, um Intervalle anzugeben.
entryID, err = scheduler.Register("@every 30s", task)
if err != nil {
log.Fatal(err)
}
log.Printf("Eintrag registriert: %q\n", entryID)
// Sie können auch Optionen übergeben.
entryID, err = scheduler.Register("@every 24h", task, asynq.Queue("myqueue"))
if err != nil {
log.Fatal(err)
}
log.Printf("Eintrag registriert: %q\n", entryID)
Ausführen des Schedulers
Um den Scheduler zu starten, rufen Sie Run
auf dem Scheduler auf.
scheduler := asynq.NewScheduler(redisConnOpt, nil)
// ... Aufgaben registrieren
if err := scheduler.Run(); err != nil {
log.Fatal(err)
}
Das Aufrufen von Run
wartet auf das TERM- oder INT-Signal (z. B. Strg+C).
Fehlerbehandlung
Sie können eine Handlerfunktion bereitstellen, um Fehler zu behandeln, wenn der Scheduler Aufgaben nicht in die Warteschlange einfügen kann.
func handleEnqueueError(task *asynq.Task, opts []asynq.Option, err error) {
// Ihre Fehlerbehandlungslogik
}
scheduler := asynq.NewScheduler(
redisConnOpt,
&asynq.SchedulerOpts{
EnqueueErrorHandler: handleEnqueueError,
},
)
Überprüfung über die Befehlszeile (CLI)
Die CLI verfügt über einen Unterbefehl namens cron
zum Überprüfen von Scheduler-Aufzeichnungen.
Um alle Aufzeichnungen des aktuell laufenden Schedulers anzuzeigen, können Sie den folgenden Befehl ausführen:
asynq cron ls
Dieser Befehl gibt eine Liste mit der ID, der Zeitplanspezifikation, der nächsten Einreihungszeit und der letzten Einreihungszeit für jede Aufzeichnung aus.
Sie können auch den folgenden Befehl ausführen, um die Geschichte jeder Aufzeichnung anzuzeigen:
asynq cron history