Skip to content

Commit

Permalink
drivers: fix concurrency with mvcc
Browse files Browse the repository at this point in the history
By default idx:max() or idx:min() have read confirmed isolation level.
It could lead to a task duplication or double task take when we
already insert or update a task, commited, but it is not yet
confirmed.

See also:

1. tarantool/queue#207
2. tarantool/queue#211
  • Loading branch information
oleg-jukovec committed Oct 31, 2023
1 parent 4b1aa1b commit 38921d5
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 30 deletions.
4 changes: 2 additions & 2 deletions sharded_queue/drivers/fifo.lua
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ end

-- put task in space
function method.put(args)
local task = box.atomic(function()
local task = utils.atomic(function()
local idx = get_index(args)
local task_id = utils.pack_task_id(
args.bucket_id,
Expand All @@ -96,7 +96,7 @@ end

-- take task
function method.take(args)
local task = box.atomic(function()
local task = utils.atomic(function()
local task = get_space(args).index.status:min { state.READY }
if task == nil or task[3] ~= state.READY then
return
Expand Down
65 changes: 37 additions & 28 deletions sharded_queue/drivers/fifottl.lua
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ function method.put(args)
local ttr = args.ttr or args.options.ttr or ttl
local priority = args.priority or args.options.priority or 0

local task = box.atomic(function()
local task = utils.atomic(function()
local idx = get_index(args.tube_name, args.bucket_id)

local next_event
Expand Down Expand Up @@ -301,7 +301,7 @@ end

function method.take(args)

local task = box.atomic(take, args)
local task = utils.atomic(take, args)
if task == nil then return end

if args.extra and args.extra.log_request then
Expand All @@ -314,10 +314,14 @@ function method.take(args)
end

function method.delete(args)
box.begin()
local task = box.space[args.tube_name]:get(args.task_id)
box.space[args.tube_name]:delete(args.task_id)
box.commit()
local task = utils.atomic(function()
local task = box.space[args.tube_name]:get(args.task_id)
if task ~= nil then
box.space[args.tube_name]:delete(args.task_id)
end
return task
end)

if task ~= nil then
task = task:tomap()
task.status = state.DONE
Expand Down Expand Up @@ -355,10 +359,13 @@ function method.touch(args)
end

function method.ack(args)
box.begin()
local task = box.space[args.tube_name]:get(args.task_id)
box.space[args.tube_name]:delete(args.task_id)
box.commit()
local task = utils.atomic(function()
local task = box.space[args.tube_name]:get(args.task_id)
if task ~= nil then
box.space[args.tube_name]:delete(args.task_id)
end
return task
end)
if task ~= nil then
task = task:tomap()
task.status = state.DONE
Expand All @@ -385,15 +392,16 @@ function method.peek(args)
end

function method.release(args)
box.begin()
local task = box.space[args.tube_name]:get(args.task_id)
if task ~= nil then
task = box.space[args.tube_name]:update(args.task_id, {
{'=', index.status, state.READY},
{'=', index.next_event, task[index.created] + task[index.ttl]},
})
end
box.commit()
local task = utils.atomic(function()
local task = box.space[args.tube_name]:get(args.task_id)
if task ~= nil then
task = box.space[args.tube_name]:update(args.task_id, {
{'=', index.status, state.READY},
{'=', index.next_event, task[index.created] + task[index.ttl]},
})
end
return task
end)

if args.extra and args.extra.log_request then
log_operation("release", task)
Expand All @@ -408,15 +416,16 @@ function method.bury(args)
update_stat(args.tube_name, 'bury')
wc_signal(args.tube_name)

box.begin()
local task = box.space[args.tube_name]:get(args.task_id)
if task ~= nil then
task = box.space[args.tube_name]:update(args.task_id, {
{'=', index.status, state.BURIED},
{'=', index.next_event, task[index.created] + task[index.ttl]},
})
end
box.commit()
local task = utils.atomic(function()
local task = box.space[args.tube_name]:get(args.task_id)
if task ~= nil then
task = box.space[args.tube_name]:update(args.task_id, {
{'=', index.status, state.BURIED},
{'=', index.next_event, task[index.created] + task[index.ttl]},
})
end
return task
end)

if args.extra and args.extra.log_request then
log_operation("bury", task)
Expand Down
26 changes: 26 additions & 0 deletions sharded_queue/utils.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,32 @@ local fiber = require('fiber')

local utils = {}

local function atomic_tail(status, ...)
if not status then
box.rollback()
error((...), 2)
end
box.commit()
return ...
end

-- box.atomic(opts, fun, args) does not supported for all Tarantool's versions,
-- so we an analog.
function utils.atomic(fun, ...)
if box.cfg.memtx_use_mvcc_engine then
-- max() + insert() or min() + update() do not work as expected with
-- best-effort visibility: for write transactions it chooses
-- read-committed, for read transactions it chooses read-confirmed.
--
-- So max()/min() could return the same tuple even if a concurrent
-- insert()/update() has been committed, but has not confirmed yet.
box.begin({txn_isolation = 'read-committed'})
else
box.begin()
end
return atomic_tail(pcall(fun, ...))
end

function utils.array_shuffle(array)
if not array then return nil end
math.randomseed(tonumber(0ULL + fiber.time64()))
Expand Down

0 comments on commit 38921d5

Please sign in to comment.