AsynqのUnique Task FeatureはRedisキュー内にタスクを一意に保証します。

タスクの重複を避け、重複したタスクを排除したい場合に、この機能は非常に有用です。

概要

Asynqにはタスクの一意性を確保するための2つの方法があります。

  1. TaskIDオプションの使用:自分で一意のタスクIDを生成します。
  2. Uniqueオプションの使用:Asynqがタスクの一意性ロックを作成するようにします。

TaskIDオプションの使用

最初の方法を選択すると、特定のタスクIDに対して同時に1つのタスクしか存在しないことを保証できます。同じタスクIDで別のタスクをエンキューしようとすると、ErrTaskIDConflictエラーが返されます。

// 最初のタスクは問題なくエンキューできる
_, err := client.Enqueue(task, asynq.TaskID("mytaskid"))

// 2番目のタスクは失敗します。前提として最初のタスクがまだ処理されていない場合、errはErrTaskIDConflictになります
_, err = client.Enqueue(task, asynq.TaskID("mytaskid"))

Uniqueオプションの使用

2番目の方法は一意性ロックに基づいています。Uniqueオプションを使用してタスクをエンキューすると、Clientは指定されたタスクのロックを取得できるかどうかをチェックします。ロックを取得できる場合のみタスクがエンキューされます。他のタスクが既にロックを保持している場合、Clientはエラーを返します(エラーのチェック方法については以下のコード例を参照してください)。

一意性ロックにはTTL(有効期限)が関連しており、ロックを永続的に保持することを避けるためです。ロックはTTL後に解放されるか、タスクがTTL前に正常に処理された場合に解放されます。

Asynqの一意のタスク機能はbest effort uniqueness(ベストエフォートの一意性)であることに注意してください。つまり、ロックがタスクの処理よりも先に期限切れになった場合、重複したタスクがエンキューされる可能性があります。

タスクの一意性は次の属性に基づいています。

  • タイプ
  • ペイロード
  • キュー

したがって、同じタイプとペイロードを持つタスクが同じキューにエンキューされている場合、ロックが解放されるまで同じ属性の他のタスクはエンキューされません。

c := asynq.NewClient(redis)

t1 := asynq.NewTask("example", []byte("hello"))

// t1は次の1時間、一意性のロックを保持します。
err := c.Enqueue(t1, asynq.Unique(time.Hour))
switch {
case errors.Is(err, asynq.ErrDuplicateTask):
    // 重複したタスクを処理
case err != nil:
    // その他のエラーを処理
}

t2 := asynq.NewTask("example", []byte("hello"))

// t2はt1の重複であるためエンキューできません。
err = c.Enqueue(t2, asynq.Unique(time.Hour))
switch {
case errors.Is(err, asynq.ErrDuplicateTask):
    // 重複したタスクを処理
case err != nil:
    // その他のエラーを処理
}

上記の例では、t2t1の重複であるためエンキューされません。返されたerror値がasynq.ErrDuplicateTaskをラップしているかどうかを確認するためにerrors.Isを使用できます。