La característica de tarea única en Asynq garantiza que solo haya una tarea en la cola de Redis.

Cuando se desea eliminar tareas duplicadas y evitar tareas duplicadas, esta característica es muy útil.

Visión general

Hay dos métodos para garantizar la unicidad de las tareas en Asynq.

  1. Usar la opción TaskID: Generar un ID de tarea único por ti mismo.
  2. Usar la opción Unique: Dejar que Asynq cree un bloqueo de unicidad para la tarea.

Usando la opción TaskID

Si eliges el primer método, puedes garantizar que solo haya una tarea con el ID de tarea dado en un momento dado. Si intentas encolar otra tarea con el mismo ID de tarea, devolverá un error ErrTaskIDConflict.

// La primera tarea debería estar bien
_, err := client.Enqueue(task, asynq.TaskID("miidetarea"))

// La segunda tarea fallará, err será ErrTaskIDConflict (asumiendo que la primera tarea aún no se ha procesado)
_, err = client.Enqueue(task, asynq.TaskID("miidetarea"))

Usando la opción Unique

El segundo método se basa en el bloqueo de unicidad. Al encolar una tarea usando la opción Unique, el Client verificará si puede adquirir el bloqueo para la tarea dada. La tarea solo se encolará si se puede adquirir el bloqueo. Si otra tarea ya posee el bloqueo, el Client devolverá un error (ver código de ejemplo a continuación sobre cómo comprobar los errores).

El bloqueo de unicidad está asociado con un TTL (tiempo de vida) para evitar mantener el bloqueo permanentemente. El bloqueo se liberará después del TTL o después de que la tarea se haya procesado correctamente antes del TTL.

Un punto importante a tener en cuenta es que la característica de tarea única en Asynq es un unicidad con el mejor esfuerzo. En otras palabras, si el bloqueo ha caducado antes de que se procese la tarea, puede encolarse una tarea duplicada.

La unicidad de la tarea se basa en los siguientes atributos:

  • Tipo
  • Carga
  • Cola

Por lo tanto, si hay tareas con el mismo tipo y carga encoladas en la misma cola, otra tarea con los mismos atributos no se encolará hasta que se libere el bloqueo.

c := asynq.NewClient(redis)

t1 := asynq.NewTask("ejemplo", []byte("hola"))

// t1 mantendrá el bloqueo de unicidad durante la próxima hora.
err := c.Enqueue(t1, asynq.Unique(time.Hour))
switch {
case errors.Is(err, asynq.ErrDuplicateTask):
    // Manejar tarea duplicada
case err != nil:
    // Manejar otros errores
}

t2 := asynq.NewTask("ejemplo", []byte("hola"))

// t2 no se puede encolar ya que es un duplicado de t1.
err = c.Enqueue(t2, asynq.Unique(time.Hour))
switch {
case errors.Is(err, asynq.ErrDuplicateTask):
    // Manejar tarea duplicada
case err != nil:
    // Manejar otros errores
}

En el ejemplo anterior, t2 no se encolará ya que es un duplicado de t1. Se puede usar errors.Is para comprobar el valor de error devuelto y determinar si envuelve el error asynq.ErrDuplicateTask.