From 5cee8a1dcf7f6a337467a7dd1336322ed44a3b6e Mon Sep 17 00:00:00 2001
From: Nikita Zheleztsov
Date: Thu, 15 Aug 2024 14:48:52 +0300
Subject: [PATCH 1/3] router: introduce request_timeout for read-only calls

New option for call, callro, callre, callbro, callbre:
`request_timeout`. Note that for callrw (and call with 'write' mode)
this option is a no-op.

The option allows specifying the maximum number of seconds a single
request within a call may take. By default it's equal to the `timeout`
option. It must always be <= timeout.

The router retries read-only requests when a recoverable error happens
(e.g. WRONG_BUCKET or a timeout). By default a timeout error makes the
vshard call exit with an error, as request_timeout = timeout. However,
if request_timeout < timeout, several requests are made in the scope of
one call. These requests are balanced between replicas: within a single
call we don't go back to a replica that failed to respond. Balancing is
introduced in the following commit.

This may be useful for mission-critical systems, where the number of
failed requests must be minimized.

Part of tarantool/vshard#484

NO_DOC=
---
 test/instances/router.lua           |  1 +
 test/router-luatest/router_test.lua | 63 +++++++++++++++++++++++++++++
 vshard/replicaset.lua               | 23 +++++++++--
 vshard/router/init.lua              |  8 +++-
 4 files changed, 90 insertions(+), 5 deletions(-)

diff --git a/test/instances/router.lua b/test/instances/router.lua
index f373f21e..a9505e66 100755
--- a/test/instances/router.lua
+++ b/test/instances/router.lua
@@ -10,6 +10,7 @@ _G.ifiber = require('fiber')
 _G.ilt = require('luatest')
 _G.imsgpack = require('msgpack')
 _G.ivtest = require('test.luatest_helpers.vtest')
+_G.ivconst = require('vshard.consts')
 _G.iwait_timeout = _G.ivtest.wait_timeout
 
 -- Do not load entire vshard into the global namespace to catch errors when code
diff --git a/test/router-luatest/router_test.lua b/test/router-luatest/router_test.lua
index a29a495a..04399179 100644
--- a/test/router-luatest/router_test.lua
+++ b/test/router-luatest/router_test.lua
@@ -798,3 +798,66 @@ g.test_master_discovery_on_disconnect = function(g)
     vtest.router_cfg(g.router, global_cfg)
     vtest.cluster_cfg(g, global_cfg)
 end
+
+g.test_request_timeout = function(g)
+    local bid = vtest.storage_first_bucket(g.replica_1_a)
+    --
+    -- Test request_timeout API.
+    --
+    g.router:exec(function(bid)
+        local opts = {}
+        -- request_timeout must be <= timeout.
+        local err_msg = 'request_timeout must be <= timeout'
+        opts.request_timeout = ivconst.CALL_TIMEOUT_MIN * 2
+        t.assert_error_msg_contains(err_msg, function()
+            ivshard.router.callro(bid, 'echo', {1}, opts)
+        end)
+        -- request_timeout must be a number.
+        err_msg = 'Usage: call'
+        opts.request_timeout = 'string'
+        t.assert_error_msg_contains(err_msg, function()
+            ivshard.router.callro(bid, 'echo', {1}, opts)
+        end)
+        -- request_timeout <= 0 leads to a timeout error.
+        err_msg = 'Timeout'
+        for _, timeout in ipairs({-1, 0}) do
+            opts.request_timeout = timeout
+            local ok, err = ivshard.router.callro(bid, 'echo', {1}, opts)
+            t.assert_not(ok)
+            t.assert_str_contains(err.message, err_msg)
+        end
+    end, {bid})
+
+    --
+    -- Test that the router makes the desired number of calls when
+    -- request_timeout is not a divisor of the timeout.
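+    -- With request_timeout = 1 and timeout = 1.5 there is time for one
+    -- full 1-second request plus a final 0.5-second one, i.e. exactly
+    -- two requests within a single call.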
+    --
+    g.replica_1_a:exec(function()
+        rawset(_G, 'sleep_num', 0)
+        rawset(_G, 'sleep_cond', ifiber.cond())
+        rawset(_G, 'old_get_uuid', _G.get_uuid)
+        _G.get_uuid = function()
+            _G.sleep_num = _G.sleep_num + 1
+            _G.sleep_cond:wait()
+        end
+    end)
+    g.router:exec(function(bid)
+        local ok, err = ivshard.router.callro(bid, 'get_uuid', {}, {
+            request_timeout = 1,
+            timeout = 1.5,
+        })
+        t.assert_not(ok)
+        t.assert_str_contains(err.message, 'timed out')
+    end, {bid})
+
+    -- Timeout errors are retried. The master's priority is higher when
+    -- weights are equal; currently they are not specified at all.
+    t.assert_equals(g.replica_1_a:eval('return _G.sleep_num'), 2)
+    g.replica_1_a:exec(function()
+        _G.sleep_cond:broadcast()
+        _G.get_uuid = _G.old_get_uuid
+        _G.old_get_uuid = nil
+        _G.sleep_cond = nil
+        _G.sleep_num = nil
+    end)
+end
diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua
index 298ad2a5..e5ad28c5 100644
--- a/vshard/replicaset.lua
+++ b/vshard/replicaset.lua
@@ -710,7 +710,7 @@ local function can_retry_after_error(e)
         e.code == lerror.code.TRANSFER_IS_IN_PROGRESS) then
         return true
     end
-    return e.type == 'ClientError' and e.code == box.error.TIMEOUT
+    return lerror.is_timeout(e)
 end
 
 --
@@ -813,8 +813,11 @@ local function replicaset_template_multicallro(prefer_replica, balance)
         assert(opts == nil or type(opts) == 'table')
         opts = opts and table.copy(opts) or {}
         local timeout = opts.timeout or consts.CALL_TIMEOUT_MAX
+        local request_timeout = opts.request_timeout or timeout
+        assert(request_timeout <= timeout)
+        opts.request_timeout = nil
         local net_status, storage_status, retval, err, replica
-        if timeout <= 0 then
+        if timeout <= 0 or request_timeout <= 0 then
             return nil, lerror.timeout()
         end
         local now = fiber_clock()
@@ -833,11 +836,25 @@ local function replicaset_template_multicallro(prefer_replica, balance)
                                      replica.backoff_err)
                 end
             end
-            opts.timeout = timeout
+            opts.timeout = request_timeout
             net_status, storage_status, retval, err =
                 replica_call(replica, func, args, opts)
             now = fiber_clock()
            timeout = end_time - now
+            if now + request_timeout > end_time then
+                -- The `timeout` option sets a strict limit for the entire
+                -- operation, which must not be exceeded. To ensure this, we
+                -- make as many requests as we can with the specified
+                -- `request_timeout`. However, the last request will use all
+                -- the time remaining within the overall `timeout`, rather
+                -- than the `request_timeout` value.
+                --
+                -- For example, if `request_timeout` is 1 second and `timeout`
+                -- is 2.5 seconds, the first two requests to the replicaset
+                -- will use a 1-second timeout, but the last request will
+                -- have only 0.5 seconds remaining.
+                request_timeout = timeout
+            end
             if not net_status and not storage_status and
                not can_retry_after_error(retval) then
                 if can_backoff_after_error(retval, func) then
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index 05a08422..6ac80b79 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -585,8 +585,9 @@ local function router_call_impl(router, bucket_id, mode, prefer_replica,
                                 balance, func, args, opts)
     local do_return_raw
     if opts then
-        if type(opts) ~= 'table' or
-           (opts.timeout and type(opts.timeout) ~= 'number') then
+        if type(opts) ~= 'table' or (opts.timeout and
+           type(opts.timeout) ~= 'number') or (opts.request_timeout and
+           type(opts.request_timeout) ~= 'number') then
             error('Usage: call(bucket_id, mode, func, args, opts)')
         end
         opts = table.copy(opts)
@@ -596,6 +597,9 @@ local function router_call_impl(router, bucket_id, mode, prefer_replica,
         do_return_raw = false
     end
     local timeout = opts.timeout or consts.CALL_TIMEOUT_MIN
+    if opts.request_timeout and opts.request_timeout > timeout then
+        error('request_timeout must be <= timeout')
+    end
     local replicaset, err
     local tend = fiber_clock() + timeout
     if bucket_id > router.total_bucket_count or bucket_id <= 0 then

From b39bd6f31da79f48501e5cd5bc23a8e11be3c947 Mon Sep 17 00:00:00 2001
From: Nikita Zheleztsov
Date: Thu, 15 Aug 2024 21:08:19 +0300
Subject: [PATCH 2/3] replicaset: introduce stateless balancing for read-only
 requests

This commit introduces balancing for requests where the balance mode is
not set. The motivation for this change is that we should do our best
to make a call succeed, which is important for mission-critical
systems.

When the balance mode is set, replicas are balanced between requests:
consequent calls won't go to the same replica. Balancing is done
according to the round-robin strategy; weights don't affect it.

However, when balance is false, callro and callre are now balanced too.
This balancing happens only in the scope of a single call: consequent
calls still go to the same, most prioritized replica first. A retry
happens when a network error occurs (e.g. a timeout).

During such balancing the callro method doesn't distinguish between
master and replica. If the master has the highest priority according to
the config, callro will go to the master first and only after that to a
replica. The callre method, on the other hand, will first try to access
all replicas; only if all those requests fail do we fall back to the
master.
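For illustration, a minimal usage sketch (the bucket id and function
name are placeholders):

    -- Give the whole call up to 3 seconds, but abandon any single
    -- replica after 1 second and move on to the next one by priority
    -- within the same call.
    local res, err = vshard.router.callre(bucket_id, 'my_func', {}, {
        timeout = 3,
        request_timeout = 1,
    })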
Closes tarantool/vshard#484

NO_DOC=
---
 test/replicaset-luatest/replicaset_3_test.lua | 135 ++++++++++++++++++
 test/router-luatest/router_test.lua           |  14 +-
 vshard/replicaset.lua                         | 111 ++++++++------
 3 files changed, 207 insertions(+), 53 deletions(-)

diff --git a/test/replicaset-luatest/replicaset_3_test.lua b/test/replicaset-luatest/replicaset_3_test.lua
index 253327e7..bbc5ca48 100644
--- a/test/replicaset-luatest/replicaset_3_test.lua
+++ b/test/replicaset-luatest/replicaset_3_test.lua
@@ -366,3 +366,138 @@ test_group.test_ipv6_uri = function(g)
     local replica_string = 'replica_1_a(storage@[::1]:3301)'
     t.assert_equals(tostring(rs.master), replica_string)
 end
+
+local function hang_get_uuid(instance)
+    instance:exec(function()
+        rawset(_G, 'sleep_num', 0)
+        rawset(_G, 'sleep_cond', ifiber.cond())
+        rawset(_G, 'old_get_uuid', _G.get_uuid)
+        _G.get_uuid = function()
+            _G.sleep_num = _G.sleep_num + 1
+            _G.sleep_cond:wait()
+        end
+    end)
+end
+
+local function reset_sleep(instance)
+    instance:exec(function()
+        _G.sleep_cond:broadcast()
+        _G.sleep_num = 0
+    end)
+end
+
+local function reset_get_uuid(instance)
+    instance:exec(function()
+        _G.sleep_cond:broadcast()
+        _G.get_uuid = _G.old_get_uuid
+        _G.old_get_uuid = nil
+        _G.sleep_cond = nil
+        _G.sleep_num = nil
+    end)
+end
+
+local function prepare_stateless_balancing_rs()
+    local new_cfg_template = table.deepcopy(cfg_template)
+    -- replica_1_b > replica_1_a (master) > replica_1_c
+    new_cfg_template.sharding[1].replicas.replica_1_b.zone = 2
+    new_cfg_template.sharding[1].replicas.replica_1_a.zone = 3
+    new_cfg_template.sharding[1].replicas.replica_1_c.zone = 4
+    new_cfg_template.zone = 1
+    new_cfg_template.weights = {
+        [1] = {
+            [2] = 1,
+            [3] = 2,
+            [4] = 3,
+        }
+    }
+
+    local new_global_cfg = vtest.config_new(new_cfg_template)
+    local _, rs = next(vreplicaset.buildall(new_global_cfg))
+    rs:wait_connected_all(timeout_opts)
+    return rs
+end
+
+test_group.test_stateless_balancing_callro = function(g)
+    local rs = prepare_stateless_balancing_rs()
+    -- replica_1_b is the prioritized one.
+    local uuid = rs:callro('get_uuid', {}, timeout_opts)
+    t.assert_equals(uuid, g.replica_1_b:instance_uuid())
+
+    --
+    -- callro falls back to the lowest-priority replica if requests to the
+    -- other instances fail.
+    --
+    hang_get_uuid(g.replica_1_a)
+    hang_get_uuid(g.replica_1_b)
+    local request_timeout_opts = {request_timeout = 1, timeout = 3}
+    uuid = rs:callro('get_uuid', {}, request_timeout_opts)
+    t.assert_equals(uuid, g.replica_1_c:instance_uuid())
+    t.assert_equals(g.replica_1_a:eval('return _G.sleep_num'), 1)
+    t.assert_equals(g.replica_1_b:eval('return _G.sleep_num'), 1)
+    reset_sleep(g.replica_1_a)
+    reset_sleep(g.replica_1_b)
+
+    --
+    -- If all instances are unresponsive, there's nothing we can do.
+    -- Test that when timeout > request_timeout * #rs, we make several
+    -- requests to some replicas.
+    --
+    local err
+    hang_get_uuid(g.replica_1_c)
+    request_timeout_opts.timeout = 3.5
+    uuid, err = rs:callro('get_uuid', {}, request_timeout_opts)
+    t.assert_equals(uuid, nil)
+    -- Either 'timed out' or 'Timeout exceeded' message. Depends on version.
+    t.assert(verror.is_timeout(err))
+    t.assert_equals(g.replica_1_b:eval('return _G.sleep_num'), 2)
+    t.assert_equals(g.replica_1_c:eval('return _G.sleep_num'), 1)
+    t.assert_equals(g.replica_1_a:eval('return _G.sleep_num'), 1)
+
+    reset_get_uuid(g.replica_1_a)
+    reset_get_uuid(g.replica_1_b)
+    reset_get_uuid(g.replica_1_c)
+end
+
+test_group.test_stateless_balancing_callre = function(g)
+    local rs = prepare_stateless_balancing_rs()
+    -- replica_1_b is the prioritized one.
+    local uuid = rs:callre('get_uuid', {}, timeout_opts)
+    t.assert_equals(uuid, g.replica_1_b:instance_uuid())
+
+    --
+    -- callre falls back to another replica if the request fails.
+    --
+    hang_get_uuid(g.replica_1_b)
+    local request_timeout_opts = {request_timeout = 1, timeout = 3}
+    uuid = rs:callre('get_uuid', {}, request_timeout_opts)
+    t.assert_equals(uuid, g.replica_1_c:instance_uuid())
+    t.assert_equals(g.replica_1_b:eval('return _G.sleep_num'), 1)
+    reset_sleep(g.replica_1_b)
+
+    --
+    -- If all replicas are unresponsive, fall back to the master.
+    --
+    hang_get_uuid(g.replica_1_c)
+    uuid = rs:callre('get_uuid', {}, request_timeout_opts)
+    t.assert_equals(uuid, g.replica_1_a:instance_uuid())
+    t.assert_equals(g.replica_1_b:eval('return _G.sleep_num'), 1)
+    t.assert_equals(g.replica_1_c:eval('return _G.sleep_num'), 1)
+    reset_sleep(g.replica_1_b)
+    reset_sleep(g.replica_1_c)
+
+    --
+    -- If there is not enough time to make requests to all replicas and
+    -- then to the master, we don't fall back to the master and fail
+    -- earlier.
+    --
+    local err
+    request_timeout_opts.timeout = 1.5
+    uuid, err = rs:callre('get_uuid', {}, request_timeout_opts)
+    t.assert_equals(uuid, nil)
+    -- Either 'timed out' or 'Timeout exceeded' message. Depends on version.
+    t.assert(verror.is_timeout(err))
+    t.assert_equals(g.replica_1_b:eval('return _G.sleep_num'), 1)
+    t.assert_equals(g.replica_1_c:eval('return _G.sleep_num'), 1)
+
+    reset_get_uuid(g.replica_1_b)
+    reset_get_uuid(g.replica_1_c)
+end
diff --git a/test/router-luatest/router_test.lua b/test/router-luatest/router_test.lua
index 04399179..776c1337 100644
--- a/test/router-luatest/router_test.lua
+++ b/test/router-luatest/router_test.lua
@@ -841,18 +841,18 @@ g.test_request_timeout = function(g)
             _G.sleep_cond:wait()
         end
     end)
-    g.router:exec(function(bid)
-        local ok, err = ivshard.router.callro(bid, 'get_uuid', {}, {
+    g.router:exec(function(bid, uuid)
+        local res = ivshard.router.callro(bid, 'get_uuid', {}, {
             request_timeout = 1,
             timeout = 1.5,
         })
-        t.assert_not(ok)
-        t.assert_str_contains(err.message, 'timed out')
-    end, {bid})
+        t.assert_equals(res, uuid)
+    end, {bid, g.replica_1_b:instance_uuid()})
 
     -- Timeout errors are retried. The master's priority is higher when
-    -- weights are equal; currently they are not specified at all.
-    t.assert_equals(g.replica_1_a:eval('return _G.sleep_num'), 2)
+    -- weights are equal; currently they are not specified at all. The
+    -- request to the master fails and we fall back to the replica.
+    t.assert_equals(g.replica_1_a:eval('return _G.sleep_num'), 1)
     g.replica_1_a:exec(function()
         _G.sleep_cond:broadcast()
         _G.get_uuid = _G.old_get_uuid
diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua
index e5ad28c5..d38ef49a 100644
--- a/vshard/replicaset.lua
+++ b/vshard/replicaset.lua
@@ -749,19 +749,6 @@ local function can_backoff_after_error(e, func)
     return false
 end
 
---
--- Pick a next replica according to round-robin load balancing
--- policy.
---
-local function replicaset_balance_replica(replicaset)
-    local i = replicaset.balance_i
-    local pl = replicaset.priority_list
-    local size = #pl
-    replicaset.balance_i = i % size + 1
-    assert(i <= size)
-    return pl[i]
-end
-
 --
 -- Template to implement a function able to visit multiple
 -- replicas with certain details. One of applications - a function
@@ -771,42 +758,58 @@ end
 -- until failover fiber will repair the nearest connection.
 --
 local function replicaset_template_multicallro(prefer_replica, balance)
-    local function pick_next_replica(replicaset, now)
-        local r
-        local master = replicaset.master
+    --
+    -- Stateful balancing (balance = true) changes the current
+    -- replicaset.balance_i, so balancing is preserved between calls:
+    -- sequential requests go to different replicas.
+    --
+    -- Stateless balancing (balance = false) is local to the current call. It
+    -- doesn't change the priority list, and every new call starts with the
+    -- most preferred replica. If a request fails with a timeout error, it's
+    -- retried with the next replica by priority within the same call.
+    --
+    local function replicaset_balance_replica(replicaset, state)
+        local i
+        local pl = replicaset.priority_list
+        local size = #pl
         if balance then
-            local i = #replicaset.priority_list
-            while i > 0 do
-                r = replicaset_balance_replica(replicaset)
-                i = i - 1
-                if r:is_connected() and (not prefer_replica or r ~= master) and
-                   replica_check_backoff(r, now) then
-                    return r
-                end
-            end
+            i = replicaset.balance_i
+            replicaset.balance_i = i % size + 1
         else
-            local start_r = replicaset.replica
-            r = start_r
-            while r do
-                if r:is_connected() and (not prefer_replica or r ~= master) and
-                   replica_check_backoff(r, now) then
-                    return r
-                end
-                r = r.next_by_priority
-            end
-            -- Iteration above could start not from the best prio replica.
-            -- Check the beginning of the list too.
-            for _, r in ipairs(replicaset.priority_list) do
-                if r == start_r then
-                    -- Reached already checked part.
-                    break
-                end
-                if r:is_connected() and (not prefer_replica or r ~= master) and
-                   replica_check_backoff(r, now) then
-                    return r
-                end
+            i = state.balance_i
+            state.balance_i = state.balance_i % size + 1
+        end
+        assert(i <= size)
+        return pl[i]
+    end
+
+    --
+    -- Pick a next replica according to the round-robin load balancing policy.
+    --
+    local function pick_next_replica(replicaset, now, state)
+        local r
+        local master = replicaset.master
+        local i = #replicaset.priority_list
+        if prefer_replica and state.balance_checked_num >= i then
+            -- callre should fall back to the master if it tried to access
+            -- all replicas and didn't succeed: connections are broken,
+            -- replicas are in backoff (e.g. due to insufficient privileges)
+            -- or requests failed with a timeout error (request_timeout).
+            goto fallback_to_master
+        end
+        while i > 0 do
+            i = i - 1
+            state.balance_checked_num = state.balance_checked_num + 1
+            r = replicaset_balance_replica(replicaset, state)
+            if r:is_connected() and (not prefer_replica or r ~= master) and
+               replica_check_backoff(r, now) then
+                return r
             end
         end
+
+::fallback_to_master::
+        state.balance_checked_num = 0
+        return nil
     end
 
     return function(replicaset, func, args, opts)
@@ -820,10 +823,26 @@ local function replicaset_template_multicallro(prefer_replica, balance)
         if timeout <= 0 or request_timeout <= 0 then
             return nil, lerror.timeout()
         end
+        local state = {
+            -- Number of replicas we tried to access. See pick_next_replica()
+            -- for details. Used only when prefer_replica = true.
+            balance_checked_num = 0,
+            -- Stateless balancer index. See replicaset_balance_replica() for
+            -- details. Used only when balance = false.
+            balance_i = 1,
+        }
+        if not balance and replicaset.replica then
+            -- Initialize the stateless balancer with replicaset.replica's
+            -- index.
+            replica = replicaset.priority_list[state.balance_i]
+            while replica ~= replicaset.replica do
+                state.balance_i = state.balance_i + 1
+                replica = replicaset.priority_list[state.balance_i]
+            end
+        end
         local now = fiber_clock()
         local end_time = now + timeout
         while not net_status and timeout > 0 do
-            replica = pick_next_replica(replicaset, now)
+            replica = pick_next_replica(replicaset, now, state)
             if not replica then
                 replica, timeout = replicaset_wait_master(replicaset, timeout)
                 if not replica then

From b36e90990c999fa88eb0458d6eb10c6ac034d76e Mon Sep 17 00:00:00 2001
From: Nikita Zheleztsov
Date: Fri, 16 Aug 2024 16:12:28 +0300
Subject: [PATCH 3/3] router: calls affect temporarily prioritized replica

Previously, the prioritized replica was changed only if it had been
disconnected for FAILOVER_DOWN_TIMEOUT seconds. However, the fact that
a connection shows up as 'connected' doesn't mean that it actually
works: the connection must be pingable in order to be operational.

This commit makes failover temporarily lower a replica's priority if
FAILOVER_DOWN_SEQUENTIAL_FAIL sequential requests to it fail. All
vshard internal requests (including the failover ping) and all user
calls affect the number of sequentially failed requests. Note that we
consider a request failed when the net.box connection is not
operational (conn.call cannot be made, e.g. the connection is not yet
established or the timeout is reached); user functions throwing errors
don't affect the prioritized replica.

After this commit failover behaves as follows:

1. Failover pings all prioritized replicas. If a ping doesn't succeed,
   the connection is recreated. This is needed when a user function
   returns values that are too big: no other request can be made on
   that connection until the value is fully transferred. A failed ping
   affects the number of sequentially failed requests.

2. If the connection is down for >= FAILOVER_DOWN_TIMEOUT seconds or
   the number of sequentially failed requests is >=
   FAILOVER_DOWN_SEQUENTIAL_FAIL, we take a replica with lower priority
   as the main one.

3. If failover hasn't tried to use the more prioritized replica
   (according to weights) for more than FAILOVER_UP_TIMEOUT seconds, it
   tries to set a new replica as the prioritized one. Note that it's
   not set if the ping to it didn't succeed during the ping round in
   (1).
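The resulting demotion rule can be summarized with a minimal sketch
(names mirror the patch below; `consts` stands for vshard.consts):

    -- A replica is demoted when either condition holds: it has been
    -- visibly disconnected for too long, or enough requests in a row
    -- have failed even though the connection looks alive.
    local function need_down_priority(r, now)
        local down_too_long = r.down_ts ~= nil and not r:is_connected()
            and now - r.down_ts >= consts.FAILOVER_DOWN_TIMEOUT
        local too_many_fails =
            r.net_sequential_fail >= consts.FAILOVER_DOWN_SEQUENTIAL_FAIL
        return down_too_long or too_many_fails
    end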
Closes tarantool/vshard#483

NO_DOC=bugfix
---
 test/instances/router.lua                |   1 +
 test/router-luatest/router_test.lua      | 243 +++++++++++++++++++++++
 test/router/exponential_timeout.result   |   7 +-
 test/router/exponential_timeout.test.lua |   5 +-
 test/router/retry_reads.result           |   9 +-
 test/router/retry_reads.test.lua         |   7 +-
 test/router/router.result               |   3 +-
 test/router/router_1.lua                 |  36 +++-
 vshard/consts.lua                        |   1 +
 vshard/replicaset.lua                    |  17 +-
 vshard/router/init.lua                   |  89 ++++++---
 11 files changed, 369 insertions(+), 49 deletions(-)
 mode change 120000 => 100755 test/router/router_1.lua

diff --git a/test/instances/router.lua b/test/instances/router.lua
index a9505e66..6614539e 100755
--- a/test/instances/router.lua
+++ b/test/instances/router.lua
@@ -11,6 +11,7 @@ _G.ilt = require('luatest')
 _G.imsgpack = require('msgpack')
 _G.ivtest = require('test.luatest_helpers.vtest')
 _G.ivconst = require('vshard.consts')
+_G.iverror = require('vshard.error')
 _G.iwait_timeout = _G.ivtest.wait_timeout
 
 -- Do not load entire vshard into the global namespace to catch errors when code
diff --git a/test/router-luatest/router_test.lua b/test/router-luatest/router_test.lua
index 776c1337..b2e12951 100644
--- a/test/router-luatest/router_test.lua
+++ b/test/router-luatest/router_test.lua
@@ -1,6 +1,7 @@
 local t = require('luatest')
 local vtest = require('test.luatest_helpers.vtest')
 local vutil = require('vshard.util')
+local vconsts = require('vshard.consts')
 
 local g = t.group('router')
 local cfg_template = {
@@ -861,3 +862,245 @@ g.test_request_timeout = function(g)
         _G.sleep_num = nil
     end)
 end
+
+local function prepare_affect_priority_rs(g)
+    local new_cfg_template = table.deepcopy(cfg_template)
+    new_cfg_template.sharding[1].replicas.replica_1_a.zone = 3
+    new_cfg_template.sharding[1].replicas.replica_1_b.zone = 2
+    new_cfg_template.zone = 1
+    new_cfg_template.weights = {
+        [1] = {
+            [1] = 0,
+            [2] = 1,
+            [3] = 2,
+        },
+    }
+    -- So that the ping timeout is always > replica.net_timeout.
+    -- net_timeout starts with CALL_TIMEOUT_MIN and is multiplied by 2 if the
+    -- number of failed requests is >= 2.
+    new_cfg_template.failover_ping_timeout = vconsts.CALL_TIMEOUT_MIN * 4
+    local new_cluster_cfg = vtest.config_new(new_cfg_template)
+    vtest.router_cfg(g.router, new_cluster_cfg)
+end
+
+local function affect_priority_clear_net_timeout(g)
+    g.router:exec(function()
+        -- Reset net_timeout, so that it doesn't affect the test. This is
+        -- needed as we use the absolute minimum failover_ping_timeout for
+        -- FAILOVER_DOWN_SEQUENTIAL_FAIL = 3. 10 successful calls are needed
+        -- to restore it to CALL_TIMEOUT_MIN without a reset.
+        local router = ivshard.router.internal.static_router
+        for _, rs in pairs(router.replicasets) do
+            for _, r in pairs(rs.replicas) do
+                r.net_timeout = ivconst.CALL_TIMEOUT_MIN
+            end
+        end
+    end)
+end
+
+--
+-- gh-483: failover ping temporarily lowers a replica's priority when the
+-- replica cannot be reached several times in a row:
+--
+-- 1. replica_1_b is the prioritized one. replica_1_a is the second one.
+-- 2. The router establishes connections to all instances, failover sets
+-- replica_1_b as the prioritized replica.
+-- 3. The node breaks and stops responding.
+-- 4. Failover retries the ping FAILOVER_DOWN_SEQUENTIAL_FAIL times and
+-- changes the prioritized replica to a lower-priority one. Note that the
+-- connection is recreated on every failed ping.
+-- 5. Every FAILOVER_UP_TIMEOUT seconds failover checks whether any replica
+-- with higher priority can be reached and, if so, changes the prioritized
+-- replica.
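+-- With the default FAILOVER_DOWN_SEQUENTIAL_FAIL = 3 introduced in this
+-- patch, the third failed ping in a row triggers the switch in step (4).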
+--
+g.test_failover_ping_affects_priority = function()
+    prepare_affect_priority_rs(g)
+
+    -- Find the prioritized replica and disable failover for now.
+    g.router:exec(function(rs_uuid, replica_uuid)
+        local router = ivshard.router.internal.static_router
+        local rs = router.replicasets[rs_uuid]
+        local opts = {timeout = iwait_timeout}
+        rs:wait_connected_all(opts)
+
+        t.helpers.retrying(opts, function()
+            router.failover_fiber:wakeup()
+            t.assert_equals(rs.replica.uuid, replica_uuid,
+                            'Prioritized replica has not been set yet')
+        end)
+
+        local errinj = ivshard.router.internal.errinj
+        errinj.ERRINJ_FAILOVER_DELAY = true
+        t.helpers.retrying(opts, function()
+            router.failover_fiber:wakeup()
+            t.assert_equals(errinj.ERRINJ_FAILOVER_DELAY, 'in',
+                            'Failover has not been stopped yet')
+        end)
+    end, {g.replica_1_b:replicaset_uuid(), g.replica_1_b:instance_uuid()})
+
+    -- Break the 'info' request on the replica so that it fails with a
+    -- TimedOut error.
+    g.replica_1_b:exec(function()
+        rawset(_G, 'old_call', ivshard.storage._call)
+        ivshard.storage._call = function(service_name, ...)
+            if service_name == 'info' then
+                ifiber.sleep(ivconst.CALL_TIMEOUT_MIN * 5)
+            end
+            return _G.old_call(service_name, ...)
+        end
+    end)
+
+    affect_priority_clear_net_timeout(g)
+    g.router:exec(function(rs_uuid, master_uuid)
+        local router = ivshard.router.internal.static_router
+        local rs = router.replicasets[rs_uuid]
+
+        -- And we change the prioritized replica.
+        ivshard.router.internal.errinj.ERRINJ_FAILOVER_DELAY = false
+        t.helpers.retrying({timeout = iwait_timeout}, function()
+            router.failover_fiber:wakeup()
+            t.assert_equals(rs.replica.uuid, master_uuid)
+        end)
+
+        -- Check that the prioritized replica is not changed, as it's
+        -- still broken.
+        rawset(_G, 'old_up_timeout', ivconst.FAILOVER_UP_TIMEOUT)
+        ivconst.FAILOVER_UP_TIMEOUT = 0.01
+        ivtest.service_wait_for_new_ok(router.failover_service,
+            {on_yield = function() router.failover_fiber:wakeup() end})
+        t.assert_equals(rs.replica.uuid, master_uuid)
+    end, {g.replica_1_b:replicaset_uuid(), g.replica_1_a:instance_uuid()})
+
+    -- Restore the 'info' request.
+    g.replica_1_b:exec(function()
+        ivshard.storage._call = _G.old_call
+        _G.old_call = nil
+    end)
+
+    -- As replica_1_b has higher priority, it should be restored automatically.
+    g.router:exec(function(rs_uuid, replica_uuid)
+        local router = ivshard.router.internal.static_router
+        local rs = router.replicasets[rs_uuid]
+        t.assert_equals(rs.priority_list[1].uuid, replica_uuid)
+        t.helpers.retrying({timeout = iwait_timeout}, function()
+            router.failover_fiber:wakeup()
+            t.assert_equals(rs.replica.uuid, replica_uuid,
+                            'Prioritized replica is not up yet')
+        end)
+    end, {g.replica_1_b:replicaset_uuid(), g.replica_1_b:instance_uuid()})
+
+    vtest.router_cfg(g.router, global_cfg)
+    g.router:exec(function()
+        ivconst.FAILOVER_UP_TIMEOUT = _G.old_up_timeout
+        _G.old_up_timeout = nil
+    end)
+end
+
+--
+-- gh-483: user calls also affect priority. If several sequential requests
+-- fail, then the same logic as in the previous test happens.
+--
+g.test_failed_calls_affect_priority = function()
+    prepare_affect_priority_rs(g)
+    local timeout = vconsts.CALL_TIMEOUT_MIN * 4
+
+    -- Find the prioritized replica and disable failover for now.
+    g.router:exec(function(rs_uuid, replica_uuid)
+        local router = ivshard.router.internal.static_router
+        local rs = router.replicasets[rs_uuid]
+        local opts = {timeout = iwait_timeout}
+        rs:wait_connected_all(opts)
+
+        t.helpers.retrying(opts, function()
+            router.failover_fiber:wakeup()
+            t.assert_equals(rs.replica.uuid, replica_uuid,
+                            'Prioritized replica has not been set yet')
+        end)
+
+        local errinj = ivshard.router.internal.errinj
+        errinj.ERRINJ_FAILOVER_DELAY = true
+        t.helpers.retrying(opts, function()
+            router.failover_fiber:wakeup()
+            t.assert_equals(errinj.ERRINJ_FAILOVER_DELAY, 'in',
+                            'Failover has not been stopped yet')
+        end)
+
+        -- Discovery is disabled, as it may affect `net_sequential_fail`
+        -- and lead to flakiness of the test.
+        errinj.ERRINJ_LONG_DISCOVERY = true
+        t.helpers.retrying(opts, function()
+            router.discovery_fiber:wakeup()
+            t.assert_equals(errinj.ERRINJ_LONG_DISCOVERY, 'waiting',
+                            'Discovery has not been stopped yet')
+        end)
+    end, {g.replica_1_b:replicaset_uuid(), g.replica_1_b:instance_uuid()})
+
+    -- Break the 'info' request on the replica so that it fails with a
+    -- TimedOut error. No other request can be broken: only failover changes
+    -- the priority, and as soon as it wakes up it succeeds with `_call` and
+    -- sets `net_sequential_fail` to 0.
+    g.replica_1_b:exec(function()
+        rawset(_G, 'old_call', ivshard.storage._call)
+        ivshard.storage._call = function(service_name, ...)
+            if service_name == 'info' then
+                ifiber.sleep(ivconst.CALL_TIMEOUT_MIN * 5)
+            end
+            return _G.old_call(service_name, ...)
+        end
+    end)
+
+    affect_priority_clear_net_timeout(g)
+    local bid = vtest.storage_first_bucket(g.replica_1_a)
+    g.router:exec(function(bid, timeout, rs_uuid, replica_uuid)
+        local router = ivshard.router.internal.static_router
+        local replica = router.replicasets[rs_uuid].replica
+        t.assert_equals(replica.uuid, replica_uuid)
+
+        local fails = replica.net_sequential_fail
+        for _ = 1, ivconst.FAILOVER_DOWN_SEQUENTIAL_FAIL do
+            local ok, err = ivshard.router.callro(bid, 'vshard.storage._call',
+                                                  {'info'}, {timeout = timeout})
+            t.assert_not(ok)
+            t.assert(iverror.is_timeout(err))
+        end
+
+        t.assert_equals(replica.net_sequential_fail,
+                        fails + ivconst.FAILOVER_DOWN_SEQUENTIAL_FAIL)
+
+        -- The priority is changed only by failover. So the prioritized
+        -- replica is still the failing one.
+        t.assert_equals(router.replicasets[rs_uuid].replica.uuid, replica_uuid)
+    end, {bid, timeout, g.replica_1_b:replicaset_uuid(),
+          g.replica_1_b:instance_uuid()})
+
+    -- Enable failover, which changes the priority of the replica.
+    g.router:exec(function(rs_uuid, master_uuid)
+        local router = ivshard.router.internal.static_router
+        ivshard.router.internal.errinj.ERRINJ_FAILOVER_DELAY = false
+        t.helpers.retrying({timeout = iwait_timeout}, function()
+            router.failover_fiber:wakeup()
+            t.assert_equals(router.replicasets[rs_uuid].replica.uuid,
+                            master_uuid, 'Master is not prioritized yet')
+        end)
+    end, {g.replica_1_b:replicaset_uuid(), g.replica_1_a:instance_uuid()})
+
+    -- Restore the 'info' request.
+    g.replica_1_b:exec(function()
+        ivshard.storage._call = _G.old_call
+        _G.old_call = nil
+    end)
+
+    -- As replica_1_b has higher priority, it should be restored automatically.
+    g.router:exec(function(rs_uuid, replica_uuid)
+        local old_up_timeout = ivconst.FAILOVER_UP_TIMEOUT
+        ivconst.FAILOVER_UP_TIMEOUT = 1
+        local router = ivshard.router.internal.static_router
+        local rs = router.replicasets[rs_uuid]
+        t.assert_equals(rs.priority_list[1].uuid, replica_uuid)
+        t.helpers.retrying({timeout = iwait_timeout}, function()
+            router.failover_fiber:wakeup()
+            t.assert_equals(rs.replica.uuid, replica_uuid,
+                            'Prioritized replica is not up yet')
+        end)
+        ivconst.FAILOVER_UP_TIMEOUT = old_up_timeout
+    end, {g.replica_1_b:replicaset_uuid(), g.replica_1_b:instance_uuid()})
+
+    vtest.router_cfg(g.router, global_cfg)
+end
diff --git a/test/router/exponential_timeout.result b/test/router/exponential_timeout.result
index 252b816c..bd62b739 100644
--- a/test/router/exponential_timeout.result
+++ b/test/router/exponential_timeout.result
@@ -28,9 +28,10 @@ util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage(\'memt
 _ = test_run:cmd("create server router_1 with script='router/router_1.lua'")
 ---
 ...
--- Discovery algorithm changes sometimes and should not affect the
+-- Discovery algorithm and failover change sometimes and should not affect the
 -- exponential timeout test.
-_ = test_run:cmd("start server router_1 with args='discovery_disable'")
+_ = test_run:cmd("start server router_1 with " .. \
+                 "args='discovery_disable failover_disable'")
 ---
 ...
 _ = test_run:switch('router_1')
@@ -103,7 +104,7 @@ util.collect_timeouts(rs1)
 - - fail: 0
     ok: 0
     timeout: 0.5
-  - fail: 1
+  - fail: 2
     ok: 0
     timeout: 1
 ...
diff --git a/test/router/exponential_timeout.test.lua b/test/router/exponential_timeout.test.lua
index 881b9a7e..ab677d60 100644
--- a/test/router/exponential_timeout.test.lua
+++ b/test/router/exponential_timeout.test.lua
@@ -10,9 +10,10 @@ util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
 util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
 util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage(\'memtx\')')
 _ = test_run:cmd("create server router_1 with script='router/router_1.lua'")
--- Discovery algorithm changes sometimes and should not affect the
+-- Discovery algorithm and failover change sometimes and should not affect the
 -- exponential timeout test.
-_ = test_run:cmd("start server router_1 with args='discovery_disable'")
+_ = test_run:cmd("start server router_1 with " .. \
+                 "args='discovery_disable failover_disable'")
 
 _ = test_run:switch('router_1')
 util = require('util')
diff --git a/test/router/retry_reads.result b/test/router/retry_reads.result
index 80834750..e057422f 100644
--- a/test/router/retry_reads.result
+++ b/test/router/retry_reads.result
@@ -28,9 +28,10 @@ util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage(\'memt
 _ = test_run:cmd("create server router_1 with script='router/router_1.lua'")
 ---
 ...
--- Discovery algorithm changes sometimes and should not affect the
+-- Discovery algorithm and failover change sometimes and should not affect the
 -- read retry decisions.
-_ = test_run:cmd("start server router_1 with args='discovery_disable'")
+_ = test_run:cmd("start server router_1 with " .. \
+                 "args='discovery_disable failover_disable'")
 ---
 ...
 _ = test_run:switch('router_1')
@@ -69,7 +70,7 @@ util.collect_timeouts(rs1)
 - - fail: 0
     ok: 0
     timeout: 0.5
-  - fail: 1
+  - fail: 2
     ok: 0
     timeout: 1
 ...
diff --git a/test/router/retry_reads.test.lua b/test/router/retry_reads.test.lua
index f0a44457..a9554d77 100644
--- a/test/router/retry_reads.test.lua
+++ b/test/router/retry_reads.test.lua
@@ -10,9 +10,10 @@ util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
 util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
 util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage(\'memtx\')')
 _ = test_run:cmd("create server router_1 with script='router/router_1.lua'")
--- Discovery algorithm changes sometimes and should not affect the
+-- Discovery algorithm and failover change sometimes and should not affect the
 -- read retry decisions.
-_ = test_run:cmd("start server router_1 with args='discovery_disable'")
+_ = test_run:cmd("start server router_1 with " .. \
+                 "args='discovery_disable failover_disable'")
 
 _ = test_run:switch('router_1')
 util = require('util')
diff --git a/test/router/router.result b/test/router/router.result
index 4389367f..8f678564 100644
--- a/test/router/router.result
+++ b/test/router/router.result
@@ -1527,7 +1527,8 @@ table.sort(error_messages)
 ...
 error_messages
 ---
-- - Use replica:check_is_connected(...) instead of replica.check_is_connected(...)
+- - Use replica:call(...) instead of replica.call(...)
+  - Use replica:check_is_connected(...) instead of replica.check_is_connected(...)
   - Use replica:detach_conn(...) instead of replica.detach_conn(...)
   - Use replica:is_connected(...) instead of replica.is_connected(...)
   - Use replica:safe_uri(...) instead of replica.safe_uri(...)
diff --git a/test/router/router_1.lua b/test/router/router_1.lua
deleted file mode 120000
index da63b08a..00000000
--- a/test/router/router_1.lua
+++ /dev/null
@@ -1 +0,0 @@
-../../example/router_1.lua
\ No newline at end of file
diff --git a/test/router/router_1.lua b/test/router/router_1.lua
new file mode 100755
index 00000000..887c43bc
--- /dev/null
+++ b/test/router/router_1.lua
@@ -0,0 +1,35 @@
+#!/usr/bin/env tarantool
+
+require('strict').on()
+fiber = require('fiber')
+
+-- Check if we are running under test-run.
+if os.getenv('ADMIN') then
+    test_run = require('test_run').new()
+    require('console').listen(os.getenv('ADMIN'))
+end
+
+replicasets = {'cbf06940-0790-498b-948d-042b62cf3d29',
+               'ac522f65-aa94-4134-9f64-51ee384f1a54'}
+
+-- Call a configuration provider.
+cfg = dofile('localcfg.lua')
+if arg[1] == 'discovery_disable' then
+    cfg.discovery_mode = 'off'
+end
+
+-- Start the database with sharding.
+vshard = require('vshard')
+
+if arg[2] == 'failover_disable' then
+    vshard.router.internal.errinj.ERRINJ_FAILOVER_DELAY = true
+end
+
+vshard.router.cfg(cfg)
+
+if arg[2] == 'failover_disable' then
+    while vshard.router.internal.errinj.ERRINJ_FAILOVER_DELAY ~= 'in' do
+        vshard.router.internal.static_router.failover_fiber:wakeup()
+        fiber.sleep(0.01)
+    end
+end
diff --git a/vshard/consts.lua b/vshard/consts.lua
index 249d4915..f8bedc4b 100644
--- a/vshard/consts.lua
+++ b/vshard/consts.lua
@@ -49,6 +49,7 @@ return {
     CALL_TIMEOUT_MAX = 64;
     FAILOVER_UP_TIMEOUT = 5;
     FAILOVER_DOWN_TIMEOUT = 1;
+    FAILOVER_DOWN_SEQUENTIAL_FAIL = 3;
     DEFAULT_FAILOVER_PING_TIMEOUT = 5;
     DEFAULT_SYNC_TIMEOUT = 1;
     RECONNECT_TIMEOUT = 0.5;
diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua
index d38ef49a..abdffe67 100644
--- a/vshard/replicaset.lua
+++ b/vshard/replicaset.lua
@@ -483,8 +483,10 @@ end
 local function replicaset_down_replica_priority(replicaset)
     local old_replica = replicaset.replica
-    assert(old_replica and old_replica.down_ts and
-           not old_replica:is_connected())
+    assert(old_replica and ((old_replica.down_ts and
+           not old_replica:is_connected()) or
+           old_replica.net_sequential_fail >=
+           consts.FAILOVER_DOWN_SEQUENTIAL_FAIL))
     local new_replica = old_replica.next_by_priority
     if new_replica then
         assert(new_replica ~= old_replica)
@@ -511,7 +513,8 @@ local function replicaset_up_replica_priority(replicaset)
             -- Failed to up priority.
             return
         end
-        if replica:is_connected() then
+        if replica:is_connected() and replica.net_sequential_ok > 0 then
+            assert(replica.net_sequential_fail == 0)
             replicaset.replica = replica
             assert(not old_replica or
                    old_replica.weight >= replicaset.replica.weight)
@@ -527,15 +530,12 @@ end
 --
 local function replica_on_failed_request(replica)
     replica.net_sequential_ok = 0
-    local val = replica.net_sequential_fail + 1
-    if val >= 2 then
+    replica.net_sequential_fail = replica.net_sequential_fail + 1
+    if replica.net_sequential_fail >= 2 then
         local new_timeout = replica.net_timeout * 2
         if new_timeout <= consts.CALL_TIMEOUT_MAX then
             replica.net_timeout = new_timeout
         end
-        replica.net_sequential_fail = 1
-    else
-        replica.net_sequential_fail = val
     end
 end
 
@@ -1268,6 +1268,7 @@ local replica_mt = {
             return util.uri_format(uri)
         end,
         detach_conn = replica_detach_conn,
+        call = replica_call,
     },
     __tostring = function(replica)
         if replica.name then
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index 6ac80b79..cbd978ef 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -44,6 +44,7 @@ if not M then
         ERRINJ_CFG = false,
         ERRINJ_CFG_DELAY = false,
         ERRINJ_FAILOVER_CHANGE_CFG = false,
+        ERRINJ_FAILOVER_DELAY = false,
         ERRINJ_RELOAD = false,
         ERRINJ_LONG_DISCOVERY = false,
         ERRINJ_MASTER_SEARCH_DELAY = false,
@@ -973,41 +974,26 @@ end
 -- Failover
 --------------------------------------------------------------------------------
 
-local function failover_ping_round(router)
-    for _, replicaset in pairs(router.replicasets) do
-        local replica = replicaset.replica
-        if replica ~= nil and replica.conn ~= nil and
-           replica.down_ts == nil then
-            if not replica.conn:ping({timeout =
-                                      router.failover_ping_timeout}) then
-                log.info('Ping error from %s: perhaps a connection is down',
-                         replica)
-                -- Connection hangs. Recreate it to be able to
-                -- fail over to a replica next by priority. The
-                -- old connection is not closed in case if it just
-                -- processes too big response at this moment. Any
-                -- way it will be eventually garbage collected
-                -- and closed.
-                replica:detach_conn()
-                replicaset:connect_replica(replica)
-            end
-        end
-    end
-end
-
 --
 -- Replicaset must fall its replica connection to lower priority,
 -- if the current one is down too long.
 --
 local function failover_need_down_priority(replicaset, curr_ts)
     local r = replicaset.replica
+    if not r or not r.next_by_priority then
+        return false
+    end
     -- down_ts not nil does not mean that the replica is not
     -- connected. Probably it is connected and now fetches schema,
     -- or does authorization. Either case, it is healthy, no need
     -- to down the prio.
-    return r and r.down_ts and not r:is_connected() and
-           curr_ts - r.down_ts >= consts.FAILOVER_DOWN_TIMEOUT
-           and r.next_by_priority
+    local is_down_ts = r.down_ts and not r:is_connected() and
+                       curr_ts - r.down_ts >= consts.FAILOVER_DOWN_TIMEOUT
+    -- If several sequential requests to the replica have failed, then
+    -- something is wrong with it. Temporarily lower its priority.
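+    -- A single successful request resets net_sequential_fail back to
+    -- zero, so only genuinely consecutive failures can demote the replica.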
+    local is_sequential_fails =
+        r.net_sequential_fail >= consts.FAILOVER_DOWN_SEQUENTIAL_FAIL
+    return is_down_ts or is_sequential_fails
 end
 
 --
@@ -1035,6 +1021,49 @@ local function failover_collect_to_update(router)
     return id_to_update
 end
 
+local function failover_ping(replica, opts)
+    return replica:call('vshard.storage._call', {'info'}, opts)
+end
+
+local function failover_ping_round(router, curr_ts)
+    local opts = {timeout = router.failover_ping_timeout}
+    for _, replicaset in pairs(router.replicasets) do
+        local replica = replicaset.replica
+        if failover_need_up_priority(replicaset, curr_ts) then
+            -- When it's time to increase the priority in a replicaset, all
+            -- instances whose priority is higher than the current one are
+            -- pinged, so that we know which connections work properly.
+            for _, r in pairs(replicaset.priority_list) do
+                if r == replica then
+                    break
+                end
+                if r.conn ~= nil and r.down_ts == nil then
+                    -- We don't need the return values: r.net_sequential_ok is
+                    -- incremented on a successful request.
+                    failover_ping(r, opts)
+                end
+            end
+        end
+        if replica ~= nil and replica.conn ~= nil and
+           replica.down_ts == nil then
+            local net_status, _, err = failover_ping(replica, opts)
+            if not net_status then
+                log.info('Ping error from %s: perhaps a connection is down: %s',
+                         replica, err)
+                -- Connection hangs. Recreate it to be able to
+                -- fail over to a replica next by priority. The
+                -- old connection is not closed in case if it just
+                -- processes too big response at this moment. Any
+                -- way it will be eventually garbage collected
+                -- and closed.
+                replica:detach_conn()
+                replicaset:connect_replica(replica)
+            end
+        end
+    end
+end
+
 --
 -- Detect not optimal or disconnected replicas. For not optimal
 -- try to update them to optimal, and down priority of
@@ -1042,12 +1071,12 @@ end
 -- @retval true A replica of an replicaset has been changed.
 --
 local function failover_step(router)
-    failover_ping_round(router)
+    local curr_ts = fiber_clock()
+    failover_ping_round(router, curr_ts)
     local id_to_update = failover_collect_to_update(router)
     if #id_to_update == 0 then
         return false
     end
-    local curr_ts = fiber_clock()
     local replica_is_changed = false
     for _, id in pairs(id_to_update) do
         local rs = router.replicasets[id]
@@ -1105,6 +1134,12 @@ local function failover_service_f(router, service)
     -- each min_timeout seconds.
     local prev_was_ok = false
     while module_version == M.module_version do
+        if M.errinj.ERRINJ_FAILOVER_DELAY then
+            M.errinj.ERRINJ_FAILOVER_DELAY = 'in'
+            repeat
+                lfiber.sleep(0.001)
+            until not M.errinj.ERRINJ_FAILOVER_DELAY
+        end
         service:next_iter()
         service:set_activity('updating replicas')
         local ok, replica_is_changed = pcall(failover_step, router)