Уникальная функция задачи в Asynq гарантирует, что в очереди Redis есть только одна задача.

Когда вам нужно исключить дубликаты задач и избежать повторных задач, эта функция очень полезна.

Обзор

В Asynq существуют два метода обеспечения уникальности задач.

  1. Использование опции TaskID: Сгенерируйте уникальный идентификатор задачи самостоятельно.
  2. Использование опции Unique: Позвольте Asynq создать уникальный блокировку для задачи.

Использование опции TaskID

Если вы выберете первый метод, вы можете гарантировать, что в любой момент времени существует только одна задача с заданным идентификатором задачи. Если вы попытаетесь поставить в очередь другую задачу с тем же идентификатором задачи, будет возвращена ошибка ErrTaskIDConflict.

// Первая задача должна быть успешно добавлена в очередь
_, err := client.Enqueue(task, asynq.TaskID("mytaskid"))

// Вторая задача не будет добавлена, err будет содержать ошибку ErrTaskIDConflict (при условии, что первая задача еще не была обработана)
_, err = client.Enqueue(task, asynq.TaskID("mytaskid"))

Использование опции Unique

Второй метод основан на блокировке уникальности. При постановке задачи в очередь с использованием опции Unique, Client проверит, может ли он получить блокировку для данной задачи. Задача будет добавлена в очередь только в том случае, если блокировка может быть получена. Если другая задача уже удерживает блокировку, Client вернет ошибку (смотрите пример кода ниже, как проверить ошибки).

Блокировка уникальности связана с TTL (время жизни) для того, чтобы избежать постоянного удержания блокировки. Блокировка будет освобождена после TTL или после успешной обработки задачи до TTL.

Важно отметить, что уникальная функция задачи в Asynq представляет собой уникальность на усмотрение. Иными словами, если блокировка истекла до обработки задачи, дубликат задачи может быть добавлен в очередь.

Уникальность задачи основана на следующих атрибутах:

  • Тип
  • Нагрузка
  • Очередь

Следовательно, если в очередь были поставлены задачи с тем же типом и нагрузкой, другая задача с теми же атрибутами не будет добавлена в очередь, пока блокировка не будет освобождена.

c := asynq.NewClient(redis)

t1 := asynq.NewTask("example", []byte("hello"))

// t1 будет удерживать блокировку уникальности следующий час.
err := c.Enqueue(t1, asynq.Unique(time.Hour))
switch {
case errors.Is(err, asynq.ErrDuplicateTask):
    // Обработать дублирующуюся задачу
case err != nil:
    // Обработать другие ошибки
}

t2 := asynq.NewTask("example", []byte("hello"))

// t2 не может быть добавлена в очередь, так как это дубликат t1.
err = c.Enqueue(t2, asynq.Unique(time.Hour))
switch {
case errors.Is(err, asynq.ErrDuplicateTask):
    // Обработать дублирующуюся задачу
case err != nil:
    // Обработать другие ошибки
}

В приведенном выше примере t2 не будет добавлена в очередь, так как это дубликат t1. Вы можете использовать errors.Is для проверки возвращенного значения error, чтобы определить, обертывает ли оно ошибку asynq.ErrDuplicateTask.