Уникальная функция задачи в Asynq гарантирует, что в очереди Redis есть только одна задача.
Когда вам нужно исключить дубликаты задач и избежать повторных задач, эта функция очень полезна.
Обзор
В Asynq существуют два метода обеспечения уникальности задач.
- Использование опции
TaskID
: Сгенерируйте уникальный идентификатор задачи самостоятельно. - Использование опции
Unique
: Позвольте Asynq создать уникальный блокировку для задачи.
Использование опции TaskID
Если вы выберете первый метод, вы можете гарантировать, что в любой момент времени существует только одна задача с заданным идентификатором задачи. Если вы попытаетесь поставить в очередь другую задачу с тем же идентификатором задачи, будет возвращена ошибка ErrTaskIDConflict
.
// Первая задача должна быть успешно добавлена в очередь
_, err := client.Enqueue(task, asynq.TaskID("mytaskid"))
// Вторая задача не будет добавлена, err будет содержать ошибку ErrTaskIDConflict (при условии, что первая задача еще не была обработана)
_, err = client.Enqueue(task, asynq.TaskID("mytaskid"))
Использование опции Unique
Второй метод основан на блокировке уникальности. При постановке задачи в очередь с использованием опции Unique
, Client
проверит, может ли он получить блокировку для данной задачи. Задача будет добавлена в очередь только в том случае, если блокировка может быть получена. Если другая задача уже удерживает блокировку, Client
вернет ошибку (смотрите пример кода ниже, как проверить ошибки).
Блокировка уникальности связана с TTL (время жизни) для того, чтобы избежать постоянного удержания блокировки. Блокировка будет освобождена после TTL или после успешной обработки задачи до TTL.
Важно отметить, что уникальная функция задачи в Asynq представляет собой уникальность на усмотрение. Иными словами, если блокировка истекла до обработки задачи, дубликат задачи может быть добавлен в очередь.
Уникальность задачи основана на следующих атрибутах:
- Тип
- Нагрузка
- Очередь
Следовательно, если в очередь были поставлены задачи с тем же типом и нагрузкой, другая задача с теми же атрибутами не будет добавлена в очередь, пока блокировка не будет освобождена.
c := asynq.NewClient(redis)
t1 := asynq.NewTask("example", []byte("hello"))
// t1 будет удерживать блокировку уникальности следующий час.
err := c.Enqueue(t1, asynq.Unique(time.Hour))
switch {
case errors.Is(err, asynq.ErrDuplicateTask):
// Обработать дублирующуюся задачу
case err != nil:
// Обработать другие ошибки
}
t2 := asynq.NewTask("example", []byte("hello"))
// t2 не может быть добавлена в очередь, так как это дубликат t1.
err = c.Enqueue(t2, asynq.Unique(time.Hour))
switch {
case errors.Is(err, asynq.ErrDuplicateTask):
// Обработать дублирующуюся задачу
case err != nil:
// Обработать другие ошибки
}
В приведенном выше примере t2
не будет добавлена в очередь, так как это дубликат t1
. Вы можете использовать errors.Is
для проверки возвращенного значения error
, чтобы определить, обертывает ли оно ошибку asynq.ErrDuplicateTask
.