Übersicht

Sie können einen Scheduler gleichzeitig mit dem Server ausführen, um Aufgaben periodisch zu verarbeiten. Der Scheduler fügt periodisch Aufgaben zur Warteschlange hinzu, die dann von verfügbaren Worker-Servern im Cluster ausgeführt werden.

Sie müssen sicherstellen, dass pro Zeitplan nur ein Scheduler läuft, um doppelte Aufgaben zu vermeiden. Die Verwendung eines zentralisierten Ansatzes bedeutet, dass keine Synchronisierung erforderlich ist und der Dienst ohne Verwendung von Sperrvorgängen ausgeführt werden kann.

Wenn Sie periodische Aufgaben dynamisch hinzufügen und entfernen müssen, verwenden Sie stattdessen PeriodicTaskManager anstelle von Scheduler. Weitere ausführliche Informationen finden Sie in diesem Wiki.

Zeitzone

Standardmäßig verwenden periodische Aufgaben die UTC-Zeit, aber Sie können SchedulerOpts verwenden, um die verwendete Zeitzone zu ändern.

// Verwenden Sie beispielsweise die Zeitzone America/Los_Angeles anstelle der Standard-UTC-Zeitzone.
loc, err := time.LoadLocation("America/Los_Angeles")
if err != nil {
    panic(err)
}
scheduler := asynq.NewScheduler(
    redisConnOpt, 
    &asynq.SchedulerOpts{
        Location: loc,
    },
)

Aufgabenregistrierung

Um periodisch Aufgaben zur Warteschlange hinzuzufügen, müssen Sie einen Aufgaben-Datensatz beim Scheduler registrieren.

scheduler := asynq.NewScheduler(redisConnOpt, nil)

task := asynq.NewTask("example_task", nil)

// Sie können einen Cron-Spezifikationsstring verwenden, um den Zeitplan anzugeben.
entryID, err := scheduler.Register("* * * * *", task)
if err != nil {
    log.Fatal(err)
}
log.Printf("Eintrag registriert: %q\n", entryID)

// Sie können auch "@every " verwenden, um Intervalle anzugeben.
entryID, err = scheduler.Register("@every 30s", task)
if err != nil {
    log.Fatal(err)
}
log.Printf("Eintrag registriert: %q\n", entryID)

// Sie können auch Optionen übergeben.
entryID, err = scheduler.Register("@every 24h", task, asynq.Queue("myqueue"))
if err != nil {
    log.Fatal(err)
}
log.Printf("Eintrag registriert: %q\n", entryID)

Ausführen des Schedulers

Um den Scheduler zu starten, rufen Sie Run auf dem Scheduler auf.

scheduler := asynq.NewScheduler(redisConnOpt, nil)

// ... Aufgaben registrieren

if err := scheduler.Run(); err != nil {
    log.Fatal(err)
}

Das Aufrufen von Run wartet auf das TERM- oder INT-Signal (z. B. Strg+C).

Fehlerbehandlung

Sie können eine Handlerfunktion bereitstellen, um Fehler zu behandeln, wenn der Scheduler Aufgaben nicht in die Warteschlange einfügen kann.

func handleEnqueueError(task *asynq.Task, opts []asynq.Option, err error) {
    // Ihre Fehlerbehandlungslogik
}

scheduler := asynq.NewScheduler(
    redisConnOpt, 
    &asynq.SchedulerOpts{
        EnqueueErrorHandler: handleEnqueueError,
    },
)

Überprüfung über die Befehlszeile (CLI)

Die CLI verfügt über einen Unterbefehl namens cron zum Überprüfen von Scheduler-Aufzeichnungen.

Um alle Aufzeichnungen des aktuell laufenden Schedulers anzuzeigen, können Sie den folgenden Befehl ausführen:

asynq cron ls

Dieser Befehl gibt eine Liste mit der ID, der Zeitplanspezifikation, der nächsten Einreihungszeit und der letzten Einreihungszeit für jede Aufzeichnung aus.

Sie können auch den folgenden Befehl ausführen, um die Geschichte jeder Aufzeichnung anzuzeigen:

asynq cron history