Recurso de Tarefa Única no Asynq garante que haja apenas uma tarefa na fila do Redis.
Quando você deseja deduplicar tarefas e evitar tarefas duplicadas, este recurso é muito útil.
Visão geral
Existem dois métodos para garantir a singularidade da tarefa no Asynq.
- Usando a opção
TaskID
: Gere um ID de tarefa único por conta própria. - Usando a opção
Unique
: Deixe o Asynq criar um bloqueio de unicidade para a tarefa.
Usando a opção TaskID
Se você escolher o primeiro método, pode garantir que haja apenas uma tarefa com um determinado ID de tarefa em qualquer momento. Se você tentar enfileirar outra tarefa com o mesmo ID de tarefa, retornará um erro ErrTaskIDConflict
.
// A primeira tarefa deve estar tudo bem
_, err := client.Enqueue(task, asynq.TaskID("mytaskid"))
// A segunda tarefa falhará, o err será ErrTaskIDConflict (assumindo que a primeira tarefa ainda não foi processada)
_, err = client.Enqueue(task, asynq.TaskID("mytaskid"))
Usando a opção Unique
O segundo método é baseado no bloqueio de unicidade. Ao enfileirar uma tarefa usando a opção Unique
, o Client
verificará se pode adquirir o bloqueio para a tarefa fornecida. A tarefa só será enfileirada se o bloqueio puder ser adquirido. Se outra tarefa já possuir o bloqueio, o Client
retornará um erro (veja o código de exemplo abaixo de como verificar os erros).
O bloqueio de unicidade está associado a um TTL (tempo de vida) para evitar a retenção permanente do bloqueio. O bloqueio será liberado após o TTL ou após a tarefa ter sido processada com sucesso antes do TTL.
Um ponto importante a ser observado é que o recurso de tarefa única no Asynq é de unicidade de melhor esforço. Em outras palavras, se o bloqueio expirou antes que a tarefa seja processada, uma tarefa duplicada pode ser enfileirada.
A unicidade da tarefa é baseada nos seguintes atributos:
- Tipo
- Carga
- Fila
Portanto, se houver tarefas com o mesmo tipo e carga enfileiradas na mesma fila, outra tarefa com os mesmos atributos não será enfileirada até que o bloqueio seja liberado.
c := asynq.NewClient(redis)
t1 := asynq.NewTask("exemplo", []byte("olá"))
// t1 manterá o bloqueio de unicidade pelas próximas horas.
err := c.Enqueue(t1, asynq.Unique(time.Hour))
switch {
case errors.Is(err, asynq.ErrDuplicateTask):
// Lidar com tarefa duplicada
case err != nil:
// Lidar com outros erros
}
t2 := asynq.NewTask("exemplo", []byte("olá"))
// t2 não pode ser enfileirado, pois é um duplicado de t1.
err = c.Enqueue(t2, asynq.Unique(time.Hour))
switch {
case errors.Is(err, asynq.ErrDuplicateTask):
// Lidar com tarefa duplicada
case err != nil:
// Lidar com outros erros
}
No exemplo acima, t2
não será enfileirado, pois é um duplicado de t1
. Você pode usar errors.Is
para verificar o valor de error
retornado e determinar se ele envolve o erro asynq.ErrDuplicateTask
.