Unikalna funkcja zadania w Asynq zapewnia, że w kolejce Redis znajduje się tylko jedno zadanie.

Kiedy chcesz uniknąć zduplikowanych zadań, ta funkcja jest bardzo przydatna.

Przegląd

Istnieją dwie metody zapewnienia unikalności zadania w Asynq.

  1. Korzystając z opcji TaskID: Wygeneruj unikalne ID zadania samodzielnie.
  2. Korzystając z opcji Unique: Pozwól Asynq utworzyć blokadę unikalności dla zadania.

Korzystanie z opcji TaskID

Jeśli wybierzesz pierwszą metodę, możesz zapewnić, że w danym momencie istnieje tylko jedno zadanie z określonym ID zadania. Jeśli spróbujesz dodać kolejne zadanie z tym samym ID zadania, zostanie zwrócony błąd ErrTaskIDConflict.

// Pierwsze zadanie powinno być poprawne
_, err := client.Enqueue(task, asynq.TaskID("mytaskid"))

// Drugie zadanie zakończy się niepowodzeniem, błąd zostanie zwrócony jako ErrTaskIDConflict (zakładając, że pierwsze zadanie nie zostało jeszcze przetworzone)
_, err = client.Enqueue(task, asynq.TaskID("mytaskid"))

Korzystanie z opcji Unique

Drugi sposób opiera się na blokadzie unikalności. Podczas dodawania zadania za pomocą opcji Unique, Client sprawdzi, czy może uzyskać blokadę dla danego zadania. Zadanie zostanie dodane do kolejki tylko wtedy, gdy blokada zostanie uzyskana. Jeśli inne zadanie już posiada blokadę, Client zwróci błąd (zobacz poniższy przykładowy kod, jak sprawdzić błędy).

Blokada unikalności jest związana z czasem życia (TTL), aby uniknąć trwałego utrzymania blokady. Blokada zostanie zwolniona po upływie TTL lub po pomyślnym przetworzeniu zadania przed upływem TTL.

Warto zauważyć, że unikalna funkcja zadania w Asynq opiera się na unikalności w miarę możliwości. Innymi słowy, jeśli blokada wygaśnie przed przetworzeniem zadania, możliwe jest dodanie zduplikowanego zadania.

Unikalność zadania opiera się na następujących atrybutach:

  • Typ
  • Ładunek
  • Kolejka

Dlatego jeśli są zadania o takim samym typie i ładunku dodane do tej samej kolejki, inne zadanie o tych samych atrybutach nie zostanie dodane, dopóki blokada nie zostanie zwolniona.

c := asynq.NewClient(redis)

t1 := asynq.NewTask("example", []byte("hello"))

// t1 będzie trzymać blokadę unikalności przez następną godzinę.
err := c.Enqueue(t1, asynq.Unique(time.Hour))
switch {
case errors.Is(err, asynq.ErrDuplicateTask):
    // Obsłuż duplikat zadania
case err != nil:
    // Obsłuż inne błędy
}

t2 := asynq.NewTask("example", []byte("hello"))

// t2 nie może zostać dodane, ponieważ jest duplikatem t1.
err = c.Enqueue(t2, asynq.Unique(time.Hour))
switch {
case errors.Is(err, asynq.ErrDuplicateTask):
    // Obsłuż duplikat zadania
case err != nil:
    // Obsłuż inne błędy
}

W powyższym przykładzie t2 nie zostanie dodane, ponieważ jest duplikatem t1. Można użyć errors.Is, aby sprawdzić zwracaną wartość error i określić, czy zawiera błąd asynq.ErrDuplicateTask.