The Unique Task feature in Asynq ensures that only one copy of a given task exists in the Redis queue at a time.
This is useful when you want to deduplicate tasks.
Overview
There are two methods to ensure task uniqueness in Asynq.
- Using the TaskID option: Generate a unique task ID yourself.
- Using the Unique option: Let Asynq create a uniqueness lock for the task.
Using the TaskID Option
If you choose the first method, you can ensure that there is only one task with a given task ID at any given time. If you attempt to enqueue another task with the same task ID, Enqueue returns an ErrTaskIDConflict error.
// The first task should be fine
_, err := client.Enqueue(task, asynq.TaskID("mytaskid"))
// The second task will fail, err will be ErrTaskIDConflict (assuming the first task has not been processed yet)
_, err = client.Enqueue(task, asynq.TaskID("mytaskid"))
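When the ID is generated by the caller, one possible approach is to derive it deterministically from whatever identifies the work, so that repeated requests map to the same ID. The following is a minimal sketch of that idea. The helper taskIDFor, its parameters, and the choice of SHA-256 are assumptions made for illustration and are not part of Asynq.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// taskIDFor is a hypothetical helper (not part of Asynq). It derives a
// deterministic ID from the fields that identify a unit of work, so the
// same logical task always maps to the same ID.
func taskIDFor(kind string, userID int) string {
	sum := sha256.Sum256([]byte(fmt.Sprintf("%s:%d", kind, userID)))
	return hex.EncodeToString(sum[:])
}

func main() {
	a := taskIDFor("email:welcome", 42)
	b := taskIDFor("email:welcome", 42)
	c := taskIDFor("email:welcome", 43)
	fmt.Println(a == b) // same inputs give the same ID
	fmt.Println(a == c) // different inputs give a different ID
}
```

The resulting string could then be passed to asynq.TaskID when enqueuing, so that a second request for the same work fails with ErrTaskIDConflict while the first task is still pending.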
Using the Unique Option
The second method is based on a uniqueness lock. When enqueuing a task using the Unique option, the Client will check if it can acquire the lock for the given task. The task will only be enqueued if the lock can be acquired. If another task already holds the lock, the Client will return an error (see the example code below on how to check for errors).
The uniqueness lock has a TTL (time to live) so that it is never held permanently. The lock is released when the TTL expires or when the task is processed successfully, whichever comes first.
One important thing to note is that the unique task feature in Asynq provides best-effort uniqueness. In other words, if the lock expires before the task is processed, a duplicate task may be enqueued.
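The lock lifecycle described above can be illustrated with a toy in-memory model. This is a sketch under stated assumptions, not Asynq's implementation: the lockTable type, its methods, and the simulated clock are hypothetical, and the real locks live in Redis.

```go
package main

import (
	"fmt"
	"time"
)

// lockTable is a toy in-memory stand-in for the uniqueness locks.
// It maps a lock key to the time at which that lock expires.
type lockTable map[string]time.Time

// acquire takes the lock for key if it is free or expired at time now.
// It reports whether the lock was acquired.
func (l lockTable) acquire(key string, ttl time.Duration, now time.Time) bool {
	if exp, held := l[key]; held && now.Before(exp) {
		return false // lock still held: the enqueue is a duplicate
	}
	l[key] = now.Add(ttl)
	return true
}

// release frees the lock early, modelling a task that was processed
// successfully before the TTL.
func (l lockTable) release(key string) { delete(l, key) }

// scenario replays four enqueue attempts for the same key against a
// simulated clock and reports which attempts acquired the lock.
func scenario() (first, withinTTL, afterRelease, afterTTL bool) {
	locks := lockTable{}
	start := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)

	first = locks.acquire("k", time.Hour, start)
	withinTTL = locks.acquire("k", time.Hour, start.Add(30*time.Minute))

	// The task finishes, so the lock is released before the TTL.
	locks.release("k")
	afterRelease = locks.acquire("k", time.Hour, start.Add(31*time.Minute))

	// This task is never processed, but its lock expires after one hour,
	// so a later attempt succeeds even though the task is still queued.
	afterTTL = locks.acquire("k", time.Hour, start.Add(2*time.Hour))
	return
}

func main() {
	a, b, c, d := scenario()
	fmt.Println(a, b, c, d)
}
```

The last attempt is the best-effort case: the lock has expired while the earlier task is still waiting, so a duplicate gets through. Choosing a TTL comfortably longer than the expected time to process the task reduces that risk.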
The uniqueness of the task is based on the following attributes:
- Type
- Payload
- Queue
Therefore, once a task with a given type and payload has been enqueued to a queue, another task with the same type and payload will not be enqueued to that queue until the lock is released.
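To make the role of the three attributes concrete, the sketch below builds a lock key from queue, type, and payload. The uniqueKey helper and its key layout are hypothetical and are not Asynq's internal key format. The sketch also assumes that payloads are compared as raw bytes, which means two encodings of the same data can count as different tasks.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// uniqueKey is a hypothetical helper showing how a lock key could be built
// from the three attributes. It is not Asynq's internal key format.
func uniqueKey(queue, taskType string, payload []byte) string {
	sum := sha256.Sum256(payload) // hashes the raw payload bytes
	return fmt.Sprintf("%s:%s:%s", queue, taskType, hex.EncodeToString(sum[:]))
}

func main() {
	base := uniqueKey("default", "example", []byte(`{"a":1,"b":2}`))
	same := uniqueKey("default", "example", []byte(`{"a":1,"b":2}`))
	otherQueue := uniqueKey("critical", "example", []byte(`{"a":1,"b":2}`))
	reordered := uniqueKey("default", "example", []byte(`{"b":2,"a":1}`))

	fmt.Println(base == same)       // identical attributes share a key
	fmt.Println(base == otherQueue) // a different queue yields a different key
	fmt.Println(base == reordered)  // same data, different bytes: different key
}
```

If payloads are produced by a serializer, it is worth checking that the encoding is stable (for example, a fixed field order) so that logically equal tasks produce equal bytes.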
// redis is an asynq.RedisConnOpt defined elsewhere.
c := asynq.NewClient(redis)
t1 := asynq.NewTask("example", []byte("hello"))
// t1 will hold the uniqueness lock for the next hour.
// Enqueue returns (*asynq.TaskInfo, error); the task info is not needed here.
_, err := c.Enqueue(t1, asynq.Unique(time.Hour))
switch {
case errors.Is(err, asynq.ErrDuplicateTask):
	// Handle duplicate task
case err != nil:
	// Handle other errors
}
t2 := asynq.NewTask("example", []byte("hello"))
// t2 cannot be enqueued as it is a duplicate of t1.
_, err = c.Enqueue(t2, asynq.Unique(time.Hour))
switch {
case errors.Is(err, asynq.ErrDuplicateTask):
	// Handle duplicate task
case err != nil:
	// Handle other errors
}
In the above example, t2 will not be enqueued as it is a duplicate of t1. You can use errors.Is to check the returned error value to determine if it wraps the asynq.ErrDuplicateTask error.
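The reason for using errors.Is rather than a direct comparison is that the returned error may wrap the sentinel value with extra context. The following sketch shows the difference using only the standard library. The names errDuplicate, enqueueStub, and classify are hypothetical stand-ins so that the example runs without a Redis server; they are not part of Asynq.

```go
package main

import (
	"errors"
	"fmt"
)

// errDuplicate is a local stand-in for asynq.ErrDuplicateTask.
var errDuplicate = errors.New("task already exists")

// enqueueStub is a hypothetical stand-in for Client.Enqueue. When dup is
// true it returns an error that wraps errDuplicate with extra context.
func enqueueStub(dup bool) error {
	if dup {
		return fmt.Errorf("enqueue failed: %w", errDuplicate)
	}
	return nil
}

// classify mirrors the switch used above: check for the duplicate
// sentinel first, then for any other error.
func classify(err error) string {
	switch {
	case errors.Is(err, errDuplicate):
		return "duplicate"
	case err != nil:
		return "other error"
	default:
		return "enqueued"
	}
}

func main() {
	err := enqueueStub(true)
	fmt.Println(err == errDuplicate) // direct comparison misses the wrapped error
	fmt.Println(classify(err))       // errors.Is unwraps and finds the sentinel
	fmt.Println(classify(enqueueStub(false)))
}
```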