Skip to content

Commit f316ad1

Browse files
committed
storage: fix moved buckets ref check
During the partial Map-Reduce the router might visit some storages more than once. Happens when after a ref on storage-A another storage-B reports A as having taken some buckets. Then router would come back to A to confirm that. The storage still must hold its previously created ref in order for such checks to make any sense. Otherwise any of the previously confirmed buckets could have had escaped by now. Without the ref-checking the router could reach the Map stage and send some Map requests even though could detect earlier, that not all storages would succeed. This wasn't strictly speaking a bug, but it was clearly suboptimal behaviour leading to the requests being executed not on all the needed storages while the others would report errors. NO_DOC=internal
1 parent ead3770 commit f316ad1

File tree

5 files changed

+80
-38
lines changed

5 files changed

+80
-38
lines changed

test/router-luatest/map_part_test.lua

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,18 @@ g.test_map_part_ref_timeout = function(cg)
267267
end
268268
end, {{bid1, bid2, bid3, bid4}})
269269

270+
-- Count the map calls. The loss of ref must be detected before the
271+
-- map-stage.
272+
local _, err = vtest.cluster_exec_each_master(cg, function()
273+
rawset(_G, 'old_do_map', _G.do_map)
274+
rawset(_G, 'map_count', 0)
275+
_G.do_map = function(...)
276+
_G.map_count = _G.map_count + 1
277+
return _G.old_do_map(...)
278+
end
279+
end)
280+
t.assert_equals(err, nil)
281+
270282
-- Send bucket so the router thinks:
271283
-- rs1: {b1, b2}, rs2: {b3, b4}
272284
-- and actually the state is:
@@ -324,11 +336,20 @@ g.test_map_part_ref_timeout = function(cg)
324336
t.assert_equals(res.err_uuid, cg.rs2_uuid)
325337

326338
-- Make sure there are no references left.
327-
local _, err = vtest.cluster_exec_each(cg, function()
339+
_, err = vtest.cluster_exec_each(cg, function()
328340
ilt.assert_equals(require('vshard.storage.ref').count, 0)
329341
end)
330342
t.assert_equals(err, nil)
331343

344+
-- No maps had a chance to get executed.
345+
_, err = vtest.cluster_exec_each_master(cg, function()
346+
ilt.assert_equals(_G.map_count, 0)
347+
_G.do_map = _G.old_do_map
348+
_G.old_do_map = nil
349+
_G.map_count = nil
350+
end)
351+
t.assert_equals(err, nil)
352+
332353
-- Return the bucket back and re-enable discovery on the router.
333354
cg.replica_2_a:exec(function(bid, to)
334355
_G.bucket_send(bid, to)

test/storage-luatest/storage_1_test.lua

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -236,47 +236,49 @@ test_group.test_ref_with_buckets_basic = function(g)
236236

237237
-- No buckets.
238238
res, err = ivshard.storage._call(
239-
'storage_ref_with_buckets', rid, iwait_timeout, {})
239+
'storage_ref_make_with_buckets', rid, iwait_timeout, {})
240240
ilt.assert_equals(err, nil)
241241
ilt.assert_equals(res, {moved = {}})
242242
ilt.assert_equals(lref.count, 0)
243243

244244
-- Check for a single ok bucket.
245245
res, err = ivshard.storage._call(
246-
'storage_ref_with_buckets', rid, iwait_timeout, {bids[1]})
246+
'storage_ref_make_with_buckets', rid, iwait_timeout, {bids[1]})
247247
ilt.assert_equals(err, nil)
248-
ilt.assert_equals(res, {rid = rid, moved = {}})
248+
ilt.assert_equals(res, {is_done = true, moved = {}})
249249
ilt.assert_equals(lref.count, 1)
250250
_, err = ivshard.storage._call('storage_unref', rid)
251251
ilt.assert_equals(err, nil)
252252
ilt.assert_equals(lref.count, 0)
253253

254254
-- Check for multiple ok buckets.
255255
res, err = ivshard.storage._call(
256-
'storage_ref_with_buckets', rid, iwait_timeout, {bids[1], bids[2]})
256+
'storage_ref_make_with_buckets', rid, iwait_timeout,
257+
{bids[1], bids[2]})
257258
ilt.assert_equals(err, nil)
258-
ilt.assert_equals(res, {rid = rid, moved = {}})
259+
ilt.assert_equals(res, {is_done = true, moved = {}})
259260
_, err = ivshard.storage._call('storage_unref', rid)
260261
ilt.assert_equals(err, nil)
261262

262263
-- Check for double referencing.
263264
res, err = ivshard.storage._call(
264-
'storage_ref_with_buckets', rid, iwait_timeout, {bids[1], bids[1]})
265+
'storage_ref_make_with_buckets', rid, iwait_timeout,
266+
{bids[1], bids[1]})
265267
ilt.assert_equals(err, nil)
266-
ilt.assert_equals(res, {rid = rid, moved = {}})
268+
ilt.assert_equals(res, {is_done = true, moved = {}})
267269
ilt.assert_equals(lref.count, 1)
268270
_, err = ivshard.storage._call('storage_unref', rid)
269271
ilt.assert_equals(err, nil)
270272
ilt.assert_equals(lref.count, 0)
271273

272274
-- Bucket mix.
273275
res, err = ivshard.storage._call(
274-
'storage_ref_with_buckets', rid, iwait_timeout,
276+
'storage_ref_make_with_buckets', rid, iwait_timeout,
275277
{bucket_count + 1, bids[1], bucket_count + 2, bids[2],
276278
bucket_count + 3})
277279
ilt.assert_equals(err, nil)
278280
ilt.assert_equals(res, {
279-
rid = rid,
281+
is_done = true,
280282
moved = {
281283
{id = bucket_count + 1},
282284
{id = bucket_count + 2},
@@ -288,7 +290,7 @@ test_group.test_ref_with_buckets_basic = function(g)
288290

289291
-- No ref when all buckets are missing.
290292
res, err = ivshard.storage._call(
291-
'storage_ref_with_buckets',
293+
'storage_ref_make_with_buckets',
292294
rid,
293295
iwait_timeout,
294296
{bucket_count + 1, bucket_count + 2}
@@ -315,7 +317,7 @@ test_group.test_ref_with_buckets_timeout = function(g)
315317
box.space._bucket:update(
316318
{bids[1]}, {{'=', 2, ivconst.BUCKET.SENDING}})
317319
local res, err = ivshard.storage._call(
318-
'storage_ref_with_buckets', rid, 0.01, {bids[2]})
320+
'storage_ref_make_with_buckets', rid, 0.01, {bids[2]})
319321
box.space._bucket:update(
320322
{bids[1]}, {{'=', 2, ivconst.BUCKET.ACTIVE}})
321323
t.assert_str_contains(err.message, 'Timeout exceeded')
@@ -341,7 +343,7 @@ test_group.test_ref_with_buckets_return_last_known_dst = function(g)
341343
box.space._bucket:update(
342344
{bid}, {{'=', 2, ivconst.BUCKET.SENT}})
343345
local res, err = ivshard.storage._call(
344-
'storage_ref_with_buckets', rid, iwait_timeout, {bid})
346+
'storage_ref_make_with_buckets', rid, iwait_timeout, {bid})
345347
ilt.assert_equals(err, nil)
346348
ilt.assert_equals(res, {moved = {{
347349
id = bid,
@@ -378,7 +380,7 @@ test_group.test_ref_with_buckets_move_part_while_referencing = function(g)
378380
local session_id
379381
local f = ifiber.new(function()
380382
session_id = box.session.id()
381-
return ivshard.storage._call('storage_ref_with_buckets', rid,
383+
return ivshard.storage._call('storage_ref_make_with_buckets', rid,
382384
iwait_timeout, {bids[1], bids[2]})
383385
end)
384386
f:set_joinable(true)
@@ -395,7 +397,7 @@ test_group.test_ref_with_buckets_move_part_while_referencing = function(g)
395397
t.assert_equals(err, nil)
396398
ilt.assert_equals(res, {
397399
moved = {{id = bids[2], dst = id}},
398-
rid = rid,
400+
is_done = true,
399401
})
400402
-- Ref was done, because at least one bucket was ok.
401403
ilt.assert_equals(lref.count, 1)
@@ -428,7 +430,7 @@ test_group.test_ref_with_buckets_move_all_while_referencing = function(g)
428430
{bids[3]}, {{'=', 2, ivconst.BUCKET.SENDING}, {'=', 3, id}})
429431
-- Start referencing.
430432
local f = ifiber.new(function()
431-
return ivshard.storage._call('storage_ref_with_buckets', rid,
433+
return ivshard.storage._call('storage_ref_make_with_buckets', rid,
432434
iwait_timeout, {bids[1], bids[2]})
433435
end)
434436
f:set_joinable(true)
@@ -536,10 +538,8 @@ test_group.test_moved_buckets_various_statuses = function(g)
536538
{{'=', 2, ivconst.BUCKET.GARBAGE}})
537539
_bucket:delete({bid_404})
538540

539-
local res, err = ivshard.storage._call('moved_buckets', bids)
540-
ilt.assert_equals(err, nil)
541-
ilt.assert(res and res.moved)
542-
ilt.assert_items_equals(res.moved, {
541+
local moved = ivshard.storage.internal.bucket_get_moved(bids)
542+
ilt.assert_items_equals(moved, {
543543
{
544544
id = bid_sent,
545545
dst = id_sent,

test/upgrade/upgrade.result

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,13 +174,13 @@ vshard.storage._call('test_api', 1, 2, 3)
174174
| - - bucket_recv
175175
| - bucket_test_gc
176176
| - info
177-
| - moved_buckets
178177
| - rebalancer_apply_routes
179178
| - rebalancer_request_state
180179
| - recovery_bucket_stat
181180
| - storage_map
182181
| - storage_ref
183-
| - storage_ref_with_buckets
182+
| - storage_ref_check_with_buckets
183+
| - storage_ref_make_with_buckets
184184
| - storage_unref
185185
| - test_api
186186
| - 1

vshard/router/init.lua

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -919,9 +919,11 @@ local function router_ref_storage_by_buckets(router, bucket_ids, timeout)
919919
if replicasets_to_map[id] then
920920
-- Replicaset is already referenced on a previous iteration.
921921
-- Simply get the moved buckets without double referencing.
922-
args_ref = {'moved_buckets', buckets}
922+
args_ref = {
923+
'storage_ref_check_with_buckets', rid, buckets}
923924
else
924-
args_ref = {'storage_ref_with_buckets', rid, timeout, buckets}
925+
args_ref = {
926+
'storage_ref_make_with_buckets', rid, timeout, buckets}
925927
end
926928
res, err = replicasets_all[id]:callrw('vshard.storage._call',
927929
args_ref, opts_async)
@@ -941,7 +943,7 @@ local function router_ref_storage_by_buckets(router, bucket_ids, timeout)
941943
err_id = id
942944
goto fail
943945
end
944-
-- Ref returns nil,err or {rid, moved}.
946+
-- Ref returns nil,err or {is_done, moved}.
945947
res, err = res[1], res[2]
946948
if res == nil then
947949
err_id = id
@@ -959,7 +961,7 @@ local function router_ref_storage_by_buckets(router, bucket_ids, timeout)
959961
end
960962
table.insert(bucket_ids, bid)
961963
end
962-
if res.rid then
964+
if res.is_done then
963965
assert(not replicasets_to_map[id])
964966
-- If there are no buckets on the replicaset, it would not be
965967
-- referenced.

vshard/storage/init.lua

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3184,9 +3184,9 @@ end
31843184
-- Lookup buckets which are definitely not going to recover into ACTIVE state
31853185
-- under any circumstances.
31863186
--
3187-
local function storage_moved_buckets(bucket_ids)
3187+
local function bucket_get_moved(bucket_ids)
31883188
local allstatus = consts.BUCKET
3189-
local moved_buckets = {}
3189+
local res = {}
31903190
for _, bucket_id in pairs(bucket_ids) do
31913191
local bucket = box.space._bucket:get{bucket_id}
31923192
local is_moved
@@ -3197,14 +3197,14 @@ local function storage_moved_buckets(bucket_ids)
31973197
is_moved = status == allstatus.GARBAGE or status == allstatus.SENT
31983198
end
31993199
if is_moved then
3200-
table.insert(moved_buckets, {
3200+
table.insert(res, {
32013201
id = bucket_id,
32023202
dst = bucket and bucket.destination or M.route_map[bucket_id],
32033203
status = bucket and bucket.status,
32043204
})
32053205
end
32063206
end
3207-
return { moved = moved_buckets }
3207+
return res
32083208
end
32093209

32103210
--
@@ -3228,12 +3228,12 @@ end
32283228
-- are absent, the reference is not created and a nil reference id
32293229
-- with the list of absent buckets is returned.
32303230
--
3231-
local function storage_ref_with_buckets(rid, timeout, bucket_ids)
3232-
local moved = storage_moved_buckets(bucket_ids).moved
3231+
local function storage_ref_make_with_buckets(rid, timeout, bucket_ids)
3232+
local moved = bucket_get_moved(bucket_ids)
32333233
if #moved == #bucket_ids then
32343234
-- If all the passed buckets are absent, there is no need to create a
32353235
-- ref.
3236-
return {rid = nil, moved = moved}
3236+
return {moved = moved}
32373237
end
32383238
local bucket_generation = M.bucket_generation
32393239
local ok, err = storage_ref(rid, timeout)
@@ -3243,13 +3243,31 @@ local function storage_ref_with_buckets(rid, timeout, bucket_ids)
32433243
if M.bucket_generation ~= bucket_generation then
32443244
-- Need to redo it. Otherwise there is a risk that some buckets were
32453245
-- moved while waiting for the ref.
3246-
moved = storage_moved_buckets(bucket_ids).moved
3246+
moved = bucket_get_moved(bucket_ids)
32473247
if #moved == #bucket_ids then
32483248
storage_unref(rid)
3249-
rid = nil
3249+
return {moved = moved}
32503250
end
32513251
end
3252-
return {rid = rid, moved = moved}
3252+
return {is_done = true, moved = moved}
3253+
end
3254+
3255+
--
3256+
-- Check which buckets from the given list are moved out of this storage, while
3257+
-- also making sure that the storage-ref is still in place.
3258+
--
3259+
-- Partial Map-Reduce makes a ref on the storages having any of the needed
3260+
-- buckets, but then can come back if other storages report the already visited
3261+
-- ones as having the needed buckets. Only makes sense to check, if this storage
3262+
-- still holds the ref. Otherwise its previous guarantees given during the ref
3263+
-- creation are all gone.
3264+
--
3265+
local function storage_ref_check_with_buckets(rid, bucket_ids)
3266+
local ok, err = lref.check(rid, box.session.id())
3267+
if not ok then
3268+
return nil, err
3269+
end
3270+
return {moved = bucket_get_moved(bucket_ids)}
32533271
end
32543272

32553273
--
@@ -3298,9 +3316,9 @@ service_call_api = setmetatable({
32983316
rebalancer_apply_routes = rebalancer_apply_routes,
32993317
rebalancer_request_state = rebalancer_request_state,
33003318
recovery_bucket_stat = recovery_bucket_stat,
3301-
moved_buckets = storage_moved_buckets,
33023319
storage_ref = storage_ref,
3303-
storage_ref_with_buckets = storage_ref_with_buckets,
3320+
storage_ref_make_with_buckets = storage_ref_make_with_buckets,
3321+
storage_ref_check_with_buckets = storage_ref_check_with_buckets,
33043322
storage_unref = storage_unref,
33053323
storage_map = storage_map,
33063324
info = storage_service_info,
@@ -4182,6 +4200,7 @@ M.bucket_state_edges = bucket_state_edges
41824200

41834201
M.bucket_are_all_rw = bucket_are_all_rw_public
41844202
M.bucket_generation_wait = bucket_generation_wait
4203+
M.bucket_get_moved = bucket_get_moved
41854204
lregistry.storage = M
41864205

41874206
--

0 commit comments

Comments
 (0)