From 53ce67c0afd04e8181d32e6e9e8c760b440a0dfc Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Mon, 22 Jun 2020 17:06:21 +0100 Subject: [PATCH 01/30] Update Tictac AAE tag --- rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index 834cab548f..1dceac28ef 100644 --- a/rebar.config +++ b/rebar.config @@ -37,7 +37,7 @@ {riak_dt, ".*", {git, "git://github.com/basho/riak_dt.git", {tag, "riak_kv-2.9.1"}}}, {eunit_formatters, ".*", {git, "git://github.com/seancribbs/eunit_formatters", {tag, "0.1.2"}}}, {leveled, ".*", {git, "https://github.com/martinsumner/leveled.git", {tag, "0.9.21"}}}, - {kv_index_tictactree, ".*", {git, "https://github.com/martinsumner/kv_index_tictactree.git", {branch, "develop-2.9"}}}, + {kv_index_tictactree, ".*", {git, "https://github.com/martinsumner/kv_index_tictactree.git", {tag, "0.9.13"}}}, {riak_core, ".*", {git, "https://github.com/basho/riak_core.git", {tag, "riak_kv-2.9.2"}}}, {riak_api, ".*", {git, "git://github.com/basho/riak_api.git", {tag, "riak_kv-2.9.2"}}}, {hyper, ".*", {git, "git://github.com/basho/hyper", {tag, "1.0.1"}}}, From 8314392a7c605e5fa3459c89087132dc68806800 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Fri, 3 Jul 2020 12:23:47 +0100 Subject: [PATCH 02/30] Bump kv_index_tictactree --- rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index 1dceac28ef..a50d503f57 100644 --- a/rebar.config +++ b/rebar.config @@ -37,7 +37,7 @@ {riak_dt, ".*", {git, "git://github.com/basho/riak_dt.git", {tag, "riak_kv-2.9.1"}}}, {eunit_formatters, ".*", {git, "git://github.com/seancribbs/eunit_formatters", {tag, "0.1.2"}}}, {leveled, ".*", {git, "https://github.com/martinsumner/leveled.git", {tag, "0.9.21"}}}, - {kv_index_tictactree, ".*", {git, "https://github.com/martinsumner/kv_index_tictactree.git", {tag, "0.9.13"}}}, + {kv_index_tictactree, ".*", {git, "https://github.com/martinsumner/kv_index_tictactree.git", {tag, 
"0.9.14"}}}, {riak_core, ".*", {git, "https://github.com/basho/riak_core.git", {tag, "riak_kv-2.9.2"}}}, {riak_api, ".*", {git, "git://github.com/basho/riak_api.git", {tag, "riak_kv-2.9.2"}}}, {hyper, ".*", {git, "git://github.com/basho/hyper", {tag, "1.0.1"}}}, From b005659eab4f3b1d086ef0f33885b82b988cf106 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Tue, 7 Jul 2020 15:58:42 +0100 Subject: [PATCH 03/30] Add exchange skipping to tictacaae Skip an exchange if the last exchange is not yet complete. Improve logging --- rebar.config | 2 +- src/riak_kv_vnode.erl | 116 +++++++++++++++++++++++++++++------------- 2 files changed, 82 insertions(+), 36 deletions(-) diff --git a/rebar.config b/rebar.config index a50d503f57..dfdb70544c 100644 --- a/rebar.config +++ b/rebar.config @@ -37,7 +37,7 @@ {riak_dt, ".*", {git, "git://github.com/basho/riak_dt.git", {tag, "riak_kv-2.9.1"}}}, {eunit_formatters, ".*", {git, "git://github.com/seancribbs/eunit_formatters", {tag, "0.1.2"}}}, {leveled, ".*", {git, "https://github.com/martinsumner/leveled.git", {tag, "0.9.21"}}}, - {kv_index_tictactree, ".*", {git, "https://github.com/martinsumner/kv_index_tictactree.git", {tag, "0.9.14"}}}, + {kv_index_tictactree, ".*", {git, "https://github.com/martinsumner/kv_index_tictactree.git", {branch, "mas-i1765-asynchandling"}}}, {riak_core, ".*", {git, "https://github.com/basho/riak_core.git", {tag, "riak_kv-2.9.2"}}}, {riak_api, ".*", {git, "git://github.com/basho/riak_api.git", {tag, "riak_kv-2.9.2"}}}, {hyper, ".*", {git, "git://github.com/basho/hyper", {tag, "1.0.1"}}}, diff --git a/src/riak_kv_vnode.erl b/src/riak_kv_vnode.erl index 76ce8a8438..cc0408c037 100644 --- a/src/riak_kv_vnode.erl +++ b/src/riak_kv_vnode.erl @@ -149,8 +149,10 @@ :: list(riak_kv_entropy_manager:exchange()), tictac_exchangecount = 0 :: integer(), tictac_deltacount = 0 :: integer(), + tictac_exchangetime = 0 :: integer(), tictac_startqueue = os:timestamp() :: erlang:timestamp(), tictac_rebuilding = false :: 
erlang:timestamp()|false, + tictac_skiptick = 0 :: non_neg_integer(), worker_pool_strategy = single :: none|single|dscp, vnode_pool_pid :: undefined|pid(), update_hook :: update_hook(), @@ -205,6 +207,7 @@ -define(MAX_AAE_QUEUE_TIME, 1000). %% Queue time in ms to prompt a sync ping. +-define(AAE_SKIP_COUNT, 10). -define(AF1_QUEUE, riak_core_node_worker_pool:af1()). %% Assured Forwarding - pool 1 @@ -376,16 +379,18 @@ preflistfun(Bucket, Key) -> riak_kv_util:get_index_n({Bucket, Key}). %% Function to be passed to return a response once an operation is complete tictac_returnfun(Partition, exchange) -> Vnode = {Partition, node()}, + StartTime = os:timestamp(), ReturnFun = fun(ExchangeResult) -> - ok = tictacexchange_complete(Vnode, ExchangeResult) + ok = tictacexchange_complete(Vnode, StartTime, ExchangeResult) end, ReturnFun; tictac_returnfun(Partition, RebuildType) -> Vnode = {Partition, node()}, + StartTime = os:timestamp(), ReturnFun = fun(ok) -> - ok = tictacrebuild_complete(Vnode, RebuildType) + ok = tictacrebuild_complete(Vnode, StartTime, RebuildType) end, ReturnFun. @@ -472,21 +477,28 @@ aae_send(Preflist) -> riak_kv_vnode_master) end. --spec tictacrebuild_complete({partition(), node()}, store|trees) -> ok. +-spec tictacrebuild_complete({partition(), node()}, + erlang:timestamp(), + store|trees) -> ok. %% @doc %% Inform the vnode that an aae rebuild is complete -tictacrebuild_complete(Vnode, ProcessType) -> +tictacrebuild_complete(Vnode, StartTime, ProcessType) -> riak_core_vnode_master:command(Vnode, - {rebuild_complete, ProcessType}, + {rebuild_complete, + ProcessType, + StartTime}, riak_kv_vnode_master). --spec tictacexchange_complete({partition(), node()}, +-spec tictacexchange_complete({partition(), node()}, + erlang:timestamp(), {atom(), non_neg_integer()}) -> ok. 
%% @doc %% Infor the vnode that an aae exchange is complete -tictacexchange_complete(Vnode, ExchangeResult) -> +tictacexchange_complete(Vnode, StartTime, ExchangeResult) -> riak_core_vnode_master:command(Vnode, - {exchange_complete, ExchangeResult}, + {exchange_complete, + ExchangeResult, + StartTime}, riak_kv_vnode_master). get(Preflist, BKey, ReqId) -> @@ -1013,23 +1025,34 @@ handle_command({fold_indexes, FoldIndexFun, Acc}, Sender, State=#state{mod=Mod, end; -handle_command({rebuild_complete, store}, _Sender, State) -> +handle_command({rebuild_complete, store, ST}, _Sender, State) -> %% If store rebuild complete - then need to rebuild trees AAECntrl = State#state.aae_controller, Partition = State#state.idx, Preflists = riak_kv_util:responsible_preflists(Partition), + lager:info("AAE pid=~w partition=~w rebuild store complete " ++ + "in duration=~w seconds", + [AAECntrl, + Partition, + timer:now_diff(os:timestamp(), ST) div (1000 * 1000)]), aae_controller:aae_rebuildtrees(AAECntrl, Preflists, fun preflistfun/2, rebuildtrees_workerfun(Partition, State), false), - lager:info("AAE Controller rebuild trees started with pid ~w", [AAECntrl]), + lager:info("AAE pid=~w rebuild trees started", [AAECntrl]), {noreply, State}; -handle_command({rebuild_complete, trees}, _Sender, State) -> +handle_command({rebuild_complete, trees, ST}, _Sender, State) -> % Rebuilding the trees now complete, so change the status of the % rebuilding state so other rebuilds may be prompted + AAECntrl = State#state.aae_controller, Partition = State#state.idx, + lager:info("AAE pid=~w partition=~w rebuild trees complete " ++ + "in duration=~w seconds", + [AAECntrl, + Partition, + timer:now_diff(os:timestamp(), ST) div (1000 * 1000)]), case State#state.tictac_rebuilding of false -> lager:warning("Rebuild complete for Partition=~w but not expected", @@ -1043,11 +1066,18 @@ handle_command({rebuild_complete, trees}, _Sender, State) -> {noreply, State#state{tictac_rebuilding = false}} end; 
-handle_command({exchange_complete, {_EndState, DeltaCount}}, _Sender, State) -> +handle_command({exchange_complete, {_EndState, DeltaCount}, ST}, + _Sender, State) -> %% Record how many deltas were seen in the exchange + %% Revert the skip_count to 0 so that exchanges can be made at the next + %% prompt. XC = State#state.tictac_exchangecount + 1, DC = State#state.tictac_deltacount + DeltaCount, - {noreply, State#state{tictac_exchangecount = XC, tictac_deltacount = DC}}; + XT = State#state.tictac_exchangetime + timer:now_diff(os:timestamp(), ST), + {noreply, State#state{tictac_exchangecount = XC, + tictac_deltacount = DC, + tictac_exchangetime = XT, + tictac_skiptick = 0}}; handle_command({upgrade_hashtree, Node}, _, State=#state{hashtrees=HT}) -> %% Make sure we dont kick off an upgrade during a possible handoff @@ -1084,8 +1114,8 @@ handle_command(tictacaae_exchangepoke, _Sender, State) -> XTick = app_helper:get_env(riak_kv, tictacaae_exchangetick), riak_core_vnode:send_command_after(XTick, tictacaae_exchangepoke), Idx = State#state.idx, - case State#state.tictac_exchangequeue of - [] -> + case {State#state.tictac_exchangequeue, State#state.tictac_skiptick} of + {[], _} -> {ok, Ring} = riak_core_ring_manager:get_my_ring(), Exchanges = @@ -1103,17 +1133,20 @@ handle_command(tictacaae_exchangepoke, _Sender, State) -> ++ "exchanges expected=~w " ++ "exchanges completed=~w " ++ "total deltas=~w " + ++ "total exchange_time=~w seconds" ++ "loop duration=~w seconds (elapsed)", [Idx, length(Exchanges), State#state.tictac_exchangecount, State#state.tictac_deltacount, + State#state.tictac_exchangetime div (1000 * 1000), LoopDuration div (1000 * 1000)]), {noreply, State#state{tictac_exchangequeue = Exchanges, tictac_exchangecount = 0, tictac_deltacount = 0, + tictac_exchangetime = 0, tictac_startqueue = Now}}; - [{Local, Remote, {DocIdx, N}}|Rest] -> + {[{Local, Remote, {DocIdx, N}}|Rest], 0} -> PrimaryOnly = app_helper:get_env(riak_kv, tictacaae_primaryonly, true), % By 
default TictacAAE exchanges are run only between primary @@ -1130,32 +1163,45 @@ handle_command(tictacaae_exchangepoke, _Sender, State) -> false -> riak_core_apl:get_apl(PlLup, N, riak_kv) end, - case {lists:keyfind(Local, 1, PL), lists:keyfind(Remote, 1, PL)} of - {{Local, LN}, {Remote, RN}} -> - IndexN = {DocIdx, N}, - BlueList = - [{riak_kv_vnode:aae_send({Local, LN}), [IndexN]}], - PinkList = - [{riak_kv_vnode:aae_send({Remote, RN}), [IndexN]}], - RepairFun = - riak_kv_get_fsm:prompt_readrepair([{Local, LN}, - {Remote, RN}]), - ReplyFun = tictac_returnfun(Idx, exchange), - aae_exchange:start(BlueList, - PinkList, - RepairFun, - ReplyFun); + SkipCount = + case {lists:keyfind(Local, 1, PL), + lists:keyfind(Remote, 1, PL)} of + {{Local, LN}, {Remote, RN}} -> + IndexN = {DocIdx, N}, + BlueList = + [{riak_kv_vnode:aae_send({Local, LN}), [IndexN]}], + PinkList = + [{riak_kv_vnode:aae_send({Remote, RN}), [IndexN]}], + RepairFun = + riak_kv_get_fsm:prompt_readrepair([{Local, LN}, + {Remote, RN}]), + ReplyFun = tictac_returnfun(Idx, exchange), + ScanTimeout = ?AAE_SKIP_COUNT * XTick, + aae_exchange:start(full, + BlueList, + PinkList, + RepairFun, + ReplyFun, + none, + [{scan_timeout, ScanTimeout}]), + ?AAE_SKIP_COUNT; _ -> lager:warning("Proposed exchange between ~w and ~w " ++ "not currently supported within " ++ - "preflist for IndexN=~w possibly due to " ++ - "node failure", - [Local, Remote, {DocIdx, N}]) + "preflist for IndexN=~w possibly " ++ + "due to node failure", + [Local, Remote, {DocIdx, N}]), + 0 end, ok = aae_controller:aae_ping(State#state.aae_controller, os:timestamp(), self()), - {noreply, State#state{tictac_exchangequeue = Rest}} + {noreply, State#state{tictac_exchangequeue = Rest, + tictac_skiptick = SkipCount}}; + {_, SkipCount} -> + lager:warning("Skipping a tick due to non_zero " ++ + "skip_count=~w", [SkipCount]), + {noreply, State#state{tictac_skiptick = SkipCount - 1}} end; handle_command(tictacaae_rebuildpoke, Sender, State) -> From 
4ad1d05327fd02e3c6ebe853cddf45022c1996c3 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Tue, 7 Jul 2020 20:59:21 +0100 Subject: [PATCH 04/30] Correct spacing in log --- src/riak_kv_vnode.erl | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/riak_kv_vnode.erl b/src/riak_kv_vnode.erl index cc0408c037..049487e165 100644 --- a/src/riak_kv_vnode.erl +++ b/src/riak_kv_vnode.erl @@ -347,7 +347,9 @@ maybe_start_aaecontroller(active, State=#state{mod=Mod, end, InitD = erlang:phash2(Partition, 256), - % Space out the initial poke to avoid over-coordination between vnodes + % Space out the initial poke to avoid over-coordination between vnodes, + % each of up to 256 vnodes will end on a different point in the slot, with + % the points wrpaping every 256 vnodes (assuming coordinated restart) FirstRebuildDelay = (RTick div 256) * InitD, FirstExchangeDelay = (XTick div 256) * InitD, riak_core_vnode:send_command_after(FirstRebuildDelay, @@ -1133,7 +1135,7 @@ handle_command(tictacaae_exchangepoke, _Sender, State) -> ++ "exchanges expected=~w " ++ "exchanges completed=~w " ++ "total deltas=~w " - ++ "total exchange_time=~w seconds" + ++ "total exchange_time=~w seconds " ++ "loop duration=~w seconds (elapsed)", [Idx, length(Exchanges), From acc9663c0c9f76e65933f3e036cccfaffa585f82 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Wed, 8 Jul 2020 11:08:20 +0100 Subject: [PATCH 05/30] Don't AAE exchange immediately on restart Skip ticks depending on partition ID to stagger restart of AAE. 
--- src/riak_kv_vnode.erl | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/src/riak_kv_vnode.erl b/src/riak_kv_vnode.erl index 049487e165..6650d25dfb 100644 --- a/src/riak_kv_vnode.erl +++ b/src/riak_kv_vnode.erl @@ -320,6 +320,9 @@ maybe_start_aaecontroller(active, State=#state{mod=Mod, XTick = app_helper:get_env(riak_kv, tictacaae_exchangetick), RTick = app_helper:get_env(riak_kv, tictacaae_rebuildtick), + StepInitialTick = + app_helper:get_env(riak_kv, tictacaae_stepinitialtick, true), + StoreHead = app_helper:get_env(riak_kv, tictacaae_storeheads), ObjSplitFun = riak_object:aae_from_object_binary(StoreHead), @@ -349,18 +352,31 @@ maybe_start_aaecontroller(active, State=#state{mod=Mod, InitD = erlang:phash2(Partition, 256), % Space out the initial poke to avoid over-coordination between vnodes, % each of up to 256 vnodes will end on a different point in the slot, with - % the points wrpaping every 256 vnodes (assuming coordinated restart) + % the points wrpaping every 256 vnodes (assuming coordinated restart) FirstRebuildDelay = (RTick div 256) * InitD, FirstExchangeDelay = (XTick div 256) * InitD, riak_core_vnode:send_command_after(FirstRebuildDelay, tictacaae_rebuildpoke), riak_core_vnode:send_command_after(FirstExchangeDelay, tictacaae_exchangepoke), + + InitalStep = + case StepInitialTick of + true -> + % Stops each vnode from re-filling the AAE work queue at the + % same time, creating a pause in AAE across the cluster if all + % nodes in the cluster were started concurrently + erlang:phash2(Partition, 8); + false -> + % During riak_test we set this to false + 0 + end, State#state{tictac_aae = true, aae_controller = AAECntrl, modstate = ModState, - tictac_rebuilding = Rebuilding}. + tictac_rebuilding = Rebuilding, + tictac_skiptick = InitalStep}. -spec determine_aaedata_root(integer()) -> list(). 
From a60e0e663ff3d1f24abdc1cf0d515414aae6e37b Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Wed, 8 Jul 2020 12:47:28 +0100 Subject: [PATCH 06/30] Queue rebuild_trees without taking snapshot Only take the snapshot necessary to rebuild the trees, when the rebuild request is worked on the queue --- src/riak_kv_vnode.erl | 102 +++++++++++++++++++++--------------------- 1 file changed, 50 insertions(+), 52 deletions(-) diff --git a/src/riak_kv_vnode.erl b/src/riak_kv_vnode.erl index 6650d25dfb..a391516a2f 100644 --- a/src/riak_kv_vnode.erl +++ b/src/riak_kv_vnode.erl @@ -333,21 +333,9 @@ maybe_start_aaecontroller(active, State=#state{mod=Mod, Preflists, RootPath, ObjSplitFun), - R = aae_controller:aae_rebuildtrees(AAECntrl, - Preflists, - fun preflistfun/2, - rebuildtrees_workerfun(Partition, - State), - true), - lager:info("AAE Controller started with pid ~w and rebuild state ~w", - [AAECntrl, R]), - Rebuilding = - case R of - ok -> - os:timestamp(); - skipped -> - false - end, + queue_tictactreerebuild(AAECntrl, Partition, true, State), + lager:info("AAE Controller started with pid=~w", [AAECntrl]), + Rebuilding = os:timestamp(), InitD = erlang:phash2(Partition, 256), % Space out the initial poke to avoid over-coordination between vnodes, @@ -423,26 +411,48 @@ tictac_rebuild(B, K, V) -> Clock = element(1, riak_object:summary_from_binary(V)), {IndexN, Clock}. --spec rebuildtrees_workerfun(partition(), state()) -> fun(). %% @doc -%% A wrapper around the node_worker_pool to provide workers for tictac aae -%% folds. As the pool is a named pool started with riak_core, should be safe -%% to used in init of inidivdual vnodes. -rebuildtrees_workerfun(Partition, State) -> - fun(FoldFun, FinishFun) -> - Sender = self(), - ReturnFun = tictac_returnfun(Partition, trees), - FinishPlusReturnFun = - fun(WorkOutput) -> - FinishFun(WorkOutput), - ReturnFun(ok) +%% Queue a tictac tree rebuild. 
There are occasions when all vnodes queue this +%% at the same time, so important that the snapshot for the rebuild is taken +%% only when the fold is initiated. Otherwise the snapshot may expire whilst +%% sat on the queue +-spec queue_tictactreerebuild(pid(), partition(), boolean(), state()) -> ok. +queue_tictactreerebuild(AAECntrl, Partition, OnlyIfBroken, State) -> + Preflists = riak_kv_util:responsible_preflists(Partition), + Sender = self(), + ReturnFun = tictac_returnfun(Partition, trees), + FoldFun = + fun() -> + lager:info("Starting tree rebuild for partition=~w", [Partition]), + SW = os:timestamp(), + case aae_controller:aae_rebuildtrees(AAECntrl, + Preflists, + fun preflistfun/2, + OnlyIfBroken) of + {ok, StoreFold, FinishFun} -> + Output = StoreFold(), + FinishFun(Output), + Duration = + timer:now_diff(os:timestamp(), SW) div (1000 * 1000), + lager:info("Tree rebuild complete for partition=~w" ++ + " in duration=~w seconds", + [Partition, Duration]); + skipped -> + lager:info("tree rebuild skipped for partition=~w", + [Partition]) end, - Pool = select_queue(be_pool, State), - riak_core_vnode:queue_work(Pool, - {fold, FoldFun, FinishPlusReturnFun}, - Sender, - State#state.vnode_pool_pid) - end. + ok + end, + JustReturnFun = + fun(ok) -> + ReturnFun(ok) + end, + Pool = select_queue(be_pool, State), + riak_core_vnode:queue_work(Pool, + {fold, FoldFun, JustReturnFun}, + Sender, + State#state.vnode_pool_pid). + %% @doc Reveal the underlying module state for testing -spec(get_modstate(state()) -> {atom(), state()}). 
@@ -1047,40 +1057,28 @@ handle_command({rebuild_complete, store, ST}, _Sender, State) -> %% If store rebuild complete - then need to rebuild trees AAECntrl = State#state.aae_controller, Partition = State#state.idx, - Preflists = riak_kv_util:responsible_preflists(Partition), lager:info("AAE pid=~w partition=~w rebuild store complete " ++ "in duration=~w seconds", [AAECntrl, Partition, timer:now_diff(os:timestamp(), ST) div (1000 * 1000)]), - aae_controller:aae_rebuildtrees(AAECntrl, - Preflists, - fun preflistfun/2, - rebuildtrees_workerfun(Partition, State), - false), - lager:info("AAE pid=~w rebuild trees started", [AAECntrl]), + queue_tictactreerebuild(AAECntrl, Partition, false, State), + lager:info("AAE pid=~w rebuild trees queued", [AAECntrl]), {noreply, State}; -handle_command({rebuild_complete, trees, ST}, _Sender, State) -> +handle_command({rebuild_complete, trees, _ST}, _Sender, State) -> % Rebuilding the trees now complete, so change the status of the % rebuilding state so other rebuilds may be prompted - AAECntrl = State#state.aae_controller, Partition = State#state.idx, - lager:info("AAE pid=~w partition=~w rebuild trees complete " ++ - "in duration=~w seconds", - [AAECntrl, - Partition, - timer:now_diff(os:timestamp(), ST) div (1000 * 1000)]), case State#state.tictac_rebuilding of false -> lager:warning("Rebuild complete for Partition=~w but not expected", - [Partition]), + [Partition]), {noreply, State}; TS -> - RebuildTime = timer:now_diff(os:timestamp(), TS) / (1000 * 1000), - lager:info("Rebuild complete for " ++ - "Partition=~w in duration=~w seconds", - [Partition, RebuildTime]), + ProcessTime = timer:now_diff(os:timestamp(), TS) div (1000 * 1000), + lager:info("Rebuild process for partition=~w complete in " ++ + "duration=~w seconds", [Partition, ProcessTime]), {noreply, State#state{tictac_rebuilding = false}} end; From 014bbdbba5ffc8edd1c1957359258b801c5da8f0 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Wed, 8 Jul 2020 21:46:44 +0100 
Subject: [PATCH 07/30] Add logs --- src/riak_kv_vnode.erl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/riak_kv_vnode.erl b/src/riak_kv_vnode.erl index a391516a2f..33e0afe0f8 100644 --- a/src/riak_kv_vnode.erl +++ b/src/riak_kv_vnode.erl @@ -438,7 +438,7 @@ queue_tictactreerebuild(AAECntrl, Partition, OnlyIfBroken, State) -> " in duration=~w seconds", [Partition, Duration]); skipped -> - lager:info("tree rebuild skipped for partition=~w", + lager:info("Tree rebuild skipped for partition=~w", [Partition]) end, ok @@ -1250,6 +1250,8 @@ handle_command(tictacaae_rebuildpoke, Sender, State) -> {noreply, State}; {true, false} -> % Next Rebuild Time is in the past - prompt a rebuild + lager:info("Prompting tictac_aae rebuild for controller=~w", + [State#state.aae_controller]), ReturnFun = tictac_returnfun(State#state.idx, store), State0 = State#state{tictac_rebuilding = os:timestamp()}, case aae_controller:aae_rebuildstore(State#state.aae_controller, From 6c2f4d30068b23ba3c3450483eeaec27ae867872 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Wed, 15 Jul 2020 18:37:10 +0100 Subject: [PATCH 08/30] Protect from div 0 errors --- src/riak_kv_ttaaefs_manager.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/riak_kv_ttaaefs_manager.erl b/src/riak_kv_ttaaefs_manager.erl index 1e324e2528..b934f49a06 100644 --- a/src/riak_kv_ttaaefs_manager.erl +++ b/src/riak_kv_ttaaefs_manager.erl @@ -775,8 +775,8 @@ take_next_workitem([], Wants, ScheduleStartTime, SlotInfo, SliceCount) -> take_next_workitem([NextAlloc|T], Wants, ScheduleStartTime, SlotInfo, SliceCount) -> {NodeNumber, NodeCount} = SlotInfo, - SliceSeconds = ?SECONDS_IN_DAY div SliceCount, - SlotSeconds = (NodeNumber - 1) * (SliceSeconds div NodeCount), + SliceSeconds = ?SECONDS_IN_DAY div max(SliceCount, 1), + SlotSeconds = (NodeNumber - 1) * (SliceSeconds div max(NodeCount, 1)), {SliceNumber, NextAction} = NextAlloc, {Mega, Sec, _Micro} = ScheduleStartTime, 
ScheduleSeconds = From 057ee4c0e4e4a2dc0cad9a95e85c8a2587ae3202 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Wed, 22 Jul 2020 00:38:33 +0100 Subject: [PATCH 09/30] Wait for loading to complete --- src/riak_kv_vnode.erl | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/src/riak_kv_vnode.erl b/src/riak_kv_vnode.erl index 33e0afe0f8..63c7d3c87e 100644 --- a/src/riak_kv_vnode.erl +++ b/src/riak_kv_vnode.erl @@ -425,10 +425,10 @@ queue_tictactreerebuild(AAECntrl, Partition, OnlyIfBroken, State) -> fun() -> lager:info("Starting tree rebuild for partition=~w", [Partition]), SW = os:timestamp(), - case aae_controller:aae_rebuildtrees(AAECntrl, - Preflists, - fun preflistfun/2, - OnlyIfBroken) of + case when_loading_complete(AAECntrl, + Preflists, + fun preflistfun/2, + OnlyIfBroken) of {ok, StoreFold, FinishFun} -> Output = StoreFold(), FinishFun(Output), @@ -453,6 +453,21 @@ queue_tictactreerebuild(AAECntrl, Partition, OnlyIfBroken, State) -> Sender, State#state.vnode_pool_pid). +when_loading_complate(AAECntrl, Preflists, PreflistFun, OnlyIfBroken) -> + R = aae_controller:aae_rebuildtrees(AAECntrl, + Preflists, PreflistFun, + OnlyIfBroken), + case R of + loading -> + timer:sleep(1000), + when_loading_complete(AAECntrl, + Preflists, + PreflistFun, + OnlyIfBroken); + _ -> + R + end. + %% @doc Reveal the underlying module state for testing -spec(get_modstate(state()) -> {atom(), state()}). From 3c597801d652c166c8b9bb9ddf7dde4746b192cd Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Wed, 22 Jul 2020 00:44:44 +0100 Subject: [PATCH 10/30] Update riak_kv_vnode.erl --- src/riak_kv_vnode.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/riak_kv_vnode.erl b/src/riak_kv_vnode.erl index 63c7d3c87e..780c700e36 100644 --- a/src/riak_kv_vnode.erl +++ b/src/riak_kv_vnode.erl @@ -453,7 +453,7 @@ queue_tictactreerebuild(AAECntrl, Partition, OnlyIfBroken, State) -> Sender, State#state.vnode_pool_pid). 
-when_loading_complate(AAECntrl, Preflists, PreflistFun, OnlyIfBroken) -> +when_loading_complete(AAECntrl, Preflists, PreflistFun, OnlyIfBroken) -> R = aae_controller:aae_rebuildtrees(AAECntrl, Preflists, PreflistFun, OnlyIfBroken), From 822227cc483cd9d1e996da7bb840c630b696c654 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Thu, 23 Jul 2020 15:09:23 +0100 Subject: [PATCH 11/30] Update rebar.config --- rebar.config | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rebar.config b/rebar.config index dfdb70544c..9ba5373ba8 100644 --- a/rebar.config +++ b/rebar.config @@ -36,8 +36,8 @@ {riak_pipe, ".*", {git, "git://github.com/basho/riak_pipe.git", {tag, "riak_kv-2.9.1"}}}, {riak_dt, ".*", {git, "git://github.com/basho/riak_dt.git", {tag, "riak_kv-2.9.1"}}}, {eunit_formatters, ".*", {git, "git://github.com/seancribbs/eunit_formatters", {tag, "0.1.2"}}}, - {leveled, ".*", {git, "https://github.com/martinsumner/leveled.git", {tag, "0.9.21"}}}, - {kv_index_tictactree, ".*", {git, "https://github.com/martinsumner/kv_index_tictactree.git", {branch, "mas-i1765-asynchandling"}}}, + {leveled, ".*", {git, "https://github.com/martinsumner/leveled.git", {branch, "develop-2.9"}}}, + {kv_index_tictactree, ".*", {git, "https://github.com/martinsumner/kv_index_tictactree.git", {branch, "develop-2.9"}}}, {riak_core, ".*", {git, "https://github.com/basho/riak_core.git", {tag, "riak_kv-2.9.2"}}}, {riak_api, ".*", {git, "git://github.com/basho/riak_api.git", {tag, "riak_kv-2.9.2"}}}, {hyper, ".*", {git, "git://github.com/basho/hyper", {tag, "1.0.1"}}}, From 45ff58c124431a75b32b2a173bde2fa9c160e38f Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Fri, 24 Jul 2020 14:15:02 +0100 Subject: [PATCH 12/30] Increase initial tick delay on startup Wait at least tick plus offset, not just offset. This will allow for other startup activity to complete (e.g. handoffs before AAE activity commences).
--- priv/riak_kv.schema | 9 ++++++--- src/riak_kv_vnode.erl | 6 +++--- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/priv/riak_kv.schema b/priv/riak_kv.schema index 58cf934d12..d818c7dc44 100644 --- a/priv/riak_kv.schema +++ b/priv/riak_kv.schema @@ -69,7 +69,7 @@ %% @doc Store heads in parallel key stores %% If running a parallel key store, the whole "head" object may be stored to %% allow for fold_heads queries to be run against the parallel store. -%% Alternatively, the cost of the paralle key store can be reduced by storing +%% Alternatively, the cost of the parallel key store can be reduced by storing %% only a minimal data set necessary for AAE and monitoring {mapping, "tictacaae_storeheads", "riak_kv.tictacaae_storeheads", [ {datatype, {flag, enabled, disabled}}, @@ -92,10 +92,13 @@ %% @doc Frequency to prompt rebuild check per vnode %% The number of milliseconds which the vnode must wait between self-pokes to -%% maybe prompt the next rebuild. Default is 30 minutes. +%% maybe prompt the next rebuild. Default is 60 minutes. +%% When a node is being re-introduced to a cluster following a long delay, then +%% increase this tick prior to the reintroduction. This will reduce +%% the concurrency of some activity e.g. handoffs and rebuilds {mapping, "tictacaae_rebuildtick", "riak_kv.tictacaae_rebuildtick", [ {datatype, integer}, - {default, 1800000}, + {default, 3600000}, hidden ]}. 
diff --git a/src/riak_kv_vnode.erl b/src/riak_kv_vnode.erl index 780c700e36..6199201cf2 100644 --- a/src/riak_kv_vnode.erl +++ b/src/riak_kv_vnode.erl @@ -340,9 +340,9 @@ maybe_start_aaecontroller(active, State=#state{mod=Mod, InitD = erlang:phash2(Partition, 256), % Space out the initial poke to avoid over-coordination between vnodes, % each of up to 256 vnodes will end on a different point in the slot, with - % the points wrpaping every 256 vnodes (assuming coordinated restart) - FirstRebuildDelay = (RTick div 256) * InitD, - FirstExchangeDelay = (XTick div 256) * InitD, + % the points wrapping every 256 vnodes (assuming coordinated restart) + FirstRebuildDelay = RTick + ((RTick div 256) * InitD), + FirstExchangeDelay = XTick + ((XTick div 256) * InitD), riak_core_vnode:send_command_after(FirstRebuildDelay, tictacaae_rebuildpoke), riak_core_vnode:send_command_after(FirstExchangeDelay, From 0b4bd203b9d65cbde4a0fcd60dcd6ba1f41d9f3d Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Fri, 24 Jul 2020 14:55:35 +0100 Subject: [PATCH 13/30] Increase default exchange tick --- priv/riak_kv.schema | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/priv/riak_kv.schema b/priv/riak_kv.schema index d818c7dc44..7e7107e074 100644 --- a/priv/riak_kv.schema +++ b/priv/riak_kv.schema @@ -79,14 +79,14 @@ %% @doc Frequency to prompt exchange per vnode %% The number of milliseconds which the vnode must wait between self-pokes to -%% maybe prompt the next exchange. Default is 2 minutes 30 seconds. -%% Note if this is to be reduced below this value the riak_core +%% maybe prompt the next exchange. Default is 5 minutes. +%% Note if this is to be reduced to 2 minutes or below the riak_core %% vnode_inactivity_timeout should also be reduced or handoffs may be %% blocked. To be safe the vnode_inactivity_timeout must be < 0.5 * the %% tictacaae_exchangetick. 
{mapping, "tictacaae_exchangetick", "riak_kv.tictacaae_exchangetick", [ {datatype, integer}, - {default, 150000}, + {default, 300000}, hidden ]}. From 596f427a527a8a98693b76716ac518c06c35d1bb Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Sat, 25 Jul 2020 05:47:20 +0100 Subject: [PATCH 14/30] Handle cluster change May queue a rebuild, but due to cluster change the vnode might not exist when the rebuild is complete. In this case shouldn't crash the worker (and lose the contents of the queue) - just skip. --- src/riak_kv_vnode.erl | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/src/riak_kv_vnode.erl b/src/riak_kv_vnode.erl index 6199201cf2..d6dd4be877 100644 --- a/src/riak_kv_vnode.erl +++ b/src/riak_kv_vnode.erl @@ -208,6 +208,7 @@ -define(MAX_AAE_QUEUE_TIME, 1000). %% Queue time in ms to prompt a sync ping. -define(AAE_SKIP_COUNT, 10). +-define(AAE_LOADING_WAIT, 5000). -define(AF1_QUEUE, riak_core_node_worker_pool:af1()). %% Assured Forwarding - pool 1 @@ -454,18 +455,25 @@ queue_tictactreerebuild(AAECntrl, Partition, OnlyIfBroken, State) -> State#state.vnode_pool_pid). when_loading_complete(AAECntrl, Preflists, PreflistFun, OnlyIfBroken) -> - R = aae_controller:aae_rebuildtrees(AAECntrl, - Preflists, PreflistFun, - OnlyIfBroken), - case R of - loading -> - timer:sleep(1000), - when_loading_complete(AAECntrl, - Preflists, - PreflistFun, - OnlyIfBroken); + case is_process_alive(AAECntrl) of + true -> + R = aae_controller:aae_rebuildtrees(AAECntrl, + Preflists, PreflistFun, + OnlyIfBroken), + case R of + loading -> + timer:sleep(?AAE_LOADING_WAIT), + when_loading_complete(AAECntrl, + Preflists, + PreflistFun, + OnlyIfBroken); + _ -> + R + end; _ -> - R + % May have queued a rebuild for a vnode aae controller for an + % exited vnode (e.g. one which has completed handoff) + skipped end. 
From cf15834d1743c5bf4cb9baccd2b2d3e0d714cb4b Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Mon, 27 Jul 2020 22:42:42 +0100 Subject: [PATCH 15/30] Switch first OnlyIfBrokenBuild to first poke This avoids trying to queue a tictactree rebuild before the vnode worker pool has been initiated. Otherwise, will cause issues starting node 3 in `verfiy_tictacrebuild_via_statsfold` --- src/riak_kv_vnode.erl | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/riak_kv_vnode.erl b/src/riak_kv_vnode.erl index d6dd4be877..cd27f5e87c 100644 --- a/src/riak_kv_vnode.erl +++ b/src/riak_kv_vnode.erl @@ -153,6 +153,7 @@ tictac_startqueue = os:timestamp() :: erlang:timestamp(), tictac_rebuilding = false :: erlang:timestamp()|false, tictac_skiptick = 0 :: non_neg_integer(), + tictac_startup = true :: boolean(), worker_pool_strategy = single :: none|single|dscp, vnode_pool_pid :: undefined|pid(), update_hook :: update_hook(), @@ -334,9 +335,7 @@ maybe_start_aaecontroller(active, State=#state{mod=Mod, Preflists, RootPath, ObjSplitFun), - queue_tictactreerebuild(AAECntrl, Partition, true, State), lager:info("AAE Controller started with pid=~w", [AAECntrl]), - Rebuilding = os:timestamp(), InitD = erlang:phash2(Partition, 256), % Space out the initial poke to avoid over-coordination between vnodes, @@ -364,7 +363,7 @@ maybe_start_aaecontroller(active, State=#state{mod=Mod, State#state{tictac_aae = true, aae_controller = AAECntrl, modstate = ModState, - tictac_rebuilding = Rebuilding, + tictac_rebuilding = false, tictac_skiptick = InitalStep}. 
@@ -1243,14 +1242,23 @@ handle_command(tictacaae_exchangepoke, _Sender, State) -> {noreply, State#state{tictac_skiptick = SkipCount - 1}} end; +handle_command(tictacaae_rebuildpoke, Sender, State=#state{tictac_startup=TS}) + when TS == true -> + AAECntrl = State#state.aae_controller, + Partition = State#state.idx, + queue_tictactreerebuild(AAECntrl, Partition, true, State), + {noreply, State#state{tictac_rebuilding = os:timestamp(), + tictac_startup=false}}; + + handle_command(tictacaae_rebuildpoke, Sender, State) -> NRT = aae_controller:aae_nextrebuild(State#state.aae_controller), RTick = app_helper:get_env(riak_kv, tictacaae_rebuildtick), riak_core_vnode:send_command_after(RTick, tictacaae_rebuildpoke), TimeToRebuild = timer:now_diff(NRT, os:timestamp()), RebuildPending = State#state.tictac_rebuilding =/= false, - - case {TimeToRebuild < 0, RebuildPending} of + + case {TimeToRebuild < 0, RebuildPending, TriggerRebuild} of {false, _} -> lager:info("No rebuild as next_rebuild=~w seconds in the future", [TimeToRebuild / (1000 * 1000)]), From e39340473834883955640e29b996472ee1138765 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Mon, 27 Jul 2020 22:44:11 +0100 Subject: [PATCH 16/30] Update riak_kv_vnode.erl --- src/riak_kv_vnode.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/riak_kv_vnode.erl b/src/riak_kv_vnode.erl index cd27f5e87c..8e2e13bf94 100644 --- a/src/riak_kv_vnode.erl +++ b/src/riak_kv_vnode.erl @@ -1242,7 +1242,7 @@ handle_command(tictacaae_exchangepoke, _Sender, State) -> {noreply, State#state{tictac_skiptick = SkipCount - 1}} end; -handle_command(tictacaae_rebuildpoke, Sender, State=#state{tictac_startup=TS}) +handle_command(tictacaae_rebuildpoke, _Sender, State=#state{tictac_startup=TS}) when TS == true -> AAECntrl = State#state.aae_controller, Partition = State#state.idx, @@ -1258,7 +1258,7 @@ handle_command(tictacaae_rebuildpoke, Sender, State) -> TimeToRebuild = timer:now_diff(NRT, os:timestamp()), RebuildPending = 
State#state.tictac_rebuilding =/= false, - case {TimeToRebuild < 0, RebuildPending, TriggerRebuild} of + case {TimeToRebuild < 0, RebuildPending} of {false, _} -> lager:info("No rebuild as next_rebuild=~w seconds in the future", [TimeToRebuild / (1000 * 1000)]), From 1a2423bcc77fdd2175c92c56d46e7e409ce076a6 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Mon, 27 Jul 2020 23:26:34 +0100 Subject: [PATCH 17/30] Don't kill tick on initial rebuild --- src/riak_kv_vnode.erl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/riak_kv_vnode.erl b/src/riak_kv_vnode.erl index 8e2e13bf94..3298830ba5 100644 --- a/src/riak_kv_vnode.erl +++ b/src/riak_kv_vnode.erl @@ -1244,11 +1244,13 @@ handle_command(tictacaae_exchangepoke, _Sender, State) -> handle_command(tictacaae_rebuildpoke, _Sender, State=#state{tictac_startup=TS}) when TS == true -> + RTick = app_helper:get_env(riak_kv, tictacaae_rebuildtick), + riak_core_vnode:send_command_after(RTick, tictacaae_rebuildpoke), AAECntrl = State#state.aae_controller, Partition = State#state.idx, queue_tictactreerebuild(AAECntrl, Partition, true, State), {noreply, State#state{tictac_rebuilding = os:timestamp(), - tictac_startup=false}}; + tictac_startup = false}}; handle_command(tictacaae_rebuildpoke, Sender, State) -> From bbf21640959f4c283518c6a72a6be0c24b57e386 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Tue, 28 Jul 2020 11:54:08 +0100 Subject: [PATCH 18/30] Comments and rollback slightly exchange poke increase --- priv/riak_kv.schema | 12 ++++++------ src/riak_kv_vnode.erl | 4 ++++ 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/priv/riak_kv.schema b/priv/riak_kv.schema index 7e7107e074..bf343633b1 100644 --- a/priv/riak_kv.schema +++ b/priv/riak_kv.schema @@ -79,14 +79,14 @@ %% @doc Frequency to prompt exchange per vnode %% The number of milliseconds which the vnode must wait between self-pokes to -%% maybe prompt the next exchange. Default is 5 minutes. 
-%% Note if this is to be reduced to 2 minutes or below the riak_core -%% vnode_inactivity_timeout should also be reduced or handoffs may be -%% blocked. To be safe the vnode_inactivity_timeout must be < 0.5 * the -%% tictacaae_exchangetick. +%% maybe prompt the next exchange. Default is 4 minutes - check all partitions +%% when n=3 every 30 minutes. +%% Note if this is to be reduced further the riak_core vnode_inactivity_timeout +%% should also be reduced or handoffs may be blocked. To be safe the +%% vnode_inactivity_timeout must be < 0.5 * the tictacaae_exchangetick. {mapping, "tictacaae_exchangetick", "riak_kv.tictacaae_exchangetick", [ {datatype, integer}, - {default, 300000}, + {default, 240000}, hidden ]}. diff --git a/src/riak_kv_vnode.erl b/src/riak_kv_vnode.erl index 3298830ba5..f2097c78ec 100644 --- a/src/riak_kv_vnode.erl +++ b/src/riak_kv_vnode.erl @@ -1244,6 +1244,10 @@ handle_command(tictacaae_exchangepoke, _Sender, State) -> handle_command(tictacaae_rebuildpoke, _Sender, State=#state{tictac_startup=TS}) when TS == true -> + % On startup the first poke should check if the trees need rebuilding, e.g. + % as the tree was not persisted when shutdown. This won't rebuild unless + % the trees have been marked as broken. Trees are marked as broken in a + % non-empty store where trees do not exist. RTick = app_helper:get_env(riak_kv, tictacaae_rebuildtick), riak_core_vnode:send_command_after(RTick, tictacaae_rebuildpoke), AAECntrl = State#state.aae_controller, From 28933783944628a8762392199fab38ef4ed83abc Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Thu, 30 Jul 2020 13:42:53 +0100 Subject: [PATCH 19/30] Allow debug of repairs Allow an environment variable to be set to log repairs (e.g. 
to debug exchanges prompting repairs) --- src/riak_kv_get_fsm.erl | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/src/riak_kv_get_fsm.erl b/src/riak_kv_get_fsm.erl index dbfb758e18..84e2f1e399 100644 --- a/src/riak_kv_get_fsm.erl +++ b/src/riak_kv_get_fsm.erl @@ -635,8 +635,14 @@ finalize(StateData=#state{get_core = GetCore, trace = Trace, req_id = ReqID, end, {stop,normal,StateData}. - +-spec prompt_readrepair([{riak_core_ring:partition_id(), node()}]) -> + fun((list({{riak_object:bucket(), riak_object:key()}, + {vclock:vclock(), vclock:vclock()}})) -> ok). prompt_readrepair(VnodeList) -> + prompt_readrepair(VnodeList, + app_helper:get_env(riak_kv, log_read_repair, false)). + +prompt_readrepair(VnodeList, LogRepair) -> {ok, C} = riak:local_client(), FetchFun = fun({{B, K}, {_BlueClock, _PinkClock}}) -> @@ -646,12 +652,24 @@ prompt_readrepair(VnodeList) -> fun({{B, K}, {_BlueClock, _PinkClock}}) -> riak_kv_vnode:rehash(VnodeList, B, K) end, + LogFun = + fun({{B, K}, {BlueClock, PinkClock}}) -> + lager:info( + "Prompted read repair Bucket=~w Key=~w Clocks ~w ~w", + [B, K, BlueClock, PinkClock]) + end, fun(RepairList) -> lager:info("Repairing ~w keys between ~w", [length(RepairList), VnodeList]), lists:foreach(FetchFun, RepairList), timer:sleep(1000), % Sleep for repairs to complete - lists:foreach(RehashFun, RepairList) + lists:foreach(RehashFun, RepairList), + case LogRepair of + true -> + lists:foreach(LogFun, RepairList); + false -> + ok + end end. 
reply_fun({EndStateName, DeltaCount}) -> From 3ac9ccf722f9ddad506b3abdcab8fe265e1554e7 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Thu, 30 Jul 2020 13:49:44 +0100 Subject: [PATCH 20/30] Refine name --- src/riak_kv_get_fsm.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/riak_kv_get_fsm.erl b/src/riak_kv_get_fsm.erl index 84e2f1e399..6a17d49c87 100644 --- a/src/riak_kv_get_fsm.erl +++ b/src/riak_kv_get_fsm.erl @@ -640,7 +640,7 @@ finalize(StateData=#state{get_core = GetCore, trace = Trace, req_id = ReqID, {vclock:vclock(), vclock:vclock()}})) -> ok). prompt_readrepair(VnodeList) -> prompt_readrepair(VnodeList, - app_helper:get_env(riak_kv, log_read_repair, false)). + app_helper:get_env(riak_kv, log_readrepair, false)). prompt_readrepair(VnodeList, LogRepair) -> {ok, C} = riak:local_client(), From 5df988a22739b72225c9f3cc0da0d3e1d49420e9 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Thu, 30 Jul 2020 17:55:00 +0100 Subject: [PATCH 21/30] Remove tabs --- src/riak_kv_vnode.erl | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/riak_kv_vnode.erl b/src/riak_kv_vnode.erl index f2097c78ec..113139926c 100644 --- a/src/riak_kv_vnode.erl +++ b/src/riak_kv_vnode.erl @@ -3422,19 +3422,19 @@ fold_type_for_query(Query) -> maybe_enable_async_fold(AsyncFolding, Capabilities, Opts) -> AsyncBackend = lists:member(async_fold, Capabilities), options_for_folding_and_backend(Opts, - AsyncFolding andalso AsyncBackend, - async_fold). + AsyncFolding andalso AsyncBackend, + async_fold). -spec maybe_enable_snap_prefold(boolean(), list(), list()) -> list(). maybe_enable_snap_prefold(SnapFolding, Capabilities, Opts) -> SnapBackend = lists:member(snap_prefold, Capabilities), options_for_folding_and_backend(Opts, - SnapFolding andalso SnapBackend, - snap_prefold). + SnapFolding andalso SnapBackend, + snap_prefold). -spec options_for_folding_and_backend(list(), - UseAsyncFolding :: boolean(), - atom()) -> list(). 
+ UseAsyncFolding :: boolean(), + atom()) -> list(). options_for_folding_and_backend(Opts, true, async_fold) -> [async_fold | Opts]; options_for_folding_and_backend(Opts, true, snap_prefold) -> From 12f659389b28475d0f8fb40c772ba2a7274a9347 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Sat, 1 Aug 2020 18:17:12 +0100 Subject: [PATCH 22/30] Use Token Bucket for both AAE With option to disable --- priv/riak_kv.schema | 11 ++++++ src/riak_kv_vnode.erl | 79 +++++++++++++++++++++++++++++++++---------- 2 files changed, 73 insertions(+), 17 deletions(-) diff --git a/priv/riak_kv.schema b/priv/riak_kv.schema index bf343633b1..f4e819b053 100644 --- a/priv/riak_kv.schema +++ b/priv/riak_kv.schema @@ -30,6 +30,17 @@ {default, passive} ]}. +%% @doc Use hashtree tokens for anti-entropy throttling +%% To hold-up the vnode when there is a backlog of activity on the AAE store +%% hashtree token bucket may be used to block the vnode every 90 puts until +%% the PUT has been completed. This use aae_ping with tictac_aae, and a full +%% sync block with legacy anti-entropy +{mapping, "aae_tokenbucket", "riak_kv.aae_tokenbucket", [ + {datatype, {flag, enabled, disabled}}, + {default, enabled}, + {commented, enabled} +]}. + %% @doc A path under which aae data files will be stored. 
{mapping, "tictacaae_dataroot", "riak_kv.tictacaae_dataroot", [ {default, "$(platform_data_dir)/tictac_aae"}, diff --git a/src/riak_kv_vnode.erl b/src/riak_kv_vnode.erl index 113139926c..0b4e924f84 100644 --- a/src/riak_kv_vnode.erl +++ b/src/riak_kv_vnode.erl @@ -154,6 +154,7 @@ tictac_rebuilding = false :: erlang:timestamp()|false, tictac_skiptick = 0 :: non_neg_integer(), tictac_startup = true :: boolean(), + aae_tokenbucket = true :: boolean(), worker_pool_strategy = single :: none|single|dscp, vnode_pool_pid :: undefined|pid(), update_hook :: update_hook(), @@ -801,6 +802,8 @@ init([Index]) -> app_helper:get_env(riak_kv, tictacaae_active, passive), WorkerPoolStrategy = app_helper:get_env(riak_kv, worker_pool_strategy), + TokenBucket = + app_helper:get_env(riak_kv, aae_tokenbucket, true), MaxAAEQueueTime = app_helper:get_env(riak_kv, max_aae_queue_time, ?MAX_AAE_QUEUE_TIME), EnableNextGenReplSrc = @@ -835,6 +838,7 @@ init([Index]) -> worker_pool_strategy=WorkerPoolStrategy, update_hook=update_hook(), max_aae_queue_time=MaxAAEQueueTime, + aae_tokenbucket=TokenBucket, enable_nextgenreplsrc = EnableNextGenReplSrc, sizelimit_nextgenreplsrc = SizeLimitNextGenReplSrc}, try_set_vnode_lock_limit(Index), @@ -3577,8 +3581,14 @@ nextgenrepl(_B, _K, _Obj, _Size, _Coord, _Enabled, _Limit) -> %% @doc %% Update both the AAE controller (tictac aae) and old school hashtree aae %% if either or both are enabled. 
-aae_update(Bucket, Key, UpdObj, PrevObj, UpdObjBin, State) -> - case State#state.hashtrees of +aae_update(_Bucket, _Key, _UpdObj, _PrevObj, _UpdObjBin, + #state{hashtrees = HTs, tictac_aae = TAAE} = _State) + when HTs == undefined, TAAE == false -> + ok; +aae_update(Bucket, Key, UpdObj, PrevObj, UpdObjBin, + #state{hashtrees = HTs, tictac_aae = TAAE} = State) -> + Async = async_aae(State#state.aae_tokenbucket), + case HTs of undefined -> ok; Trees -> @@ -3589,9 +3599,9 @@ aae_update(Bucket, Key, UpdObj, PrevObj, UpdObjBin, State) -> _ -> UpdObj end, - update_hashtree(Bucket, Key, RObj, Trees) + update_hashtree(Bucket, Key, RObj, Trees, Async) end, - case {State#state.tictac_aae, PrevObj} of + case {TAAE, PrevObj} of {false, _} -> ok; {_, unchanged_no_old_object} -> @@ -3615,6 +3625,14 @@ aae_update(Bucket, Key, UpdObj, PrevObj, UpdObjBin, State) -> _ -> UpdObjBin end, + case Async of + true -> + ok; + false -> + aae_controller:aae_ping(State#state.aae_controller, + os:timestamp(), + self()) + end, aae_controller:aae_put(State#state.aae_controller, IndexN, Bucket, Key, @@ -3626,19 +3644,33 @@ aae_update(Bucket, Key, UpdObj, PrevObj, UpdObjBin, State) -> -spec aae_delete(binary(), binary(), old_object(), state()) -> ok. 
%% @doc %% Remove an item from the AAE store, where AAE has been enabled -aae_delete(Bucket, Key, PrevObj, State) -> - case State#state.hashtrees of +aae_delete(_Bucket, _Key, _PrevObj, + #state{hashtrees = HTs, tictac_aae = TAAE} = _State) + when HTs == undefined, TAAE == false -> + ok; +aae_delete(Bucket, Key, PrevObj, + #state{hashtrees = HTs, tictac_aae = TAAE} = State) -> + Async = async_aae(State#state.aae_tokenbucket), + case HTs of undefined -> ok; Trees -> - delete_from_hashtree(Bucket, Key, Trees) + delete_from_hashtree(Bucket, Key, Trees, Async) end, - case State#state.tictac_aae of + case TAAE of false -> ok; true -> PrevClock = get_clock(PrevObj), IndexN = riak_kv_util:get_index_n({Bucket, Key}), + case Async of + true -> + ok; + false -> + aae_controller:aae_ping(State#state.aae_controller, + os:timestamp(), + self()) + end, aae_controller:aae_put(State#state.aae_controller, IndexN, Bucket, Key, @@ -3646,6 +3678,21 @@ aae_delete(Bucket, Key, PrevObj, State) -> <<>>) end. +%% @doc +%% Normally should use an async aae call, unless using a token bucket when a +%% non-async call may be requested (false) each time the bucket is empty +-spec async_aae(boolean()) -> boolean(). +async_aae(false) -> + true; +async_aae(true) -> + case get_hashtree_token() of + true -> + true; + false -> + put(hashtree_tokens, max_hashtree_tokens()), + false + end. + -spec get_clock(old_object()) -> aae_controller:version_vector(). %% @doc @@ -3674,39 +3721,37 @@ maybe_old_object(OldObject) -> OldObject. --spec update_hashtree(binary(), binary(), riak_object:riak_object(), pid()) - -> ok. +-spec update_hashtree(binary(), binary(), riak_object:riak_object(), pid(), + boolean()) -> ok. %% @doc %% Update hashtree based AAE when enabled. %% Note that this requires an object copy - the object has been converted from %% a binary before being sent to another pid. Also, all information on the %% object is ignored other than that necessary to hash the object. 
There is %% scope for greater efficiency here, even without moving to Tictac AAE -update_hashtree(Bucket, Key, RObj, Trees) -> +update_hashtree(Bucket, Key, RObj, Trees, Async) -> Items = [{object, {Bucket, Key}, RObj}], - case get_hashtree_token() of + case Async of true -> riak_kv_index_hashtree:async_insert(Items, [], Trees), ok; false -> riak_kv_index_hashtree:insert(Items, [], Trees), - put(hashtree_tokens, max_hashtree_tokens()), ok end. --spec delete_from_hashtree(binary(), binary(), pid()) -> ok. +-spec delete_from_hashtree(binary(), binary(), pid(), boolean()) -> ok. %% @doc %% Remove an object from the hashtree based AAE -delete_from_hashtree(Bucket, Key, Trees)-> +delete_from_hashtree(Bucket, Key, Trees, Async)-> Items = [{object, {Bucket, Key}}], - case get_hashtree_token() of + case Async of true -> riak_kv_index_hashtree:async_delete(Items, Trees), ok; false -> riak_kv_index_hashtree:delete(Items, Trees), - put(hashtree_tokens, max_hashtree_tokens()), ok end. From a95d9ea4e85bf909d5db605818abb25a88a04f54 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Mon, 3 Aug 2020 10:24:57 +0100 Subject: [PATCH 23/30] Add ability to override tictac exchange max_results --- src/riak_kv_vnode.erl | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/riak_kv_vnode.erl b/src/riak_kv_vnode.erl index 0b4e924f84..ca801bdb01 100644 --- a/src/riak_kv_vnode.erl +++ b/src/riak_kv_vnode.erl @@ -1219,13 +1219,22 @@ handle_command(tictacaae_exchangepoke, _Sender, State) -> {Remote, RN}]), ReplyFun = tictac_returnfun(Idx, exchange), ScanTimeout = ?AAE_SKIP_COUNT * XTick, + ExchangeOptions = + case app_helper:get_env(riak_kv, + tictacaae_maxresults) of + MR when is_integer(MR) -> + [{scan_timeout, ScanTimeout}, + {max_results, MR}]; + _ -> + [{scan_timeout, ScanTimeout}] + end, aae_exchange:start(full, BlueList, PinkList, RepairFun, ReplyFun, none, - [{scan_timeout, ScanTimeout}]), + ExchangeOptions), ?AAE_SKIP_COUNT; _ -> lager:warning("Proposed 
exchange between ~w and ~w " ++ From 6d7b9c6727db2b5b8f3b914b6db6ddca00761775 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Mon, 3 Aug 2020 20:19:49 +0100 Subject: [PATCH 24/30] Add tictacaae constraint To control size of fold_clocks queries when large deltas exist --- priv/riak_kv.schema | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/priv/riak_kv.schema b/priv/riak_kv.schema index f4e819b053..eef16080b3 100644 --- a/priv/riak_kv.schema +++ b/priv/riak_kv.schema @@ -113,6 +113,17 @@ hidden ]}. +%% @doc Max number of leaf IDs per exchange +%% To control the length of time for each exchange, only a subset of the +%% conflicting leaves will be compared on each exchange. If there are issues +%% with query timeouts this may be halved. Large backlogs may be reduced +%% faster by doubling. There are 1M segments in a standard tree overall. +{mapping, "tictacaae_maxresults", "riak_kv.tictacaae_maxresults", [ + {datatype, integer}, + {default, 256}, + hidden +]}. + %% @doc Exchange only between primary vnodes %% Setting this to false allows Tictac AAE exchanges between both primary and %% fallback vnodes. From d6f57484301439034f22acf2f9e0bcb68ee328ab Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Tue, 4 Aug 2020 01:55:44 +0100 Subject: [PATCH 25/30] Extend repair for consistency (with exchange_fsm) Repairing consistent objects is untested with tictacaae, but included just in case. Added sleep. Avoid flooding the vnodes with rehash requests, spread them out. Will allow for a close it interleave. --- src/riak_kv_exchange_fsm.erl | 3 ++- src/riak_kv_get_fsm.erl | 33 ++++++++++++++++++++++++--------- 2 files changed, 26 insertions(+), 10 deletions(-) diff --git a/src/riak_kv_exchange_fsm.erl b/src/riak_kv_exchange_fsm.erl index fc8573e1fc..5ab78d0c7d 100644 --- a/src/riak_kv_exchange_fsm.erl +++ b/src/riak_kv_exchange_fsm.erl @@ -27,7 +27,8 @@ %% FSM states -export([prepare_exchange/2, update_trees/2, - key_exchange/2]). 
+ key_exchange/2, + repair_consistent/1]). %% gen_fsm callbacks -export([init/1, handle_event/3, handle_sync_event/4, handle_info/3, diff --git a/src/riak_kv_get_fsm.erl b/src/riak_kv_get_fsm.erl index 6a17d49c87..6a305c9f80 100644 --- a/src/riak_kv_get_fsm.erl +++ b/src/riak_kv_get_fsm.erl @@ -101,6 +101,8 @@ -define(DEFAULT_RT, head). -define(DEFAULT_NC, 0). -define(QUEUE_EMPTY_LOOPS, 8). +-define(MIN_REPAIRTIME_MS, 10000). +-define(MIN_REPAIRPAUSE_MS, 10). %% =================================================================== %% Public API @@ -646,11 +648,12 @@ prompt_readrepair(VnodeList, LogRepair) -> {ok, C} = riak:local_client(), FetchFun = fun({{B, K}, {_BlueClock, _PinkClock}}) -> - riak_client:get(B, K, C) - end, - RehashFun = - fun({{B, K}, {_BlueClock, _PinkClock}}) -> - riak_kv_vnode:rehash(VnodeList, B, K) + case riak_kv_util:consistent_object(B) of + true -> + riak_kv_exchange_fsm:repair_consistent({B, K}); + false -> + riak_client:get(B, K, C) + end end, LogFun = fun({{B, K}, {BlueClock, PinkClock}}) -> @@ -659,17 +662,29 @@ prompt_readrepair(VnodeList, LogRepair) -> [B, K, BlueClock, PinkClock]) end, fun(RepairList) -> - lager:info("Repairing ~w keys between ~w", - [length(RepairList), VnodeList]), + SW = os:timestamp(), + RepairCount = length(RepairList), + lager:info("Repairing key_count=~w between ~w", + [RepairCount, VnodeList]), + Pause = max(?MIN_REPAIRPAUSE_MS, ?MIN_REPAIRTIME_MS div RepairCount), + RehashFun = + fun({{B, K}, {_BlueClock, _PinkClock}}) -> + timer:sleep(Pause), + riak_kv_vnode:rehash(VnodeList, B, K) + end, lists:foreach(FetchFun, RepairList), - timer:sleep(1000), % Sleep for repairs to complete lists:foreach(RehashFun, RepairList), case LogRepair of true -> lists:foreach(LogFun, RepairList); false -> ok - end + end, + lager:info("Repaired key_count=~w " ++ + "in repair_time=~w ms with pause_time=~w ms", + [RepairCount, + timer:now_diff(os:timestamp(), SW) div 1000, + RepairCount * Pause]) end. 
reply_fun({EndStateName, DeltaCount}) -> From 1a1d34e045f4d7b285fe2471e5fcd4c7bdf1c5e8 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Tue, 4 Aug 2020 02:52:22 +0100 Subject: [PATCH 26/30] Avoid div 0 --- src/riak_kv_get_fsm.erl | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/riak_kv_get_fsm.erl b/src/riak_kv_get_fsm.erl index 6a305c9f80..0adbcf07fa 100644 --- a/src/riak_kv_get_fsm.erl +++ b/src/riak_kv_get_fsm.erl @@ -664,14 +664,16 @@ prompt_readrepair(VnodeList, LogRepair) -> fun(RepairList) -> SW = os:timestamp(), RepairCount = length(RepairList), - lager:info("Repairing key_count=~w between ~w", + lager:info("Repairing key_count=~w between ~w", [RepairCount, VnodeList]), - Pause = max(?MIN_REPAIRPAUSE_MS, ?MIN_REPAIRTIME_MS div RepairCount), - RehashFun = - fun({{B, K}, {_BlueClock, _PinkClock}}) -> - timer:sleep(Pause), - riak_kv_vnode:rehash(VnodeList, B, K) - end, + Pause = + max(?MIN_REPAIRPAUSE_MS, + ?MIN_REPAIRTIME_MS div max(1, RepairCount)), + RepairFun = + fun({{B, K}, {_BlueClock, _PinkClock}}) -> + timer:sleep(Pause), + riak_kv_vnode:rehash(VnodeList, B, K) + end, lists:foreach(FetchFun, RepairList), lists:foreach(RehashFun, RepairList), case LogRepair of From 639140901955fb06a73c177833f7831d13264b54 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Tue, 4 Aug 2020 02:58:15 +0100 Subject: [PATCH 27/30] Typo --- src/riak_kv_get_fsm.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/riak_kv_get_fsm.erl b/src/riak_kv_get_fsm.erl index 0adbcf07fa..518508a878 100644 --- a/src/riak_kv_get_fsm.erl +++ b/src/riak_kv_get_fsm.erl @@ -669,7 +669,7 @@ prompt_readrepair(VnodeList, LogRepair) -> Pause = max(?MIN_REPAIRPAUSE_MS, ?MIN_REPAIRTIME_MS div max(1, RepairCount)), - RepairFun = + RehashFun = fun({{B, K}, {_BlueClock, _PinkClock}}) -> timer:sleep(Pause), riak_kv_vnode:rehash(VnodeList, B, K) From 69c774c0d5f2f53482fa4df8d48a2e8a916378ce Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Sat, 
8 Aug 2020 12:51:47 +0100 Subject: [PATCH 28/30] Update src/riak_kv_vnode.erl Co-authored-by: Thomas Arts --- src/riak_kv_vnode.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/riak_kv_vnode.erl b/src/riak_kv_vnode.erl index ca801bdb01..9c7318c5d2 100644 --- a/src/riak_kv_vnode.erl +++ b/src/riak_kv_vnode.erl @@ -1252,7 +1252,7 @@ handle_command(tictacaae_exchangepoke, _Sender, State) -> {_, SkipCount} -> lager:warning("Skipping a tick due to non_zero " ++ "skip_count=~w", [SkipCount]), - {noreply, State#state{tictac_skiptick = SkipCount - 1}} + {noreply, State#state{tictac_skiptick = max(0, SkipCount - 1)}} end; handle_command(tictacaae_rebuildpoke, _Sender, State=#state{tictac_startup=TS}) From e135df6f2dc92d4eb8765fb82dbe8cd64d8fa44a Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Sat, 8 Aug 2020 12:53:23 +0100 Subject: [PATCH 29/30] Correct indentation --- src/riak_kv_vnode.erl | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/riak_kv_vnode.erl b/src/riak_kv_vnode.erl index ca801bdb01..59a8b78ce1 100644 --- a/src/riak_kv_vnode.erl +++ b/src/riak_kv_vnode.erl @@ -1236,14 +1236,14 @@ handle_command(tictacaae_exchangepoke, _Sender, State) -> none, ExchangeOptions), ?AAE_SKIP_COUNT; - _ -> - lager:warning("Proposed exchange between ~w and ~w " ++ - "not currently supported within " ++ - "preflist for IndexN=~w possibly " ++ - "due to node failure", - [Local, Remote, {DocIdx, N}]), - 0 - end, + _ -> + lager:warning("Proposed exchange between ~w and ~w " ++ + "not currently supported within " ++ + "preflist for IndexN=~w possibly " ++ + "due to node failure", + [Local, Remote, {DocIdx, N}]), + 0 + end, ok = aae_controller:aae_ping(State#state.aae_controller, os:timestamp(), self()), From 6cb709bfe77980ba13944c89679c5cf2f4f21869 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Tue, 11 Aug 2020 14:50:43 +0100 Subject: [PATCH 30/30] Count in folds for comparison See i1771 --- rebar.config | 2 +- 1 file
changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index 9ba5373ba8..87744170b1 100644 --- a/rebar.config +++ b/rebar.config @@ -37,7 +37,7 @@ {riak_dt, ".*", {git, "git://github.com/basho/riak_dt.git", {tag, "riak_kv-2.9.1"}}}, {eunit_formatters, ".*", {git, "git://github.com/seancribbs/eunit_formatters", {tag, "0.1.2"}}}, {leveled, ".*", {git, "https://github.com/martinsumner/leveled.git", {branch, "develop-2.9"}}}, - {kv_index_tictactree, ".*", {git, "https://github.com/martinsumner/kv_index_tictactree.git", {branch, "develop-2.9"}}}, + {kv_index_tictactree, ".*", {git, "https://github.com/martinsumner/kv_index_tictactree.git", {branch, "mas-i1771-foldcounter"}}}, {riak_core, ".*", {git, "https://github.com/basho/riak_core.git", {tag, "riak_kv-2.9.2"}}}, {riak_api, ".*", {git, "git://github.com/basho/riak_api.git", {tag, "riak_kv-2.9.2"}}}, {hyper, ".*", {git, "git://github.com/basho/hyper", {tag, "1.0.1"}}},