Recurso de Tarefa Única no Asynq garante que haja apenas uma tarefa na fila do Redis.

Quando você deseja deduplicar tarefas e evitar tarefas duplicadas, este recurso é muito útil.

Visão geral

Existem dois métodos para garantir a singularidade da tarefa no Asynq.

  1. Usando a opção TaskID: Gere um ID de tarefa único por conta própria.
  2. Usando a opção Unique: Deixe o Asynq criar um bloqueio de unicidade para a tarefa.

Usando a opção TaskID

Se você escolher o primeiro método, pode garantir que haja apenas uma tarefa com um determinado ID de tarefa em qualquer momento. Se você tentar enfileirar outra tarefa com o mesmo ID de tarefa, retornará um erro ErrTaskIDConflict.

// A primeira tarefa deve estar tudo bem
_, err := client.Enqueue(task, asynq.TaskID("mytaskid"))

// A segunda tarefa falhará, o err será ErrTaskIDConflict (assumindo que a primeira tarefa ainda não foi processada)
_, err = client.Enqueue(task, asynq.TaskID("mytaskid"))

Usando a opção Unique

O segundo método é baseado no bloqueio de unicidade. Ao enfileirar uma tarefa usando a opção Unique, o Client verificará se pode adquirir o bloqueio para a tarefa fornecida. A tarefa só será enfileirada se o bloqueio puder ser adquirido. Se outra tarefa já possuir o bloqueio, o Client retornará um erro (veja o código de exemplo abaixo de como verificar os erros).

O bloqueio de unicidade está associado a um TTL (tempo de vida) para evitar a retenção permanente do bloqueio. O bloqueio será liberado após o TTL ou após a tarefa ter sido processada com sucesso antes do TTL.

Um ponto importante a ser observado é que o recurso de tarefa única no Asynq é de unicidade de melhor esforço. Em outras palavras, se o bloqueio expirou antes que a tarefa seja processada, uma tarefa duplicada pode ser enfileirada.

A unicidade da tarefa é baseada nos seguintes atributos:

  • Tipo
  • Carga
  • Fila

Portanto, se houver tarefas com o mesmo tipo e carga enfileiradas na mesma fila, outra tarefa com os mesmos atributos não será enfileirada até que o bloqueio seja liberado.

c := asynq.NewClient(redis)

t1 := asynq.NewTask("exemplo", []byte("olá"))

// t1 manterá o bloqueio de unicidade pelas próximas horas.
err := c.Enqueue(t1, asynq.Unique(time.Hour))
switch {
case errors.Is(err, asynq.ErrDuplicateTask):
    // Lidar com tarefa duplicada
case err != nil:
    // Lidar com outros erros
}

t2 := asynq.NewTask("exemplo", []byte("olá"))

// t2 não pode ser enfileirado, pois é um duplicado de t1.
err = c.Enqueue(t2, asynq.Unique(time.Hour))
switch {
case errors.Is(err, asynq.ErrDuplicateTask):
    // Lidar com tarefa duplicada
case err != nil:
    // Lidar com outros erros
}

No exemplo acima, t2 não será enfileirado, pois é um duplicado de t1. Você pode usar errors.Is para verificar o valor de error retornado e determinar se ele envolve o erro asynq.ErrDuplicateTask.