Skip to content

Commit 12b0495

Browse files
committed
driver: fix duplicate id error with mvvc on put
Taking the maximum of the index is an implicit transactions, so it is always done with 'read-confirmed' mvcc isolation level. It can lead to errors when trying to make parallel 'put' calls with mvcc enabled. It is hapenning because 'max' for several puts in parallel will be the same since read confirmed isolation level makes visible all transactions that finished the commit. To fix it we wrap it with box.begin/commit and set right isolation level. Current fix does not resolve that bug in situations when we already are in transaction since it will open nested transactions. Part of #207
1 parent 16e3b74 commit 12b0495

File tree

5 files changed

+147
-4
lines changed

5 files changed

+147
-4
lines changed

queue/abstract/driver/fifo.lua

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,27 @@ end
6868

6969
-- put task in space
7070
function method.put(self, data, opts)
71-
local max = self.space.index.task_id:max()
71+
local max
72+
73+
-- Taking the maximum of the index is an implicit transactions, so it is
74+
-- always done with 'read-confirmed' mvcc isolation level.
75+
-- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled.
76+
-- It is hapenning because 'max' for several puts in parallel will be the same since
77+
-- read confirmed isolation level makes visible all transactions that finished the commit.
78+
-- To fix it we wrap it with box.begin/commit and set right isolation level.
79+
-- Current fix does not resolve that bug in situations when we already are in transaction
80+
-- since it will open nested transactions.
81+
-- See https://github.com/tarantool/queue/issues/207
82+
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/
83+
84+
if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then
85+
box.begin({txn_isolation = 'read-committed'})
86+
max = self.space.index.task_id:max()
87+
box.commit()
88+
else
89+
max = self.space.index.task_id:max()
90+
end
91+
7292
local id = max and max[1] + 1 or 0
7393
local task = self.space:insert{id, state.READY, data}
7494
self.on_task_change(task, 'put')

queue/abstract/driver/fifottl.lua

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,27 @@ end
209209

210210
-- put task in space
211211
function method.put(self, data, opts)
212-
local max = self.space.index.task_id:max()
212+
local max
213+
214+
-- Taking the maximum of the index is an implicit transactions, so it is
215+
-- always done with 'read-confirmed' mvcc isolation level.
216+
-- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled.
217+
-- It is hapenning because 'max' for several puts in parallel will be the same since
218+
-- read confirmed isolation level makes visible all transactions that finished the commit.
219+
-- To fix it we wrap it with box.begin/commit and set right isolation level.
220+
-- Current fix does not resolve that bug in situations when we already are in transaction
221+
-- since it will open nested transactions.
222+
-- See https://github.com/tarantool/queue/issues/207
223+
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/
224+
225+
if box.cfg.memtx_use_mvcc_engine and not box.is_in_txn() then
226+
box.begin({txn_isolation = 'read-committed'})
227+
max = self.space.index.task_id:max()
228+
box.commit()
229+
else
230+
max = self.space.index.task_id:max()
231+
end
232+
213233
local id = max and max[i_id] + 1 or 0
214234

215235
local status

queue/abstract/driver/utube.lua

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,27 @@ end
7575

7676
-- put task in space
7777
function method.put(self, data, opts)
78-
local max = self.space.index.task_id:max()
78+
local max
79+
80+
-- Taking the maximum of the index is an implicit transactions, so it is
81+
-- always done with 'read-confirmed' mvcc isolation level.
82+
-- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled.
83+
-- It is hapenning because 'max' for several puts in parallel will be the same since
84+
-- read confirmed isolation level makes visible all transactions that finished the commit.
85+
-- To fix it we wrap it with box.begin/commit and set right isolation level.
86+
-- Current fix does not resolve that bug in situations when we already are in transaction
87+
-- since it will open nested transactions.
88+
-- See https://github.com/tarantool/queue/issues/207
89+
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/
90+
91+
if box.cfg.memtx_use_mvcc_engine and not box.is_in_txn() then
92+
box.begin({txn_isolation = 'read-committed'})
93+
max = self.space.index.task_id:max()
94+
box.commit()
95+
else
96+
max = self.space.index.task_id:max()
97+
end
98+
7999
local id = max and max[1] + 1 or 0
80100
local task = self.space:insert{id, state.READY, tostring(opts.utube), data}
81101
self.on_task_change(task, 'put')

queue/abstract/driver/utubettl.lua

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,27 @@ end
217217

218218
-- put task in space
219219
function method.put(self, data, opts)
220-
local max = self.space.index.task_id:max()
220+
local max
221+
222+
-- Taking the maximum of the index is an implicit transactions, so it is
223+
-- always done with 'read-confirmed' mvcc isolation level.
224+
-- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled.
225+
-- It is hapenning because 'max' for several puts in parallel will be the same since
226+
-- read confirmed isolation level makes visible all transactions that finished the commit.
227+
-- To fix it we wrap it with box.begin/commit and set right isolation level.
228+
-- Current fix does not resolve that bug in situations when we already are in transaction
229+
-- since it will open nested transactions.
230+
-- See https://github.com/tarantool/queue/issues/207
231+
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/
232+
233+
if box.cfg.memtx_use_mvcc_engine and not box.is_in_txn() then
234+
box.begin({txn_isolation = 'read-committed'})
235+
max = self.space.index.task_id:max()
236+
box.commit()
237+
else
238+
max = self.space.index.task_id:max()
239+
end
240+
221241
local id = max and max[i_id] + 1 or 0
222242

223243
local status

t/220-mvcc.t

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
#!/usr/bin/env tarantool
2+
local qc = require('queue.compat')
3+
local log = require('log')
4+
if not qc.check_version({2, 6, 1}) then
5+
log.info('Tests skipped, tarantool version < 2.6.1')
6+
return
7+
end
8+
local yaml = require('yaml')
9+
local fiber = require('fiber')
10+
11+
local test = require('tap').test()
12+
test:plan(6)
13+
14+
local queue = require('queue')
15+
local state = require('queue.abstract.state')
16+
17+
local tnt = require('t.tnt')
18+
tnt.cfg{memtx_use_mvcc_engine = true}
19+
20+
local engine = 'memtx'
21+
22+
test:ok(rawget(box, 'space'), 'box started')
23+
test:ok(queue, 'queue is loaded')
24+
25+
local tube = queue.create_tube('test', 'fifo', { engine = engine })
26+
test:ok(tube, 'test tube created')
27+
test:is(tube.name, 'test', 'tube.name')
28+
test:is(tube.type, 'fifo', 'tube.type')
29+
30+
test:test('concurent take', function(test)
31+
test:plan(151)
32+
33+
local channel = fiber.channel(1000)
34+
test:ok(channel, 'channel created')
35+
36+
local res = {}
37+
for i = 1, 50 do
38+
fiber.create(function(i)
39+
local taken = tube:take(1)
40+
test:ok(taken, 'Task was taken ' .. i)
41+
table.insert(res, { taken })
42+
channel:put(true)
43+
end, i)
44+
end
45+
46+
fiber.sleep(.1)
47+
test:ok(tube:put(1), 'task 1 was put')
48+
49+
for i = 2, 50 do
50+
fiber.create(function(i)
51+
test:ok(tube:put(i), 'task ' .. i .. ' was put')
52+
end, i)
53+
end
54+
fiber.sleep(.1)
55+
for i = 1, 50 do
56+
test:ok(channel:get(1 / i), 'take was done ' .. i)
57+
end
58+
end)
59+
60+
61+
tnt.finish()
62+
os.exit(test:check() and 0 or 1)
63+
-- vim: set ft=lua:

0 commit comments

Comments
 (0)