Skip to content

Commit 8db176b

Browse files
committed
driver: fix duplicate id error with mvvc on on take
Taking the minimum of the index is an implicit transactions, so it is always done with 'read-confirmed' mvcc isolation level. It can lead to errors when trying to make parallel 'take' calls with mvcc enabled. It is hapenning because 'min' for several takes in parallel will be the same since read confirmed isolation level makes visible all transactions that finished the commit. To fix it we wrap it with box.begin/commit and set right isolation level. Current fix does not resolve that bug in situations when we already are in transaction since it will open nested transactions. Part of #207
1 parent 12b0495 commit 8db176b

File tree

4 files changed

+71
-7
lines changed

4 files changed

+71
-7
lines changed

queue/abstract/driver/fifo.lua

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,25 @@ end
9797

9898
-- take task
9999
function method.take(self)
100-
local task = self.space.index.status:min{state.READY}
100+
local task
101+
-- Taking the minimum is an implicit transactions, so it is
102+
-- always done with 'read-confirmed' mvcc isolation level.
103+
-- It can lead to errors when trying to make parallel 'take' calls with mvcc enabled.
104+
-- It is hapenning because 'min' for several takes in parallel will be the same since
105+
-- read confirmed isolation level makes visible all transactions that finished the commit.
106+
-- To fix it we wrap it with box.begin/commit and set right isolation level.
107+
-- Current fix does not resolve that bug in situations when we already are in transaction
108+
-- since it will open nested transactions.
109+
-- See https://github.com/tarantool/queue/issues/207
110+
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/
111+
if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then
112+
box.begin({txn_isolation = 'read-committed'})
113+
task = self.space.index.status:min{state.READY}
114+
box.commit()
115+
else
116+
task = self.space.index.status:min{state.READY}
117+
end
118+
101119
if task ~= nil and task[2] == state.READY then
102120
task = self.space:update(task[1], { { '=', 2, state.TAKEN } })
103121
self.on_task_change(task, 'take')

queue/abstract/driver/fifottl.lua

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -285,10 +285,21 @@ end
285285
-- take task
286286
function method.take(self)
287287
local task = nil
288-
for _, t in self.space.index.status:pairs({state.READY}) do
289-
if not is_expired(t) then
290-
task = t
291-
break
288+
if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then
289+
box.begin({txn_isolation = 'read-committed'})
290+
for _, t in self.space.index.status:pairs({state.READY}) do
291+
if not is_expired(t) then
292+
task = t
293+
break
294+
end
295+
end
296+
box.commit()
297+
else
298+
for _, t in self.space.index.status:pairs({state.READY}) do
299+
if not is_expired(t) then
300+
task = t
301+
break
302+
end
292303
end
293304
end
294305

queue/abstract/driver/utube.lua

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,25 @@ function method.take(self)
109109
if task[2] ~= state.READY then
110110
break
111111
end
112+
local taken
113+
-- Taking the minimum is an implicit transactions, so it is
114+
-- always done with 'read-confirmed' mvcc isolation level.
115+
-- It can lead to errors when trying to make parallel 'take' calls with mvcc enabled.
116+
-- It is hapenning because 'min' for several takes in parallel will be the same since
117+
-- read confirmed isolation level makes visible all transactions that finished the commit.
118+
-- To fix it we wrap it with box.begin/commit and set right isolation level.
119+
-- Current fix does not resolve that bug in situations when we already are in transaction
120+
-- since it will open nested transactions.
121+
-- See https://github.com/tarantool/queue/issues/207
122+
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/
123+
if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then
124+
box.begin({txn_isolation = 'read-committed'})
125+
taken = self.space.index.utube:min{state.TAKEN, task[3]}
126+
box.commit()
127+
else
128+
taken = self.space.index.utube:min{state.TAKEN, task[3]}
129+
end
112130

113-
local taken = self.space.index.utube:min{state.TAKEN, task[3]}
114131
if taken == nil or taken[2] ~= state.TAKEN then
115132
task = self.space:update(task[1], { { '=', 2, state.TAKEN } })
116133
self.on_task_change(task, 'take')

queue/abstract/driver/utubettl.lua

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,25 @@ function method.take(self)
300300
break
301301
elseif not is_expired(t) then
302302
local next_event = util.time() + t[i_ttr]
303-
local taken = self.space.index.utube:min{state.TAKEN, t[i_utube]}
303+
local taken
304+
-- Taking the minimum is an implicit transactions, so it is
305+
-- always done with 'read-confirmed' mvcc isolation level.
306+
-- It can lead to errors when trying to make parallel 'take' calls with mvcc enabled.
307+
-- It is hapenning because 'min' for several takes in parallel will be the same since
308+
-- read confirmed isolation level makes visible all transactions that finished the commit.
309+
-- To fix it we wrap it with box.begin/commit and set right isolation level.
310+
-- Current fix does not resolve that bug in situations when we already are in transaction
311+
-- since it will open nested transactions.
312+
-- See https://github.com/tarantool/queue/issues/207
313+
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/
314+
if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then
315+
box.begin({txn_isolation = 'read-committed'})
316+
taken = self.space.index.utube:min{state.TAKEN, t[i_utube]}
317+
box.commit()
318+
else
319+
taken = self.space.index.utube:min{state.TAKEN, t[i_utube]}
320+
end
321+
304322
if taken == nil or taken[i_status] ~= state.TAKEN then
305323
t = self.space:update(t[1], {
306324
{ '=', i_status, state.TAKEN },

0 commit comments

Comments
 (0)