Merge branch 'mas-i1771-foldcounter' into mas-i1765-develop30
martinsumner committed Aug 12, 2020
2 parents ccd1e36 + 6cb709b commit f9c5351
Showing 5 changed files with 346 additions and 130 deletions.
43 changes: 34 additions & 9 deletions priv/riak_kv.schema
@@ -30,6 +30,17 @@
{default, passive}
]}.

+%% @doc Use hashtree tokens for anti-entropy throttling
+%% To hold up the vnode when there is a backlog of activity on the AAE store,
+%% the hashtree token bucket may be used to block the vnode every 90 puts
+%% until the PUT has been completed. This uses aae_ping with tictac_aae, and a
+%% full sync block with legacy anti-entropy.
+{mapping, "aae_tokenbucket", "riak_kv.aae_tokenbucket", [
+{datatype, {flag, enabled, disabled}},
+{default, enabled},
+{commented, enabled}
+]}.
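A minimal usage sketch, assuming the standard riak.conf syntax for cuttlefish flags (the {commented, enabled} entry above means the generated riak.conf carries this line commented out):

    aae_tokenbucket = enabled

Setting it to disabled removes the every-90-puts block on the vnode, trading back-pressure for the risk of an unchecked AAE store backlog.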

%% @doc A path under which aae data files will be stored.
{mapping, "tictacaae_dataroot", "riak_kv.tictacaae_dataroot", [
{default, "$(platform_data_dir)/tictac_aae"},
@@ -69,7 +80,7 @@
%% @doc Store heads in parallel key stores
%% If running a parallel key store, the whole "head" object may be stored to
%% allow for fold_heads queries to be run against the parallel store.
-%% Alternatively, the cost of the paralle key store can be reduced by storing
+%% Alternatively, the cost of the parallel key store can be reduced by storing
%% only a minimal data set necessary for AAE and monitoring
{mapping, "tictacaae_storeheads", "riak_kv.tictacaae_storeheads", [
{datatype, {flag, enabled, disabled}},
@@ -79,23 +90,37 @@

%% @doc Frequency to prompt exchange per vnode
%% The number of milliseconds which the vnode must wait between self-pokes to
-%% maybe prompt the next exchange. Default is 2 minutes 30 seconds.
-%% Note if this is to be reduced below this value the riak_core
-%% vnode_inactivity_timeout should also be reduced or handoffs may be
-%% blocked. To be safe the vnode_inactivity_timeout must be < 0.5 * the
-%% tictacaae_exchangetick.
+%% maybe prompt the next exchange. Default is 4 minutes, which checks all
+%% partitions (when n=3) every 30 minutes.
+%% Note if this is to be reduced further the riak_core vnode_inactivity_timeout
+%% should also be reduced or handoffs may be blocked. To be safe the
+%% vnode_inactivity_timeout must be < 0.5 * the tictacaae_exchangetick.
{mapping, "tictacaae_exchangetick", "riak_kv.tictacaae_exchangetick", [
{datatype, integer},
-{default, 150000},
+{default, 240000},
hidden
]}.
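To illustrate the safety rule restated above, a hedged advanced.config sketch (vnode_inactivity_timeout is the riak_core setting named in the comment; the values are illustrative only): with the new 240000 ms tick, the inactivity timeout must stay below 120000 ms.

    %% Sketch: keep vnode_inactivity_timeout < 0.5 * tictacaae_exchangetick
    [{riak_core, [{vnode_inactivity_timeout, 60000}]}, %% 60s < 0.5 * 240s
     {riak_kv, [{tictacaae_exchangetick, 240000}]}].   %% new 4 minute default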

%% @doc Frequency to prompt rebuild check per vnode
%% The number of milliseconds which the vnode must wait between self-pokes to
-%% maybe prompt the next rebuild. Default is 30 minutes.
+%% maybe prompt the next rebuild. Default is 60 minutes.
+%% When a node is being re-introduced to a cluster following a long delay,
+%% increase this tick prior to the reintroduction. This will reduce the
+%% concurrency of some activity, e.g. handoffs and rebuilds.
{mapping, "tictacaae_rebuildtick", "riak_kv.tictacaae_rebuildtick", [
{datatype, integer},
-{default, 1800000},
+{default, 3600000},
hidden
]}.
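A sketch of the re-introduction advice above, with an illustrative value (the setting is hidden, so it must be added to riak.conf by hand): double the tick to two hours before rejoining the node, then restore the default once handoffs settle.

    tictacaae_rebuildtick = 7200000  ## 2 hours, up from the 3600000 default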

+%% @doc Max number of leaf IDs per exchange
+%% To control the length of time for each exchange, only a subset of the
+%% conflicting leaves will be compared on each exchange. If there are issues
+%% with query timeouts, this may be halved. Large backlogs may be reduced
+%% faster by doubling. There are 1M segments in a standard tree overall.
+{mapping, "tictacaae_maxresults", "riak_kv.tictacaae_maxresults", [
+{datatype, integer},
+{default, 256},
+hidden
+]}.
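A hedged tuning sketch for the guidance above (illustrative values; pick one, not both): halving the 256 default shortens each exchange when queries time out, while doubling clears a large backlog in fewer exchanges.

    tictacaae_maxresults = 128  ## halved: shorter exchanges
    ## tictacaae_maxresults = 512 would instead work through backlogs faster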

3 changes: 2 additions & 1 deletion src/riak_kv_exchange_fsm.erl
@@ -33,7 +33,8 @@
%% FSM states
-export([prepare_exchange/2,
update_trees/2,
-key_exchange/2]).
+key_exchange/2,
+repair_consistent/1]).

%% gen_fsm callbacks
-export([init/1, handle_event/3, handle_sync_event/4, handle_info/3,
53 changes: 44 additions & 9 deletions src/riak_kv_get_fsm.erl
@@ -101,6 +101,8 @@
-define(DEFAULT_RT, head).
-define(DEFAULT_NC, 0).
-define(QUEUE_EMPTY_LOOPS, 8).
+-define(MIN_REPAIRTIME_MS, 10000).
+-define(MIN_REPAIRPAUSE_MS, 10).

%% ===================================================================
%% Public API
@@ -641,23 +643,56 @@ finalize(StateData=#state{get_core = GetCore, trace = Trace, req_id = ReqID,
end,
{stop,normal,StateData}.


+-spec prompt_readrepair([{riak_core_ring:partition_id(), node()}]) ->
+fun((list({{riak_object:bucket(), riak_object:key()},
+{vclock:vclock(), vclock:vclock()}})) -> ok).
prompt_readrepair(VnodeList) ->
+prompt_readrepair(VnodeList,
+app_helper:get_env(riak_kv, log_readrepair, false)).

+prompt_readrepair(VnodeList, LogRepair) ->
{ok, C} = riak:local_client(),
FetchFun =
fun({{B, K}, {_BlueClock, _PinkClock}}) ->
-riak_client:get(B, K, C)
+case riak_kv_util:consistent_object(B) of
+true ->
+riak_kv_exchange_fsm:repair_consistent({B, K});
+false ->
+riak_client:get(B, K, C)
+end
end,
-RehashFun =
-fun({{B, K}, {_BlueClock, _PinkClock}}) ->
-riak_kv_vnode:rehash(VnodeList, B, K)
+LogFun =
+fun({{B, K}, {BlueClock, PinkClock}}) ->
+lager:info(
+"Prompted read repair Bucket=~w Key=~w Clocks ~w ~w",
+[B, K, BlueClock, PinkClock])
end,
fun(RepairList) ->
lager:info("Repairing ~w keys between ~w",
[length(RepairList), VnodeList]),
SW = os:timestamp(),
RepairCount = length(RepairList),
lager:info("Repairing key_count=~w between ~w",
[RepairCount, VnodeList]),
Pause =
max(?MIN_REPAIRPAUSE_MS,
?MIN_REPAIRTIME_MS div max(1, RepairCount)),
RehashFun =
fun({{B, K}, {_BlueClock, _PinkClock}}) ->
timer:sleep(Pause),
riak_kv_vnode:rehash(VnodeList, B, K)
end,
lists:foreach(FetchFun, RepairList),
timer:sleep(1000), % Sleep for repairs to complete
-lists:foreach(RehashFun, RepairList)
+lists:foreach(RehashFun, RepairList),
+case LogRepair of
+true ->
+lists:foreach(LogFun, RepairList);
+false ->
+ok
+end,
+lager:info("Repaired key_count=~w " ++
+"in repair_time=~w ms with pause_time=~w ms",
+[RepairCount,
+timer:now_diff(os:timestamp(), SW) div 1000,
+RepairCount * Pause])
end.
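The pacing added to the fold above spreads rehash work over at least ?MIN_REPAIRTIME_MS. A standalone sketch of that arithmetic, with the constants copied from this diff (repair_pause/1 is a hypothetical helper name, not part of the module):

    -define(MIN_REPAIRTIME_MS, 10000).
    -define(MIN_REPAIRPAUSE_MS, 10).

    %% Per-key pause so that RepairCount rehashes take at least ~10s in
    %% total, but never less than 10ms each; max(1, ...) guards against
    %% an empty repair list.
    repair_pause(RepairCount) ->
        max(?MIN_REPAIRPAUSE_MS, ?MIN_REPAIRTIME_MS div max(1, RepairCount)).

    %% repair_pause(100)  -> 100 (100 keys paced across ~10 seconds)
    %% repair_pause(5000) -> 10  (floor applies: ~50 seconds in total)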

reply_fun({EndStateName, DeltaCount}) ->
2 changes: 1 addition & 1 deletion src/riak_kv_ttaaefs_manager.erl
@@ -782,7 +782,7 @@ take_next_workitem([], Wants, ScheduleStartTime, SlotInfo, SliceCount) ->
take_next_workitem([NextAlloc|T], Wants,
ScheduleStartTime, SlotInfo, SliceCount) ->
{NodeNumber, NodeCount} = SlotInfo,
-SliceSeconds = ?SECONDS_IN_DAY div SliceCount,
+SliceSeconds = ?SECONDS_IN_DAY div max(SliceCount, 1),
SlotSeconds = (NodeNumber - 1) * (SliceSeconds div max(NodeCount, 1)),
{SliceNumber, NextAction} = NextAlloc,
{Mega, Sec, _Micro} = ScheduleStartTime,
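The one-line change above guards the slice arithmetic against SliceCount being zero, which previously raised badarith. An Erlang shell sketch, assuming ?SECONDS_IN_DAY is 86400:

    1> SliceCount = 0, 86400 div max(SliceCount, 1).
    86400
    2> 86400 div 0.
    ** exception error: an error occurred when evaluating an arithmetic expression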