diff --git a/test/instances/router.lua b/test/instances/router.lua index f373f21e..6614539e 100755 --- a/test/instances/router.lua +++ b/test/instances/router.lua @@ -10,6 +10,8 @@ _G.ifiber = require('fiber') _G.ilt = require('luatest') _G.imsgpack = require('msgpack') _G.ivtest = require('test.luatest_helpers.vtest') +_G.ivconst = require('vshard.consts') +_G.iverror = require('vshard.error') _G.iwait_timeout = _G.ivtest.wait_timeout -- Do not load entire vshard into the global namespace to catch errors when code diff --git a/test/replicaset-luatest/replicaset_3_test.lua b/test/replicaset-luatest/replicaset_3_test.lua index 253327e7..bbc5ca48 100644 --- a/test/replicaset-luatest/replicaset_3_test.lua +++ b/test/replicaset-luatest/replicaset_3_test.lua @@ -366,3 +366,138 @@ test_group.test_ipv6_uri = function(g) local replica_string = 'replica_1_a(storage@[::1]:3301)' t.assert_equals(tostring(rs.master), replica_string) end + +local function hang_get_uuid(instance) + instance:exec(function() + rawset(_G, 'sleep_num', 0) + rawset(_G, 'sleep_cond', ifiber.cond()) + rawset(_G, 'old_get_uuid', _G.get_uuid) + _G.get_uuid = function() + _G.sleep_num = _G.sleep_num + 1 + _G.sleep_cond:wait() + end + end) +end + +local function reset_sleep(instance) + instance:exec(function() + _G.sleep_cond:broadcast() + _G.sleep_num = 0 + end) +end + +local function reset_get_uuid(instance) + instance:exec(function() + _G.sleep_cond:broadcast() + _G.get_uuid = _G.old_get_uuid + _G.old_get_uuid = nil + _G.sleep_cond = nil + _G.sleep_num = nil + end) +end + +local function prepare_stateless_balancing_rs() + local new_cfg_template = table.deepcopy(cfg_template) + -- replica_1_b > replica_1_a (master) > replica_1_c + new_cfg_template.sharding[1].replicas.replica_1_b.zone = 2 + new_cfg_template.sharding[1].replicas.replica_1_a.zone = 3 + new_cfg_template.sharding[1].replicas.replica_1_c.zone = 4 + new_cfg_template.zone = 1 + new_cfg_template.weights = { + [1] = { + [2] = 1, + [3] = 2, + [4] = 3, + } + } + + local new_global_cfg = vtest.config_new(new_cfg_template) + local _, rs = next(vreplicaset.buildall(new_global_cfg)) + rs:wait_connected_all(timeout_opts) + return rs +end + +test_group.test_stateless_balancing_callro = function(g) + local rs = prepare_stateless_balancing_rs() + -- replica_1_b is the prioritized one. + local uuid = rs:callro('get_uuid', {}, timeout_opts) + t.assert_equals(uuid, g.replica_1_b:instance_uuid()) + + -- + -- callro fallback to lowest prioritized replica , if request to other + -- instances fail. + -- + hang_get_uuid(g.replica_1_a) + hang_get_uuid(g.replica_1_b) + local request_timeout_opts = {request_timeout = 1, timeout = 3} + uuid = rs:callro('get_uuid', {}, request_timeout_opts) + t.assert_equals(uuid, g.replica_1_c:instance_uuid()) + t.assert_equals(g.replica_1_a:eval('return _G.sleep_num'), 1) + t.assert_equals(g.replica_1_b:eval('return _G.sleep_num'), 1) + reset_sleep(g.replica_1_a) + reset_sleep(g.replica_1_b) + + -- + -- If all instances are unresponsive, there's nothing we can do. + -- Test, that when timeout > request_timeout * #rs, then we make + -- several requests to some replicas. + -- + local err + hang_get_uuid(g.replica_1_c) + request_timeout_opts.timeout = 3.5 + uuid, err = rs:callro('get_uuid', {}, request_timeout_opts) + t.assert_equals(uuid, nil) + -- Either 'timed out' or 'Timeout exceeded' message. Depends on version. 
+ t.assert(verror.is_timeout(err)) + t.assert_equals(g.replica_1_b:eval('return _G.sleep_num'), 2) + t.assert_equals(g.replica_1_c:eval('return _G.sleep_num'), 1) + t.assert_equals(g.replica_1_a:eval('return _G.sleep_num'), 1) + + reset_get_uuid(g.replica_1_a) + reset_get_uuid(g.replica_1_b) + reset_get_uuid(g.replica_1_c) +end + +test_group.test_stateless_balancing_callre = function(g) + local rs = prepare_stateless_balancing_rs() + -- replica_1_b is the prioritized one. + local uuid = rs:callre('get_uuid', {}, timeout_opts) + t.assert_equals(uuid, g.replica_1_b:instance_uuid()) + + -- + -- callre fallback to another replica, if request fail. + -- + hang_get_uuid(g.replica_1_b) + local request_timeout_opts = {request_timeout = 1, timeout = 3} + uuid = rs:callre('get_uuid', {}, request_timeout_opts) + t.assert_equals(uuid, g.replica_1_c:instance_uuid()) + t.assert_equals(g.replica_1_b:eval('return _G.sleep_num'), 1) + reset_sleep(g.replica_1_b) + + -- + -- If all replicas are unresponsive, fallback to the master. + -- + hang_get_uuid(g.replica_1_c) + uuid = rs:callre('get_uuid', {}, request_timeout_opts) + t.assert_equals(uuid, g.replica_1_a:instance_uuid()) + t.assert_equals(g.replica_1_b:eval('return _G.sleep_num'), 1) + t.assert_equals(g.replica_1_c:eval('return _G.sleep_num'), 1) + reset_sleep(g.replica_1_b) + reset_sleep(g.replica_1_c) + + -- + -- If it's not enough time to make requests to all replicas and then + -- to master, we won't fallback to master and will fail earlier. + -- + local err + request_timeout_opts.timeout = 1.5 + uuid, err = rs:callre('get_uuid', {}, request_timeout_opts) + t.assert_equals(uuid, nil) + -- Either 'timed out' or 'Timeout exceeded' message. Depends on version. + t.assert(verror.is_timeout(err)) + t.assert_equals(g.replica_1_b:eval('return _G.sleep_num'), 1) + t.assert_equals(g.replica_1_c:eval('return _G.sleep_num'), 1) + + reset_get_uuid(g.replica_1_b) + reset_get_uuid(g.replica_1_c) +end diff --git a/test/router-luatest/router_test.lua b/test/router-luatest/router_test.lua index a29a495a..b2e12951 100644 --- a/test/router-luatest/router_test.lua +++ b/test/router-luatest/router_test.lua @@ -1,6 +1,7 @@ local t = require('luatest') local vtest = require('test.luatest_helpers.vtest') local vutil = require('vshard.util') +local vconsts = require('vshard.consts') local g = t.group('router') local cfg_template = { @@ -798,3 +799,308 @@ g.test_master_discovery_on_disconnect = function(g) vtest.router_cfg(g.router, global_cfg) vtest.cluster_cfg(g, global_cfg) end + +g.test_request_timeout = function(g) + local bid = vtest.storage_first_bucket(g.replica_1_a) + -- + -- Test request_timeout API. + -- + g.router:exec(function(bid) + local opts = {} + -- request_timeout must be <= timeout. + local err_msg = 'request_timeout must be <= timeout' + opts.request_timeout = ivconst.CALL_TIMEOUT_MIN * 2 + t.assert_error_msg_contains(err_msg, function() + ivshard.router.callro(bid, 'echo', {1}, opts) + end) + -- request_timeout must be a number. + err_msg = 'Usage: call' + opts.request_timeout = 'string' + t.assert_error_msg_contains(err_msg, function() + ivshard.router.callro(bid, 'echo', {1}, opts) + end) + -- request_timeout <= 0 leads to the TimeOut error. 
+        err_msg = 'Timeout'
+        for _, timeout in ipairs({-1, 0}) do
+            opts.request_timeout = timeout
+            local ok, err = ivshard.router.callro(bid, 'echo', {1}, opts)
+            t.assert_not(ok)
+            t.assert_str_contains(err.message, err_msg)
+        end
+    end, {bid})
+
+    --
+    -- Test that the router makes the desired number of calls when
+    -- request_timeout is not a divisor of the timeout.
+    --
+    g.replica_1_a:exec(function()
+        rawset(_G, 'sleep_num', 0)
+        rawset(_G, 'sleep_cond', ifiber.cond())
+        rawset(_G, 'old_get_uuid', _G.get_uuid)
+        _G.get_uuid = function()
+            _G.sleep_num = _G.sleep_num + 1
+            _G.sleep_cond:wait()
+        end
+    end)
+    g.router:exec(function(bid, uuid)
+        local res = ivshard.router.callro(bid, 'get_uuid', {}, {
+            request_timeout = 1,
+            timeout = 1.5,
+        })
+        t.assert_equals(res, uuid)
+    end, {bid, g.replica_1_b:instance_uuid()})
+
+    -- Timeout errors are retried. The master's priority is higher when the
+    -- weights are equal (currently they're not specified at all). The
+    -- request to the master fails and we fall back to the replica.
+    t.assert_equals(g.replica_1_a:eval('return _G.sleep_num'), 1)
+    g.replica_1_a:exec(function()
+        _G.sleep_cond:broadcast()
+        _G.get_uuid = _G.old_get_uuid
+        _G.old_get_uuid = nil
+        _G.sleep_cond = nil
+        _G.sleep_num = nil
+    end)
+end
+
+local function prepare_affect_priority_rs(g)
+    local new_cfg_template = table.deepcopy(cfg_template)
+    new_cfg_template.sharding[1].replicas.replica_1_a.zone = 3
+    new_cfg_template.sharding[1].replicas.replica_1_b.zone = 2
+    new_cfg_template.zone = 1
+    new_cfg_template.weights = {
+        [1] = {
+            [1] = 0,
+            [2] = 1,
+            [3] = 2,
+        },
+    }
+    -- So that ping timeout is always > replica.net_timeout.
+    -- net_timeout starts with CALL_TIMEOUT_MIN and is multiplied by 2 if the
+    -- number of failed requests is >= 2.
+    new_cfg_template.failover_ping_timeout = vconsts.CALL_TIMEOUT_MIN * 4
+    local new_cluster_cfg = vtest.config_new(new_cfg_template)
+    vtest.router_cfg(g.router, new_cluster_cfg)
+end
+
+local function affect_priority_clear_net_timeout(g)
+    g.router:exec(function()
+        -- Reset net_timeout, so that it doesn't affect the test. This is
+        -- needed as we use the absolute minimum failover_ping_timeout for
+        -- FAILOVER_DOWN_SEQUENTIAL_FAIL = 3. 10 successful calls are needed
+        -- to restore it to CALL_TIMEOUT_MIN without reset.
+        local router = ivshard.router.internal.static_router
+        for _, rs in pairs(router.replicasets) do
+            for _, r in pairs(rs.replicas) do
+                r.net_timeout = ivconst.CALL_TIMEOUT_MIN
+            end
+        end
+    end)
+end
+
+--
+-- gh-483: failover ping temporarily lowers the replica's priority when it
+-- cannot be reached several times in a row:
+--
+-- 1. replica_1_b is the prioritized one. replica_1_a is the second one.
+-- 2. The router establishes connections to all instances, failover sets
+--    replica_1_b as the prioritized one.
+-- 3. The node breaks and stops responding.
+-- 4. Failover retries the ping FAILOVER_DOWN_SEQUENTIAL_FAIL times and then
+--    changes the prioritized replica to a lower-priority one. Note that the
+--    connection is recreated on every failed ping.
+-- 5. Every FAILOVER_UP_TIMEOUT failover checks if any replica with a higher
+--    priority can be reached and changes the prioritized replica if so.
+--
+g.test_failover_ping_affects_priority = function()
+    prepare_affect_priority_rs(g)
+
+    -- Find prioritized replica and disable failover for now.
+ g.router:exec(function(rs_uuid, replica_uuid) + local router = ivshard.router.internal.static_router + local rs = router.replicasets[rs_uuid] + local opts = {timeout = iwait_timeout} + rs:wait_connected_all(opts) + + t.helpers.retrying(opts, function() + router.failover_fiber:wakeup() + t.assert_equals(rs.replica.uuid, replica_uuid, + 'Prioritized replica have not been set yet') + end) + + local errinj = ivshard.router.internal.errinj + errinj.ERRINJ_FAILOVER_DELAY = true + t.helpers.retrying(opts, function() + router.failover_fiber:wakeup() + t.assert_equals(errinj.ERRINJ_FAILOVER_DELAY, 'in', + 'Failover have not been stopped yet') + end) + end, {g.replica_1_b:replicaset_uuid(), g.replica_1_b:instance_uuid()}) + + -- Break 'info' request on replica so that it fails with TimedOut error. + g.replica_1_b:exec(function() + rawset(_G, 'old_call', ivshard.storage._call) + ivshard.storage._call = function(service_name, ...) + if service_name == 'info' then + ifiber.sleep(ivconst.CALL_TIMEOUT_MIN * 5) + end + return _G.old_call(service_name, ...) + end + end) + + affect_priority_clear_net_timeout(g) + g.router:exec(function(rs_uuid, master_uuid) + local router = ivshard.router.internal.static_router + local rs = router.replicasets[rs_uuid] + + -- And we change the prioritized replica. + ivshard.router.internal.errinj.ERRINJ_FAILOVER_DELAY = false + t.helpers.retrying({timeout = iwait_timeout}, function() + router.failover_fiber:wakeup() + t.assert_equals(rs.replica.uuid, master_uuid) + end) + + -- Check, that prioritized replica is not changed, as it's still broken. + rawset(_G, 'old_up_timeout', ivconst.FAILOVER_UP_TIMEOUT) + ivconst.FAILOVER_UP_TIMEOUT = 0.01 + ivtest.service_wait_for_new_ok(router.failover_service, + {on_yield = router.failover_fiber:wakeup()}) + t.assert_equals(rs.replica.uuid, master_uuid) + end, {g.replica_1_b:replicaset_uuid(), g.replica_1_a:instance_uuid()}) + + -- Restore 'info' request. + g.replica_1_b:exec(function() + ivshard.storage._call = _G.old_call + _G.old_call = nil + end) + + -- As replica_1_b has higher priority, it should be restored automatically. + g.router:exec(function(rs_uuid, replica_uuid) + local router = ivshard.router.internal.static_router + local rs = router.replicasets[rs_uuid] + t.assert_equals(rs.priority_list[1].uuid, replica_uuid) + t.helpers.retrying({timeout = iwait_timeout}, function() + router.failover_fiber:wakeup() + t.assert_equals(rs.replica.uuid, replica_uuid, + 'Prioritized replica is not up yet') + end) + end, {g.replica_1_b:replicaset_uuid(), g.replica_1_b:instance_uuid()}) + + vtest.router_cfg(g.router, global_cfg) + g.router:exec(function() + ivconst.FAILOVER_UP_TIMEOUT = _G.old_up_timeout + _G.old_up_timeout = nil + end) +end + +-- +-- gh-483: user calls also affects priority. If several sequential requests +-- fail, then the same logic as in the previous test happens. +-- +g.test_failed_calls_affect_priority = function() + prepare_affect_priority_rs(g) + local timeout = vconsts.CALL_TIMEOUT_MIN * 4 + + -- Find prioritized replica and disable failover for now. 
+ g.router:exec(function(rs_uuid, replica_uuid) + local router = ivshard.router.internal.static_router + local rs = router.replicasets[rs_uuid] + local opts = {timeout = iwait_timeout} + rs:wait_connected_all(opts) + + t.helpers.retrying(opts, function() + router.failover_fiber:wakeup() + t.assert_equals(rs.replica.uuid, replica_uuid, + 'Prioritized replica have not been set yet') + end) + + local errinj = ivshard.router.internal.errinj + errinj.ERRINJ_FAILOVER_DELAY = true + t.helpers.retrying(opts, function() + router.failover_fiber:wakeup() + t.assert_equals(errinj.ERRINJ_FAILOVER_DELAY, 'in', + 'Failover have not been stopped yet') + end) + + -- Discovery is disabled, as it may affect `net_sequential_fail` + -- and leads to flakiness of the test. + errinj.ERRINJ_LONG_DISCOVERY = true + t.helpers.retrying(opts, function() + router.discovery_fiber:wakeup() + t.assert_equals(errinj.ERRINJ_LONG_DISCOVERY, 'waiting', + 'Discovery have not been stopped yet') + end) + end, {g.replica_1_b:replicaset_uuid(), g.replica_1_b:instance_uuid()}) + + -- Break 'info' request on replica so that it fails with TimedOut error. + -- No other request can be broken, as only failover changes priority and + -- as soon as it wakes up it succeeds with `_call` and sets + -- `net_sequential_fail` to 0. + g.replica_1_b:exec(function() + rawset(_G, 'old_call', ivshard.storage._call) + ivshard.storage._call = function(service_name, ...) + if service_name == 'info' then + ifiber.sleep(ivconst.CALL_TIMEOUT_MIN * 5) + end + return _G.old_call(service_name, ...) + end + end) + + affect_priority_clear_net_timeout(g) + local bid = vtest.storage_first_bucket(g.replica_1_a) + g.router:exec(function(bid, timeout, rs_uuid, replica_uuid) + local router = ivshard.router.internal.static_router + local replica = router.replicasets[rs_uuid].replica + t.assert_equals(replica.uuid, replica_uuid) + + local fails = replica.net_sequential_fail + for _ = 1, ivconst.FAILOVER_DOWN_SEQUENTIAL_FAIL do + local ok, err = ivshard.router.callro(bid, 'vshard.storage._call', + {'info'}, {timeout = timeout}) + t.assert_not(ok) + t.assert(iverror.is_timeout(err)) + end + + t.assert_equals(replica.net_sequential_fail, + fails + ivconst.FAILOVER_DOWN_SEQUENTIAL_FAIL) + + -- Priority is changed only by failover. So, the prioritized replica + -- is still the failing one. + t.assert_equals(router.replicasets[rs_uuid].replica.uuid, replica_uuid) + end, {bid, timeout, g.replica_1_b:replicaset_uuid(), + g.replica_1_b:instance_uuid()}) + + -- Enable failover, which changes priority of the replica. + g.router:exec(function(rs_uuid, master_uuid) + local router = ivshard.router.internal.static_router + ivshard.router.internal.errinj.ERRINJ_FAILOVER_DELAY = false + t.helpers.retrying({timeout = iwait_timeout}, function() + router.failover_fiber:wakeup() + t.assert_equals(router.replicasets[rs_uuid].replica.uuid, + master_uuid, 'Master is not prioritized yet') + end) + end, {g.replica_1_b:replicaset_uuid(), g.replica_1_a:instance_uuid()}) + + -- Restore 'info' request. + g.replica_1_b:exec(function() + ivshard.storage._call = _G.old_call + _G.old_call = nil + end) + + -- As replica_1_b has higher priority, it should be restored automatically. 
+    g.router:exec(function(rs_uuid, replica_uuid)
+        local old_up_timeout = ivconst.FAILOVER_UP_TIMEOUT
+        ivconst.FAILOVER_UP_TIMEOUT = 1
+        local router = ivshard.router.internal.static_router
+        local rs = router.replicasets[rs_uuid]
+        t.assert_equals(rs.priority_list[1].uuid, replica_uuid)
+        t.helpers.retrying({timeout = iwait_timeout}, function()
+            router.failover_fiber:wakeup()
+            t.assert_equals(rs.replica.uuid, replica_uuid,
+                            'Prioritized replica is not up yet')
+        end)
+        ivconst.FAILOVER_UP_TIMEOUT = old_up_timeout
+    end, {g.replica_1_b:replicaset_uuid(), g.replica_1_b:instance_uuid()})
+
+    vtest.router_cfg(g.router, global_cfg)
+end
diff --git a/test/router/exponential_timeout.result b/test/router/exponential_timeout.result
index 252b816c..bd62b739 100644
--- a/test/router/exponential_timeout.result
+++ b/test/router/exponential_timeout.result
@@ -28,9 +28,10 @@ util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage(\'memt
 _ = test_run:cmd("create server router_1 with script='router/router_1.lua'")
 ---
 ...
--- Discovery algorithm changes sometimes and should not affect the
+-- Discovery algorithm and failover change sometimes and should not affect the
 -- exponential timeout test.
-_ = test_run:cmd("start server router_1 with args='discovery_disable'")
+_ = test_run:cmd("start server router_1 with " .. \
+                 "args='discovery_disable failover_disable'")
 ---
 ...
 _ = test_run:switch('router_1')
@@ -103,7 +104,7 @@ util.collect_timeouts(rs1)
 - - fail: 0
     ok: 0
     timeout: 0.5
-  - fail: 1
+  - fail: 2
     ok: 0
     timeout: 1
 ...
diff --git a/test/router/exponential_timeout.test.lua b/test/router/exponential_timeout.test.lua
index 881b9a7e..ab677d60 100644
--- a/test/router/exponential_timeout.test.lua
+++ b/test/router/exponential_timeout.test.lua
@@ -10,9 +10,10 @@
 util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
 util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
 util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage(\'memtx\')')
 _ = test_run:cmd("create server router_1 with script='router/router_1.lua'")
--- Discovery algorithm changes sometimes and should not affect the
+-- Discovery algorithm and failover change sometimes and should not affect the
 -- exponential timeout test.
-_ = test_run:cmd("start server router_1 with args='discovery_disable'")
+_ = test_run:cmd("start server router_1 with " .. \
+                 "args='discovery_disable failover_disable'")
 _ = test_run:switch('router_1')
 util = require('util')
diff --git a/test/router/retry_reads.result b/test/router/retry_reads.result
index 80834750..e057422f 100644
--- a/test/router/retry_reads.result
+++ b/test/router/retry_reads.result
@@ -28,9 +28,10 @@ util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage(\'memt
 _ = test_run:cmd("create server router_1 with script='router/router_1.lua'")
 ---
 ...
--- Discovery algorithm changes sometimes and should not affect the
--- read retry decisions.
-_ = test_run:cmd("start server router_1 with args='discovery_disable'")
+-- Discovery algorithm and failover change sometimes and should not affect the
+-- read retry decisions.
+_ = test_run:cmd("start server router_1 with " .. \
+                 "args='discovery_disable failover_disable'")
 ---
 ...
 _ = test_run:switch('router_1')
@@ -69,7 +70,7 @@ util.collect_timeouts(rs1)
 - - fail: 0
     ok: 0
     timeout: 0.5
-  - fail: 1
+  - fail: 2
     ok: 0
     timeout: 1
 ...
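Reviewer note on the `fail: 1` -> `fail: 2` changes in the two .result files above: with the reworked replica_on_failed_request() later in this patch (vshard/replicaset.lua), net_sequential_fail is no longer clamped back to 1 once the network timeout has been doubled; it keeps growing until a request succeeds, so after two consecutive failures the collected counter reads 2. A minimal Lua sketch of the new counter behaviour (illustration only, mirroring the patched function; `consts` stands for vshard.consts):

    -- Sketch: the failure counter grows monotonically until a success resets it.
    local function replica_on_failed_request(replica)
        replica.net_sequential_ok = 0
        replica.net_sequential_fail = replica.net_sequential_fail + 1
        if replica.net_sequential_fail >= 2 then
            -- Double the per-request network timeout, but never exceed the cap.
            local new_timeout = replica.net_timeout * 2
            if new_timeout <= consts.CALL_TIMEOUT_MAX then
                replica.net_timeout = new_timeout
            end
        end
    end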
diff --git a/test/router/retry_reads.test.lua b/test/router/retry_reads.test.lua
index f0a44457..a9554d77 100644
--- a/test/router/retry_reads.test.lua
+++ b/test/router/retry_reads.test.lua
@@ -10,9 +10,10 @@
 util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
 util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
 util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage(\'memtx\')')
 _ = test_run:cmd("create server router_1 with script='router/router_1.lua'")
--- Discovery algorithm changes sometimes and should not affect the
--- read retry decisions.
-_ = test_run:cmd("start server router_1 with args='discovery_disable'")
+-- Discovery algorithm and failover change sometimes and should not affect the
+-- read retry decisions.
+_ = test_run:cmd("start server router_1 with " .. \
+                 "args='discovery_disable failover_disable'")
 _ = test_run:switch('router_1')
 util = require('util')
diff --git a/test/router/router.result b/test/router/router.result
index 4389367f..8f678564 100644
--- a/test/router/router.result
+++ b/test/router/router.result
@@ -1527,7 +1527,8 @@ table.sort(error_messages)
 ...
 error_messages
 ---
-- - Use replica:check_is_connected(...) instead of replica.check_is_connected(...)
+- - Use replica:call(...) instead of replica.call(...)
+  - Use replica:check_is_connected(...) instead of replica.check_is_connected(...)
   - Use replica:detach_conn(...) instead of replica.detach_conn(...)
   - Use replica:is_connected(...) instead of replica.is_connected(...)
   - Use replica:safe_uri(...) instead of replica.safe_uri(...)
diff --git a/test/router/router_1.lua b/test/router/router_1.lua
deleted file mode 120000
index da63b08a..00000000
--- a/test/router/router_1.lua
+++ /dev/null
@@ -1 +0,0 @@
-../../example/router_1.lua
\ No newline at end of file
diff --git a/test/router/router_1.lua b/test/router/router_1.lua
new file mode 100755
index 00000000..887c43bc
--- /dev/null
+++ b/test/router/router_1.lua
@@ -0,0 +1,35 @@
+#!/usr/bin/env tarantool
+
+require('strict').on()
+fiber = require('fiber')
+
+-- Check if we are running under test-run
+if os.getenv('ADMIN') then
+    test_run = require('test_run').new()
+    require('console').listen(os.getenv('ADMIN'))
+end
+
+replicasets = {'cbf06940-0790-498b-948d-042b62cf3d29',
+               'ac522f65-aa94-4134-9f64-51ee384f1a54'}
+
+-- Call a configuration provider
+cfg = dofile('localcfg.lua')
+if arg[1] == 'discovery_disable' then
+    cfg.discovery_mode = 'off'
+end
+
+-- Start the database with sharding
+vshard = require('vshard')
+
+if arg[2] == 'failover_disable' then
+    vshard.router.internal.errinj.ERRINJ_FAILOVER_DELAY = true
+end
+
+vshard.router.cfg(cfg)
+
+if arg[2] == 'failover_disable' then
+    while vshard.router.internal.errinj.ERRINJ_FAILOVER_DELAY ~= 'in' do
+        vshard.router.internal.static_router.failover_fiber:wakeup()
+        fiber.sleep(0.01)
+    end
+end
diff --git a/vshard/consts.lua b/vshard/consts.lua
index 249d4915..f8bedc4b 100644
--- a/vshard/consts.lua
+++ b/vshard/consts.lua
@@ -49,6 +49,7 @@ return {
     CALL_TIMEOUT_MAX = 64;
     FAILOVER_UP_TIMEOUT = 5;
     FAILOVER_DOWN_TIMEOUT = 1;
+    FAILOVER_DOWN_SEQUENTIAL_FAIL = 3;
     DEFAULT_FAILOVER_PING_TIMEOUT = 5;
     DEFAULT_SYNC_TIMEOUT = 1;
     RECONNECT_TIMEOUT = 0.5;
diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua
index 298ad2a5..abdffe67 100644
--- a/vshard/replicaset.lua
+++ b/vshard/replicaset.lua
@@ -483,8 +483,10 @@ end
 --
 local function replicaset_down_replica_priority(replicaset)
     local old_replica = replicaset.replica
-    assert(old_replica and old_replica.down_ts and
-           not old_replica:is_connected())
+ 
assert(old_replica and ((old_replica.down_ts and + not old_replica:is_connected()) or + old_replica.net_sequential_fail >= + consts.FAILOVER_DOWN_SEQUENTIAL_FAIL)) local new_replica = old_replica.next_by_priority if new_replica then assert(new_replica ~= old_replica) @@ -511,7 +513,8 @@ local function replicaset_up_replica_priority(replicaset) -- Failed to up priority. return end - if replica:is_connected() then + if replica:is_connected() and replica.net_sequential_ok > 0 then + assert(replica.net_sequential_fail == 0) replicaset.replica = replica assert(not old_replica or old_replica.weight >= replicaset.replica.weight) @@ -527,15 +530,12 @@ end -- local function replica_on_failed_request(replica) replica.net_sequential_ok = 0 - local val = replica.net_sequential_fail + 1 - if val >= 2 then + replica.net_sequential_fail = replica.net_sequential_fail + 1 + if replica.net_sequential_fail >= 2 then local new_timeout = replica.net_timeout * 2 if new_timeout <= consts.CALL_TIMEOUT_MAX then replica.net_timeout = new_timeout end - replica.net_sequential_fail = 1 - else - replica.net_sequential_fail = val end end @@ -710,7 +710,7 @@ local function can_retry_after_error(e) e.code == lerror.code.TRANSFER_IS_IN_PROGRESS) then return true end - return e.type == 'ClientError' and e.code == box.error.TIMEOUT + return lerror.is_timeout(e) end -- @@ -749,19 +749,6 @@ local function can_backoff_after_error(e, func) return false end --- --- Pick a next replica according to round-robin load balancing --- policy. --- -local function replicaset_balance_replica(replicaset) - local i = replicaset.balance_i - local pl = replicaset.priority_list - local size = #pl - replicaset.balance_i = i % size + 1 - assert(i <= size) - return pl[i] -end - -- -- Template to implement a function able to visit multiple -- replicas with certain details. One of applications - a function @@ -771,56 +758,91 @@ end -- until failover fiber will repair the nearest connection. -- local function replicaset_template_multicallro(prefer_replica, balance) - local function pick_next_replica(replicaset, now) - local r - local master = replicaset.master + -- + -- Stateful balancing (balance = true) changes the current + -- replicaset.balance_i, consequently balancing is preserved between + -- requests, sequential requests will go to different replicas. + -- + -- Stateless balancing (balance = false) is local to the current call, it + -- doesn't change priority list, every new request is firstly made to the + -- most preferred replica. If request fails with TimeOut error, it's + -- retried with the next replica by priority in scope of the same call. + -- + local function replicaset_balance_replica(replicaset, state) + local i + local pl = replicaset.priority_list + local size = #pl if balance then - local i = #replicaset.priority_list - while i > 0 do - r = replicaset_balance_replica(replicaset) - i = i - 1 - if r:is_connected() and (not prefer_replica or r ~= master) and - replica_check_backoff(r, now) then - return r - end - end + i = replicaset.balance_i + replicaset.balance_i = i % size + 1 else - local start_r = replicaset.replica - r = start_r - while r do - if r:is_connected() and (not prefer_replica or r ~= master) and - replica_check_backoff(r, now) then - return r - end - r = r.next_by_priority - end - -- Iteration above could start not from the best prio replica. - -- Check the beginning of the list too. - for _, r in ipairs(replicaset.priority_list) do - if r == start_r then - -- Reached already checked part. 
-                    break
-                end
-                if r:is_connected() and (not prefer_replica or r ~= master) and
-                   replica_check_backoff(r, now) then
-                    return r
-                end
-            end
+            i = state.balance_i
+            state.balance_i = state.balance_i % size + 1
         end
+        assert(i <= size)
+        return pl[i]
+    end
+
+    --
+    -- Pick a next replica according to round-robin load balancing policy.
+    --
+    local function pick_next_replica(replicaset, now, state)
+        local r
+        local master = replicaset.master
+        local i = #replicaset.priority_list
+        if prefer_replica and state.balance_checked_num >= i then
+            -- callre should fall back to the master if it tried to access
+            -- all replicas and didn't succeed (connections are broken,
+            -- replicas are in backoff, e.g. due to insufficient privileges,
+            -- or requests failed with a timeout error (request_timeout)).
+            goto fallback_to_master
+        end
+        while i > 0 do
+            i = i - 1
+            state.balance_checked_num = state.balance_checked_num + 1
+            r = replicaset_balance_replica(replicaset, state)
+            if r:is_connected() and (not prefer_replica or r ~= master) and
+               replica_check_backoff(r, now) then
+                return r
+            end
+        end
+
+::fallback_to_master::
+        state.balance_checked_num = 0
+        return nil
     end
     return function(replicaset, func, args, opts)
         assert(opts == nil or type(opts) == 'table')
         opts = opts and table.copy(opts) or {}
         local timeout = opts.timeout or consts.CALL_TIMEOUT_MAX
+        local request_timeout = opts.request_timeout or timeout
+        assert(request_timeout <= timeout)
+        opts.request_timeout = nil
         local net_status, storage_status, retval, err, replica
-        if timeout <= 0 then
+        if timeout <= 0 or request_timeout <= 0 then
             return nil, lerror.timeout()
         end
+        local state = {
+            -- Number of replicas we tried to access. See pick_next_replica()
+            -- for details. Used only when prefer_replica = true.
+            balance_checked_num = 0,
+            -- Stateless balancer index. See replicaset_balance_replica() for
+            -- details. Used only when balance = true.
+            balance_i = 1,
+        }
+        if not balance and replicaset.replica then
+            -- Initialize stateless balancer with replicaset.replica's index.
+            replica = replicaset.priority_list[state.balance_i]
+            while replica ~= replicaset.replica do
+                state.balance_i = state.balance_i + 1
+                replica = replicaset.priority_list[state.balance_i]
+            end
+        end
         local now = fiber_clock()
         local end_time = now + timeout
         while not net_status and timeout > 0 do
-            replica = pick_next_replica(replicaset, now)
+            replica = pick_next_replica(replicaset, now, state)
             if not replica then
                 replica, timeout = replicaset_wait_master(replicaset, timeout)
                 if not replica then
@@ -833,11 +855,25 @@ local function replicaset_template_multicallro(prefer_replica, balance)
                                               replica.backoff_err)
                 end
             end
-            opts.timeout = timeout
+            opts.timeout = request_timeout
             net_status, storage_status, retval, err =
                 replica_call(replica, func, args, opts)
             now = fiber_clock()
             timeout = end_time - now
+            if now + request_timeout > end_time then
+                -- The `timeout` option sets a strict limit for the entire
+                -- operation, which must not be exceeded. To ensure this, we
+                -- make as many requests as we can with the specified
+                -- `request_timeout`. However, the last request will use all
+                -- the remaining time left within the overall `timeout`,
+                -- rather than the `request_timeout` value.
+                --
+                -- For example, if `request_timeout` is 1 second and `timeout`
+                -- is 2.5 seconds, the first two requests to the replicaset
+                -- will use a 1-second timeout, but the last request will
+                -- have only 0.5 seconds remaining.
+ request_timeout = timeout + end if not net_status and not storage_status and not can_retry_after_error(retval) then if can_backoff_after_error(retval, func) then @@ -1232,6 +1268,7 @@ local replica_mt = { return util.uri_format(uri) end, detach_conn = replica_detach_conn, + call = replica_call, }, __tostring = function(replica) if replica.name then diff --git a/vshard/router/init.lua b/vshard/router/init.lua index 05a08422..cbd978ef 100644 --- a/vshard/router/init.lua +++ b/vshard/router/init.lua @@ -44,6 +44,7 @@ if not M then ERRINJ_CFG = false, ERRINJ_CFG_DELAY = false, ERRINJ_FAILOVER_CHANGE_CFG = false, + ERRINJ_FAILOVER_DELAY = false, ERRINJ_RELOAD = false, ERRINJ_LONG_DISCOVERY = false, ERRINJ_MASTER_SEARCH_DELAY = false, @@ -585,8 +586,9 @@ local function router_call_impl(router, bucket_id, mode, prefer_replica, balance, func, args, opts) local do_return_raw if opts then - if type(opts) ~= 'table' or - (opts.timeout and type(opts.timeout) ~= 'number') then + if type(opts) ~= 'table' or (opts.timeout and + type(opts.timeout) ~= 'number') or (opts.request_timeout and + type(opts.request_timeout) ~= 'number') then error('Usage: call(bucket_id, mode, func, args, opts)') end opts = table.copy(opts) @@ -596,6 +598,9 @@ local function router_call_impl(router, bucket_id, mode, prefer_replica, do_return_raw = false end local timeout = opts.timeout or consts.CALL_TIMEOUT_MIN + if opts.request_timeout and opts.request_timeout > timeout then + error('request_timeout must be <= timeout') + end local replicaset, err local tend = fiber_clock() + timeout if bucket_id > router.total_bucket_count or bucket_id <= 0 then @@ -969,41 +974,26 @@ end -- Failover -------------------------------------------------------------------------------- -local function failover_ping_round(router) - for _, replicaset in pairs(router.replicasets) do - local replica = replicaset.replica - if replica ~= nil and replica.conn ~= nil and - replica.down_ts == nil then - if not replica.conn:ping({timeout = - router.failover_ping_timeout}) then - log.info('Ping error from %s: perhaps a connection is down', - replica) - -- Connection hangs. Recreate it to be able to - -- fail over to a replica next by priority. The - -- old connection is not closed in case if it just - -- processes too big response at this moment. Any - -- way it will be eventually garbage collected - -- and closed. - replica:detach_conn() - replicaset:connect_replica(replica) - end - end - end -end - -- -- Replicaset must fall its replica connection to lower priority, -- if the current one is down too long. -- local function failover_need_down_priority(replicaset, curr_ts) local r = replicaset.replica + if not r or not r.next_by_priority then + return false + end -- down_ts not nil does not mean that the replica is not -- connected. Probably it is connected and now fetches schema, -- or does authorization. Either case, it is healthy, no need -- to down the prio. - return r and r.down_ts and not r:is_connected() and - curr_ts - r.down_ts >= consts.FAILOVER_DOWN_TIMEOUT - and r.next_by_priority + local is_down_ts = r.down_ts and not r:is_connected() and + curr_ts - r.down_ts >= consts.FAILOVER_DOWN_TIMEOUT + -- If we failed several sequential requests to replica, then something + -- is wrong with it. Temporary lower its priority. 
+    local is_sequential_fails =
+        r.net_sequential_fail >= consts.FAILOVER_DOWN_SEQUENTIAL_FAIL
+    return is_down_ts or is_sequential_fails
 end
 
 --
@@ -1031,6 +1021,49 @@ local function failover_collect_to_update(router)
     return id_to_update
 end
 
+local function failover_ping(replica, opts)
+    return replica:call('vshard.storage._call', {'info'}, opts)
+end
+
+local function failover_ping_round(router, curr_ts)
+    local opts = {timeout = router.failover_ping_timeout}
+    for _, replicaset in pairs(router.replicasets) do
+        local replica = replicaset.replica
+        if failover_need_up_priority(replicaset, curr_ts) then
+            -- When it's time to increase priority in the replicaset, all
+            -- instances whose priority is higher than the current one's are
+            -- pinged, so that we know which connections are working properly.
+            for _, r in pairs(replicaset.priority_list) do
+                if r == replica then
+                    break
+                end
+                if r.conn ~= nil and r.down_ts == nil then
+                    -- We don't need the return values, r.net_sequential_ok is
+                    -- incremented on a successful request.
+                    failover_ping(r, opts)
+                end
+            end
+        end
+        if replica ~= nil and replica.conn ~= nil and
+           replica.down_ts == nil then
+            local net_status, _, err = failover_ping(replica, opts)
+            if not net_status then
+                log.info('Ping error from %s: perhaps a connection is down: %s',
+                         replica, err)
+                -- Connection hangs. Recreate it to be able to
+                -- fail over to a replica next by priority. The
+                -- old connection is not closed in case if it just
+                -- processes too big response at this moment. Any
+                -- way it will be eventually garbage collected
+                -- and closed.
+                replica:detach_conn()
+                replicaset:connect_replica(replica)
+            end
+        end
+    end
+end
+
+
 --
 -- Detect not optimal or disconnected replicas. For not optimal
 -- try to update them to optimal, and down priority of
@@ -1038,12 +1071,12 @@ end
 -- @retval true A replica of an replicaset has been changed.
 --
 local function failover_step(router)
-    failover_ping_round(router)
+    local curr_ts = fiber_clock()
+    failover_ping_round(router, curr_ts)
     local id_to_update = failover_collect_to_update(router)
     if #id_to_update == 0 then
         return false
     end
-    local curr_ts = fiber_clock()
     local replica_is_changed = false
     for _, id in pairs(id_to_update) do
         local rs = router.replicasets[id]
@@ -1101,6 +1134,12 @@ local function failover_service_f(router, service)
     -- each min_timeout seconds.
     local prev_was_ok = false
     while module_version == M.module_version do
+        if M.errinj.ERRINJ_FAILOVER_DELAY then
+            M.errinj.ERRINJ_FAILOVER_DELAY = 'in'
+            repeat
+                lfiber.sleep(0.001)
+            until not M.errinj.ERRINJ_FAILOVER_DELAY
+        end
         service:next_iter()
         service:set_activity('updating replicas')
         local ok, replica_is_changed = pcall(failover_step, router)
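Reviewer note: a usage sketch of the new `request_timeout` option, based on the tests above (the bucket id and the 'echo' function are placeholders from the test suite, not part of the API):

    -- Give every individual storage request at most 1 second, while allowing
    -- the whole call to keep retrying other replicas for up to 2.5 seconds.
    local res, err = vshard.router.callro(bucket_id, 'echo', {1}, {
        request_timeout = 1,
        timeout = 2.5,
    })

request_timeout must be a number and must be <= timeout, otherwise the router raises a usage error. Every attempt runs with request_timeout, and the final attempt gets whatever time is left of the overall timeout (per the comment in replicaset.lua: 1 s, 1 s, then 0.5 s for timeout = 2.5).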