From b689df2b0f49bad2e75b7a8425d4700e41b4ec18 Mon Sep 17 00:00:00 2001
From: Vladislav Shpilevoy
Date: Mon, 30 Oct 2023 23:17:16 +0100
Subject: [PATCH] storage: fix send/recv unprotected when 2 at once

If more than one bucket_send/recv tried to start on the same bucket
on the same storage at the same time, it could trigger bucket
recovery or GC when neither was needed. No data could be lost, and
the problem couldn't occur during automatic rebalancing, but manual
calls to those functions could fail when they shouldn't have.

Closes #434
---
 test/storage-luatest/storage_1_1_test.lua | 84 +++++++++++++++++++++++
 vshard/storage/init.lua                   | 36 ++++++++--
 2 files changed, 113 insertions(+), 7 deletions(-)

diff --git a/test/storage-luatest/storage_1_1_test.lua b/test/storage-luatest/storage_1_1_test.lua
index 740e635c..acda2e78 100644
--- a/test/storage-luatest/storage_1_1_test.lua
+++ b/test/storage-luatest/storage_1_1_test.lua
@@ -261,3 +261,87 @@ test_group.test_on_bucket_event = function(g)
         box.space.data2:drop()
     end)
 end
+
+--
+-- gh-434: bucket_send() shouldn't change the transfer flags if a transfer for
+-- the same bucket is already in progress.
+--
+test_group.test_bucket_double_send = function(g)
+    g.replica_2_a:exec(function()
+        ivshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = true
+    end)
+    local bid = g.replica_1_a:exec(function(uuid)
+        local transfer_flags =
+            ivshard.storage.internal.rebalancer_transfering_buckets
+        local bid = _G.get_first_bucket()
+        local f = ifiber.create(ivshard.storage.bucket_send, bid, uuid,
+                                {timeout = 100000})
+        f:set_joinable(true)
+        rawset(_G, 'test_f', f)
+        ilt.assert(transfer_flags[bid])
+        local ok, err = ivshard.storage.bucket_send(bid, uuid)
+        ilt.assert_equals(err.code, iverror.code.WRONG_BUCKET)
+        ilt.assert(not ok)
+        -- Before the bug was fixed, the second send would clear the flag, thus
+        -- leaving the first send unprotected.
+        ilt.assert(transfer_flags[bid])
+        return bid
+    end, {g.replica_2_a:replicaset_uuid()})
+    g.replica_2_a:exec(function()
+        ivshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = false
+    end)
+    g.replica_1_a:exec(function(bid)
+        local transfer_flags =
+            ivshard.storage.internal.rebalancer_transfering_buckets
+        local f = _G.test_f
+        _G.test_f = nil
+        local f_ok, ok, err = f:join(iwait_timeout)
+        ilt.assert_equals(err, nil)
+        ilt.assert(f_ok)
+        ilt.assert(ok)
+        ilt.assert(not transfer_flags[bid])
+        _G.bucket_gc_wait()
+    end, {bid})
+    --
+    -- Cleanup.
+    --
+    g.replica_2_a:exec(function(bid, uuid)
+        local ok, err = ivshard.storage.bucket_send(bid, uuid,
+                                                    {timeout = iwait_timeout})
+        ilt.assert_equals(err, nil)
+        ilt.assert(ok)
+        _G.bucket_gc_wait()
+    end, {bid, g.replica_1_a:replicaset_uuid()})
+end
+
+--
+-- gh-434: bucket_recv() shouldn't change the transfer flags if a transfer for
+-- the same bucket is already in progress.
+--
+test_group.test_bucket_double_recv = function(g)
+    g.replica_2_a:exec(function(bid, uuid)
+        local transfer_flags =
+            ivshard.storage.internal.rebalancer_transfering_buckets
+        local f = ifiber.create(ivshard.storage.bucket_recv, bid, uuid, {})
+        f:set_joinable(true)
+        ilt.assert(transfer_flags[bid])
+        local ok, err = ivshard.storage.bucket_recv(bid, uuid, {})
+        -- Before the bug was fixed, the second recv would clear the flag, thus
+        -- leaving the first recv unprotected.
+        ilt.assert(transfer_flags[bid])
+        ilt.assert_equals(err.code, iverror.code.WRONG_BUCKET)
+        ilt.assert(not ok)
+        local f_ok
+        f_ok, ok, err = f:join(iwait_timeout)
+        ilt.assert(not transfer_flags[bid])
+        ilt.assert_equals(err, nil)
+        ilt.assert(f_ok)
+        ilt.assert(ok)
+        --
+        -- Cleanup.
+        --
+        _G.bucket_recovery_wait()
+        ilt.assert_equals(box.space._bucket:get{bid}, nil)
+    end, {vtest.storage_first_bucket(g.replica_1_a),
+          g.replica_1_a:replicaset_uuid()})
+end
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 2213dc55..fffe6a12 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -328,6 +328,20 @@ local function bucket_generation_increment()
     M.bucket_generation_cond:broadcast()
 end
 
+local function bucket_transfer_start(bid)
+    if M.rebalancer_transfering_buckets[bid] then
+        return nil, lerror.vshard(lerror.code.WRONG_BUCKET, bid,
+                                  'transfer is already in progress', nil)
+    end
+    M.rebalancer_transfering_buckets[bid] = true
+    return true
+end
+
+local function bucket_transfer_end(bid)
+    assert(M.rebalancer_transfering_buckets[bid])
+    M.rebalancer_transfering_buckets[bid] = nil
+end
+
 --
 -- Handle a bad update of _bucket space.
 --
@@ -1763,9 +1777,13 @@ local function bucket_recv(bucket_id, from, data, opts)
     while opts and opts.is_last and M.errinj.ERRINJ_LAST_RECEIVE_DELAY do
         lfiber.sleep(0.01)
     end
-    M.rebalancer_transfering_buckets[bucket_id] = true
-    local status, ret, err = pcall(bucket_recv_xc, bucket_id, from, data, opts)
-    M.rebalancer_transfering_buckets[bucket_id] = nil
+    local status, ret, err
+    status, err = bucket_transfer_start(bucket_id)
+    if not status then
+        return nil, err
+    end
+    status, ret, err = pcall(bucket_recv_xc, bucket_id, from, data, opts)
+    bucket_transfer_end(bucket_id)
     if status then
         if ret then
             return ret
@@ -2155,14 +2173,18 @@ local function bucket_send(bucket_id, destination, opts)
     if type(bucket_id) ~= 'number' or type(destination) ~= 'string' then
         error('Usage: bucket_send(bucket_id, destination)')
     end
-    M.rebalancer_transfering_buckets[bucket_id] = true
+    local ret
+    local status, err = bucket_transfer_start(bucket_id)
+    if not status then
+        return nil, err
+    end
     local exception_guard = {}
-    local status, ret, err = pcall(bucket_send_xc, bucket_id, destination, opts,
-                                   exception_guard)
+    status, ret, err = pcall(bucket_send_xc, bucket_id, destination, opts,
+                             exception_guard)
     if exception_guard.drop_rw_lock then
         exception_guard.ref.rw_lock = false
     end
-    M.rebalancer_transfering_buckets[bucket_id] = nil
+    bucket_transfer_end(bucket_id)
     if status then
         if ret then
             return ret
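
A minimal caller-side sketch (not part of the patch) of the behavior this
change introduces, assuming a configured storage and the public
vshard.storage API used above. The bucket id and destination replicaset
UUID below are placeholders for illustration, not values from the patch.

    local fiber = require('fiber')
    local vshard = require('vshard')

    -- Placeholder values for illustration only.
    local bid = 1
    local dst = '00000000-0000-0000-0000-000000000002'

    -- Start the first transfer in a background fiber. bucket_send()
    -- yields during the transfer, so the fiber stays alive while the
    -- second call below runs.
    local f = fiber.create(vshard.storage.bucket_send, bid, dst,
                           {timeout = 10})
    f:set_joinable(true)

    -- While the first transfer is in progress, a second send of the
    -- same bucket now fails fast with WRONG_BUCKET ('transfer is
    -- already in progress') instead of silently clearing the transfer
    -- flag under the first one.
    local ok, err = vshard.storage.bucket_send(bid, dst)
    assert(not ok and err ~= nil)

    f:join()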