Skip to content

Commit

Permalink
driver: fix duplicate id error with mvvc on
Browse files Browse the repository at this point in the history
Taking the maximum of the index is an implicit transactions, so it is
always done with 'read-confirmed' mvcc isolation level.
It can lead to errors when trying to make parallel 'put' calls with mvcc enabled.
It is hapenning because 'max' for several puts in parallel will be the same since
read confirmed isolation level makes visible all transactions that finished the commit.
To fix it we wrap it with box.begin/commit and set right isolation level.

Closes #207
  • Loading branch information
better0fdead committed Jul 10, 2023
1 parent 16e3b74 commit 2a5186b
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 4 deletions.
34 changes: 33 additions & 1 deletion queue/abstract/driver/fifo.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ local state = require('queue.abstract.state')

local num_type = require('queue.compat').num_type
local str_type = require('queue.compat').str_type
local check_version = require('queue.compat').check_version

local tube = {}
local method = {}
Expand Down Expand Up @@ -66,9 +67,40 @@ function method.normalize_task(self, task)
return task
end

-- check if mvcc is enabled
-- returns true if mvcc is enabled
local function check_mvcc_state()
if not check_version({2, 6, 1}) then
return false
end

if box.cfg.memtx_use_mvcc_engine then
return true
end

return false
end

-- put task in space
function method.put(self, data, opts)
local max = self.space.index.task_id:max()
local max

-- taking the maximum of the index is an implicit transactions, so it is
-- always done with 'read-confirmed' mvcc isolation level.
-- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled.
-- It is hapenning because 'max' for several puts in parallel will be the same since
-- read confirmed isolation level makes visible all transactions that finished the commit.
-- To fix it we wrap it with box.begin/commit and set right isolation level.
-- See https://github.com/tarantool/queue/issues/207
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/
if check_mvcc_state() then
box.begin({txn_isolation = 'read-committed'})
max = self.space.index.task_id:max()
box.commit()
else
max = self.space.index.task_id:max()
end

local id = max and max[1] + 1 or 0
local task = self.space:insert{id, state.READY, data}
self.on_task_change(task, 'put')
Expand Down
29 changes: 28 additions & 1 deletion queue/abstract/driver/fifottl.lua
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,36 @@ function method.normalize_task(self, task)
return task and task:transform(3, 5)
end

-- check if mvcc is enabled
local function check_mvcc_state()
local cfg = box.cfg
if cfg['memtx_use_mvcc_engine'] == nil or cfg['memtx_use_mvcc_engine'] == false then
return false
end

return true
end

-- put task in space
function method.put(self, data, opts)
local max = self.space.index.task_id:max()
local max

-- taking the maximum of the index is an implicit transactions, so it is
-- always done with 'read-confirmed' mvcc isolation level.
-- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled.
-- It is hapenning because 'max' for several puts in parallel will be the same since
-- read confirmed isolation level makes visible all transactions that finished the commit.
-- To fix it we wrap it with box.begin/commit and set right isolation level.
-- See https://github.com/tarantool/queue/issues/207
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/
if check_mvcc_state() then
box.begin({txn_isolation = 'read-committed'})
max = self.space.index.task_id:max()
box.commit()
else
max = self.space.index.task_id:max()
end

local id = max and max[i_id] + 1 or 0

local status
Expand Down
29 changes: 28 additions & 1 deletion queue/abstract/driver/utube.lua
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,36 @@ function method.normalize_task(self, task)
return task and task:transform(3, 1)
end

-- check if mvcc is enabled
local function check_mvcc_state()
local cfg = box.cfg
if cfg['memtx_use_mvcc_engine'] == nil or cfg['memtx_use_mvcc_engine'] == false then
return false
end

return true
end

-- put task in space
function method.put(self, data, opts)
local max = self.space.index.task_id:max()
local max

-- taking the maximum of the index is an implicit transactions, so it is
-- always done with 'read-confirmed' mvcc isolation level.
-- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled.
-- It is hapenning because 'max' for several puts in parallel will be the same since
-- read confirmed isolation level makes visible all transactions that finished the commit.
-- To fix it we wrap it with box.begin/commit and set right isolation level.
-- See https://github.com/tarantool/queue/issues/207
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/
if check_mvcc_state() then
box.begin({txn_isolation = 'read-committed'})
max = self.space.index.task_id:max()
box.commit()
else
max = self.space.index.task_id:max()
end

local id = max and max[1] + 1 or 0
local task = self.space:insert{id, state.READY, tostring(opts.utube), data}
self.on_task_change(task, 'put')
Expand Down
29 changes: 28 additions & 1 deletion queue/abstract/driver/utubettl.lua
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,36 @@ function method.normalize_task(self, task)
return task and task:transform(i_next_event, i_data - i_next_event)
end

-- check if mvcc is enabled
local function check_mvcc_state()
local cfg = box.cfg
if cfg['memtx_use_mvcc_engine'] == nil or cfg['memtx_use_mvcc_engine'] == false then
return false
end

return true
end

-- put task in space
function method.put(self, data, opts)
local max = self.space.index.task_id:max()
local max

-- taking the maximum of the index is an implicit transactions, so it is
-- always done with 'read-confirmed' mvcc isolation level.
-- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled.
-- It is hapenning because 'max' for several puts in parallel will be the same since
-- read confirmed isolation level makes visible all transactions that finished the commit.
-- To fix it we wrap it with box.begin/commit and set right isolation level.
-- See https://github.com/tarantool/queue/issues/207
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/
if check_mvcc_state() then
box.begin({txn_isolation = 'read-committed'})
max = self.space.index.task_id:max()
box.commit()
else
max = self.space.index.task_id:max()
end

local id = max and max[i_id] + 1 or 0

local status
Expand Down
63 changes: 63 additions & 0 deletions t/220-mvcc.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#!/usr/bin/env tarantool
local qc = require('queue.compat')
local log = require('log')
if not qc.check_version({2, 6, 1}) then
log.info('Tests skipped, tarantool version < 2.6.1')
return
end
local yaml = require('yaml')
local fiber = require('fiber')

local test = require('tap').test()
test:plan(6)

local queue = require('queue')
local state = require('queue.abstract.state')

local tnt = require('t.tnt')
tnt.cfg{memtx_use_mvcc_engine = true}

local engine = 'memtx'

test:ok(rawget(box, 'space'), 'box started')
test:ok(queue, 'queue is loaded')

local tube = queue.create_tube('test', 'fifo', { engine = engine })
test:ok(tube, 'test tube created')
test:is(tube.name, 'test', 'tube.name')
test:is(tube.type, 'fifo', 'tube.type')

test:test('concurent take', function(test)
test:plan(151)

local channel = fiber.channel(1000)
test:ok(channel, 'channel created')

local res = {}
for i = 1, 50 do
fiber.create(function(i)
local taken = tube:take(1)
test:ok(taken, 'Task was taken ' .. i)
table.insert(res, { taken })
channel:put(true)
end, i)
end

fiber.sleep(.1)
test:ok(tube:put(1), 'task 1 was put')

for i = 2, 50 do
fiber.create(function(i)
test:ok(tube:put(i), 'task ' .. i .. ' was put')
end, i)
end
fiber.sleep(.1)
for i = 1, 50 do
test:ok(channel:get(1 / i), 'take was done ' .. i)
end
end)


tnt.finish()
os.exit(test:check() and 0 or 1)
-- vim: set ft=lua:

0 comments on commit 2a5186b

Please sign in to comment.