Skip to content

Commit

Permalink
fifo: fix data race for put()/take() with vinyl
Browse files Browse the repository at this point in the history
put()/take() contains two parts:

1. Get a tuple from an index.
2. Insert a next tuple (with id based on the data from the first
   call)/update the current one.

For the vinyl engine it could lead to yield on the second step. As
result we could put a tuple with the same id or got the same task
multiple times.

This is a potential problem. It is very unlikely to get yield
on the second place because changes occur in the same place
in memory. So it is should not to be flushed from memory to disk
right after getting a tuple. But it could happen on practice, see a
problem with auto increment and concurrent lookups [1].

This issue has been fixed before for fifottl driver [2][3]. Here I
tried to fix it in a similar code style to make it easier to
maintain the code.

1. tarantool/tarantool#389
2. #28
3. #30
  • Loading branch information
oleg-jukovec committed Oct 31, 2023
1 parent cc77d61 commit 4b1aa1b
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 9 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Fixed

- Data race with fifo driver for put()/take() methods with vinyl
engine (#64).

## 0.1.1 - 2023-09-06

The release fixes the loss of tasks in the `fifottl` driver.
Expand Down
36 changes: 27 additions & 9 deletions sharded_queue/drivers/fifo.lua
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,39 @@ end

-- put task in space
function method.put(args)
local idx = get_index(args)
local task_id = utils.pack_task_id(args.bucket_id, args.bucket_count, idx)
local task = get_space(args):insert { task_id, args.bucket_id, state.READY, args.data, idx }
local task = box.atomic(function()
local idx = get_index(args)
local task_id = utils.pack_task_id(
args.bucket_id,
args.bucket_count,
idx)

return get_space(args):insert {
task_id,
args.bucket_id,
state.READY,
args.data,
idx
}
end)

update_stat(args.tube_name, 'put')
return normalize_task(task)
end

-- take task
function method.take(args)
local task = get_space(args).index.status:min { state.READY }
if task ~= nil and task[3] == state.READY then
task = get_space(args):update(task.task_id, { { '=', 3, state.TAKEN } })
update_stat(args.tube_name, 'take')
return normalize_task(task)
end
local task = box.atomic(function()
local task = get_space(args).index.status:min { state.READY }
if task == nil or task[3] ~= state.READY then
return
end
return get_space(args):update(task.task_id, { { '=', 3, state.TAKEN } })
end)
if task == nil then return end

update_stat(args.tube_name, 'take')
return normalize_task(task)
end

function method.ack(args)
Expand Down

0 comments on commit 4b1aa1b

Please sign in to comment.