概要
Server
と同時にScheduler
を並行して実行して、定期的なタスクを処理することができます。Schedulerは定期的にタスクをキューに追加し、その後クラスタ内の利用可能なワーカーサーバーがそれらを実行します。
各スケジュールごとに重複したタスクを避けるために、1つのSchedulerだけが実行されるようにする必要があります。集中管理のアプローチを使用すると、同期が不要で、ロックを使用せずにサービスを実行できます。
動的に定期タスクを追加および削除する必要がある場合は、Scheduler
を直接使用する代わりにPeriodicTaskManager
を使用してください。詳細はこのWikiを参照してください。
タイムゾーン
デフォルトでは、定期的なタスクはUTC時刻を使用しますが、SchedulerOpts
を使用して使用するタイムゾーンを変更することができます。
// 例えば、デフォルトのUTCタイムゾーンの代わりにAmerica/Los_Angelesタイムゾーンを使用します。
loc, err := time.LoadLocation("America/Los_Angeles")
if err != nil {
panic(err)
}
scheduler := asynq.NewScheduler(
redisConnOpt,
&asynq.SchedulerOpts{
Location: loc,
},
)
タスク登録
定期的にタスクをキューに追加するには、Schedulerにタスクレコードを登録する必要があります。
scheduler := asynq.NewScheduler(redisConnOpt, nil)
task := asynq.NewTask("example_task", nil)
// スケジュールを指定するためにcron仕様文字列を使用することができます。
entryID, err := scheduler.Register("* * * * *", task)
if err != nil {
log.Fatal(err)
}
log.Printf("登録されたエントリ: %q\n", entryID)
// インターバルを指定するために"@every "を使用することもできます。
entryID, err = scheduler.Register("@every 30s", task)
if err != nil {
log.Fatal(err)
}
log.Printf("登録されたエントリ: %q\n", entryID)
// オプションを渡すこともできます。
entryID, err = scheduler.Register("@every 24h", task, asynq.Queue("myqueue"))
if err != nil {
log.Fatal(err)
}
log.Printf("登録されたエントリ: %q\n", entryID)
Schedulerの実行
Schedulerを開始するには、SchedulerのRun
を呼び出します。
scheduler := asynq.NewScheduler(redisConnOpt, nil)
// ... タスクを登録
if err := scheduler.Run(); err != nil {
log.Fatal(err)
}
Run
を呼び出すと、TERMやINTシグナル(例: Ctrl-C)を待機します。
エラーハンドリング
Schedulerがタスクのエンキューに失敗した場合に、エラーを処理するためのハンドラ関数を提供することができます。
func handleEnqueueError(task *asynq.Task, opts []asynq.Option, err error) {
// エラー処理ロジック
}
scheduler := asynq.NewScheduler(
redisConnOpt,
&asynq.SchedulerOpts{
EnqueueErrorHandler: handleEnqueueError,
},
)
CLIを使用したチェック
CLIには、Schedulerレコードを確認するためのcron
というサブコマンドがあります。
現在実行中のSchedulerのすべてのレコードを表示するには、次のコマンドを実行します。
asynq cron ls
このコマンドは、各レコードのID、スケジュール仕様、次のエンキュー時刻、および最後のエンキュー時刻を含むリストを出力します。
また、各レコードの履歴を表示するには、次のコマンドを実行できます。
asynq cron history