Unique Tasks
The unique tasks feature in Asynq makes it simple to ensure that you have only one copy of a task enqueued in Redis.
This feature is useful when you want to deduplicate tasks to ensure that you are not creating redundant work.
There are two ways you can go about ensuring the uniqueness of tasks with Asynq.
- Using the TaskID option: Generate a unique task ID on your own
- Using the Unique option: Let Asynq create a uniqueness lock for the task
If you go with the first approach, it's guaranteed that at any moment there's only one task with a given task ID. If you try to enqueue another task with the same task ID, you'll get an ErrTaskIDConflict error.
// First task should be ok
_, err := client.Enqueue(task, asynq.TaskID("mytaskid"))
// Second task will fail; err is ErrTaskIDConflict (assuming the first task hasn't been processed yet)
_, err = client.Enqueue(task, asynq.TaskID("mytaskid"))
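Just like asynq.ErrDuplicateTask in the example further below, the conflict error can be checked with errors.Is. A minimal sketch, continuing the snippet above (it inspects the err returned by the second Enqueue call):

switch {
case errors.Is(err, asynq.ErrTaskIDConflict):
    // a task with ID "mytaskid" is already enqueued; skip or retry later
case err != nil:
    // handle other errors
}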
The second approach is based on uniqueness locks. When enqueueing a task with the Unique option, Client checks whether it can acquire a lock for the given task. The task is enqueued only if the lock can be acquired. If there's already another task holding the lock, the Client will return an error (see the example code below for how to inspect the error).
The uniqueness lock has a TTL associated with it to avoid holding the lock forever.
The lock is released when the TTL expires, or earlier if the task holding the lock is processed successfully before the TTL.
One important thing to note is that Asynq's unique task feature provides best-effort uniqueness. In other words, it's possible to enqueue a duplicate task if the lock has expired before the task gets processed.
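As an illustration of this caveat, here is a small sketch with a deliberately short TTL. The 10-second TTL, the sleep, and the localhost Redis address are assumptions for demonstration only; in practice you'd pick a TTL longer than the expected time until the task is processed.

c := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

// Hold the uniqueness lock for only 10 seconds.
_, err := c.Enqueue(asynq.NewTask("example", []byte("hello")), asynq.Unique(10*time.Second))
if err != nil {
    // handle error
}

// If the task hasn't been processed within 10 seconds, the lock expires.
time.Sleep(11 * time.Second)

// This enqueue can now succeed even though the first task may still be pending,
// i.e. a duplicate exists once the lock has expired.
_, err = c.Enqueue(asynq.NewTask("example", []byte("hello")), asynq.Unique(10*time.Second))
if err != nil {
    // handle error
}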
The uniqueness of a task is based on the following properties:
- Type
- Payload
- Queue
So if there's a task with the same type and payload enqueued to the same queue, another task with these same properties won't be enqueued until the lock has been released.
c := asynq.NewClient(redis) // redis is a RedisConnOpt, e.g. asynq.RedisClientOpt
t1 := asynq.NewTask("example", []byte("hello"))

// t1 will hold the uniqueness lock for the next hour.
_, err := c.Enqueue(t1, asynq.Unique(time.Hour))
switch {
case errors.Is(err, asynq.ErrDuplicateTask):
    // handle duplicate task
case err != nil:
    // handle other errors
}

t2 := asynq.NewTask("example", []byte("hello"))

// t2 cannot be enqueued because it's a duplicate of t1.
_, err = c.Enqueue(t2, asynq.Unique(time.Hour))
switch {
case errors.Is(err, asynq.ErrDuplicateTask):
    // handle duplicate task
case err != nil:
    // handle other errors
}
In the above example, t2 won't be enqueued since it's a duplicate of t1. The returned error can be inspected using errors.Is to see if it wraps the asynq.ErrDuplicateTask error.
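Also note that, since the queue is part of the uniqueness key, a task with the same type and payload enqueued to a different queue is not treated as a duplicate. A short sketch continuing the example above (the queue name "critical" is just an example):

// Same type and payload as t1, but a different queue,
// so it doesn't conflict with t1's uniqueness lock.
t3 := asynq.NewTask("example", []byte("hello"))
_, err = c.Enqueue(t3, asynq.Queue("critical"), asynq.Unique(time.Hour))
if err != nil {
    // handle error
}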