Prioritisation support on selecting from pending changes #3

Status: Open. Wants to merge 1 commit into base: nhse-develop-3.0.
17 changes: 17 additions & 0 deletions priv/riak_core.schema
@@ -45,12 +45,29 @@
%% low value for the handoff_batch_threshold_count (e.g. 200), then raise the
%% handoff_timeout. Further, if using the leveled backend in Riak KV
%% investigate raising the backend_pause.
%% Note that there is an additional cluster_transfer_limit configuration option
%% which may prevent this limit from being reached.
{mapping, "transfer_limit", "riak_core.handoff_concurrency", [
  {datatype, integer},
  {default, 2},
  {commented, 2}
]}.
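%% For example (illustrative value, not a recommendation), to allow four
%% concurrent handoffs per node, one might set in riak.conf:
%%   transfer_limit = 4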

%% @doc Number of concurrent cluster-wide prompted transfers allowed.
%% Ownership handoffs may be prompted by vnode inactivity, or by the vnode
%% manager's scheduled tick. Should the vnodes never become inactive, only
%% the manager will prompt, and this configuration option acts as a
%% cluster-wide limit on concurrent handoffs prompted by the management tick.
%% Once the number of ongoing, or blocked, handoffs in the cluster reaches
%% this limit (regardless of what prompted each handoff), the management
%% tick on all nodes will be blocked from prompting further transfers.
{mapping, "cluster_transfer_limit", "riak_core.forced_ownership_handoff", [
  {datatype, integer},
  {default, 8},
  {commented, 8}
]}.
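%% As an illustrative (not recommended) pairing of the two limits in
%% riak.conf, each node might run at most 2 concurrent handoffs, with the
%% management tick prompting no further transfers once 8 are ongoing or
%% blocked across the cluster:
%%   transfer_limit = 2
%%   cluster_transfer_limit = 8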

%% @doc Handoff batch threshold count
%% The maximum number of objects allowed in a single handoff batch. If there
%% are issues with handoff timeouts, then the first change should be to reduce
280 changes: 269 additions & 11 deletions src/riak_core_vnode_manager.erl
@@ -558,22 +558,95 @@ schedule_management_timer() ->
        10000),
    erlang:send_after(ManagementTick, ?MODULE, management_tick).


-type transfer() ::
    {non_neg_integer(), node(), node(), [module()], awaiting | complete}.

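%% Prompt handoffs for awaiting ownership transfers where this node is the
%% sender, up to the cluster-wide forced_ownership_handoff limit.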
-spec trigger_ownership_handoff(
    list(transfer()),
    list(module()),
    riak_core_ring:riak_core_ring(),
    #state{}) -> ok.
trigger_ownership_handoff(Transfers, Mods, Ring, State) ->
    Limit =
        app_helper:get_env(
            riak_core, forced_ownership_handoff, ?DEFAULT_OWNERSHIP_TRIGGER),
    Awaiting =
        select_candidate_transfers(Transfers, Mods, Ring, Limit, node()),
    _ = [maybe_trigger_handoff(Mod, Idx, State) || {Mod, Idx} <- Awaiting],
    ok.

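%% Select up to Limit awaiting transfers for which ThisNode is the sender,
%% ordering candidates so that transfers towards nodes with outstanding
%% wants are considered first (unless the ring is resizing).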
-spec select_candidate_transfers(
    list(transfer()),
    list(module()),
    riak_core_ring:riak_core_ring(),
    non_neg_integer(),
    node()) -> list({module(), non_neg_integer()}).
select_candidate_transfers(Transfers, Mods, Ring, Limit, ThisNode) ->
    IsResizing = riak_core_ring:is_resizing(Ring),
    Throttle =
        limit_ownership_handoff(
            Limit,
            order_transfers(Transfers, Mods, Ring, IsResizing),
            IsResizing),
    [{Mod, Idx} || {Idx, Node, _, CMods, S} <- Throttle,
        Mod <- Mods,
        S =:= awaiting,
        Node =:= ThisNode,
        not lists:member(Mod, CMods)].

-spec order_transfers(
    list(transfer()),
    list(module()),
    riak_core_ring:riak_core_ring(),
    boolean()) -> list(transfer()).
order_transfers(Transfers, _Mods, _Ring, true) ->
    Transfers;
order_transfers(Transfers, Mods, Ring, false) ->
    Members = riak_core_ring:all_members(Ring),
    Owners = riak_core_ring:all_owners(Ring),
    MinVnodes = length(Owners) div length(Members),
    MinWants =
        lists:map(
            fun(N) -> {N, MinVnodes - length(owned_partitions(Owners, N))} end,
            Members
        ),
    PriorityTransfers =
        lists:foldl(
            fun({N, NC}, Acc) ->
                case NC > 0 of
                    true ->
                        CandidateTs = receiving_transfers(Transfers, N, Mods),
                        lists:sublist(CandidateTs, NC) ++ Acc;
                    false ->
                        Acc
                end
            end,
            [],
            MinWants
        ),
    lists:keysort(1, PriorityTransfers)
        ++ lists:keysort(1, Transfers -- PriorityTransfers).
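%% Worked example (illustrative, using the figures from the unit test below):
%% with 256 partitions and 12 members, MinVnodes = 256 div 12 = 21. A joining
%% node owning no partitions has a want of 21, so up to 21 awaiting transfers
%% towards it are moved to the front of the list; all remaining transfers
%% follow, with each group sorted by partition index.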

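%% Return the transfers awaiting receipt by RcvNode, for which at least one
%% of the given modules has still to hand off.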
-spec receiving_transfers(
    list(transfer()), node(), list(module())) -> list(transfer()).
receiving_transfers(Transfers, RcvNode, Mods) ->
    lists:filter(
        fun({_Idx, _SndNode, TrgNode, CMods, S}) ->
            case {TrgNode, lists:subtract(Mods, CMods), S} of
                {RcvNode, ToDoMods, awaiting} when length(ToDoMods) > 0 ->
                    true;
                _ ->
                    false
            end
        end,
        Transfers
    ).

%% Return the list of partitions owned by a node
-spec owned_partitions(list({integer(), node()}), node()) -> list(integer()).
owned_partitions(Owners, Node) ->
    [P || {P, Owner} <- Owners, Owner =:= Node].

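%% When the ring is not resizing, consider only the first Limit transfers
%% from the (ordered) transfer list.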
limit_ownership_handoff(Limit, Transfers, false) ->
    lists:sublist(Transfers, Limit);
@@ -1058,3 +1131,188 @@

unregister_vnode_stats(Mod, Index) ->
    riak_core_stat:unregister_vnode_stats(Mod, Index).


%% ===================================================================
%% Unit tests
%% ===================================================================
-ifdef(TEST).

order_transfer_test_() ->
    {timeout, 60, fun order_transfer_tester/0}.

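%% Scenario: a 9-node cluster across 3 locations has 3 new nodes joining.
%% Verify that, under a transfer limit, candidate transfers are prioritised
%% towards the joining nodes until they approach their minimum vnode count,
%% after which transfers to non-joining nodes are selected.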
order_transfer_tester() ->
    N1 = node1,
    N1Loc = loc1,
    TargetN = 4,
    RingSize = 256,
    InitJoiningNodes =
        [{node2, loc2},
            {node3, loc3},
            {node4, loc1},
            {node5, loc2},
            {node6, loc3},
            {node7, loc1},
            {node8, loc2},
            {node9, loc1}],

    Params = [{target_n_val, TargetN}, {target_location_n_val, 3}],
    R1 =
        riak_core_ring:set_node_location(
            N1,
            N1Loc,
            riak_core_ring:fresh(RingSize, N1)),

    RClaimInit =
        lists:foldl(
            fun({N, L}, AccR) ->
                AccR0 = riak_core_ring:add_member(N1, AccR, N),
                riak_core_ring:set_node_location(N, L, AccR0)
            end,
            R1,
            InitJoiningNodes),
    %% Use the entry for ?MODULE here:
    CurrentRing0 =
        riak_core_membership_claim:claim(
            RClaimInit,
            {riak_core_membership_claim, default_wants_claim},
            {riak_core_claim_swapping, choose_claim_v4, Params}),

    CurrentRing1 =
        lists:foldl(
            fun(N, R) -> riak_core_ring:set_member(node1, R, N, valid) end,
            CurrentRing0,
            [node2, node3, node4, node5, node6, node7, node8, node9]
        ),

    NextRingClaimInit =
        lists:foldl(
            fun({N, L}, AccR) ->
                AccR0 = riak_core_ring:add_member(N1, AccR, N),
                riak_core_ring:set_node_location(N, L, AccR0)
            end,
            CurrentRing1,
            [{node10, loc3}, {node11, loc4}, {node12, loc4}]),

    NextRing0 =
        riak_core_membership_claim:claim(
            NextRingClaimInit,
            {riak_core_membership_claim, default_wants_claim},
            {riak_core_claim_swapping, choose_claim_v4, Params}),

    CurrentRing2 =
        lists:foldl(
            fun(N, R) -> riak_core_ring:set_member(node1, R, N, valid) end,
            NextRingClaimInit,
            [node10, node11, node12]
        ),

    Changes =
        lists:map(
            fun({{Idx, CN}, {Idx, FN}}) ->
                {Idx, CN, FN, [], awaiting}
            end,
            lists:filter(
                fun({Current, Future}) -> Current =/= Future end,
                lists:zip(
                    riak_core_ring:all_owners(CurrentRing1),
                    riak_core_ring:all_owners(NextRing0))
            )
        ),

    NodeList =
        [node1, node2, node3, node4, node5, node6, node7, node8, node9,
            node10, node11, node12],

    CurrentRing3 = riak_core_ring:set_pending_changes(CurrentRing2, Changes),
    TransferLimit = 4,
    FirstTransfers =
        fetch_transfers_all_nodes(CurrentRing3, TransferLimit, NodeList),

    test_destinations(
        FirstTransfers, TransferLimit, [node10, node11, node12], Changes),

    MinVnodes = RingSize div length(NodeList),
    NearlyAllTransferLimit = (MinVnodes * 3) - 1,
    NearlyAllTransfersToNew =
        fetch_transfers_all_nodes(
            CurrentRing3, NearlyAllTransferLimit, NodeList),

    test_destinations(
        NearlyAllTransfersToNew,
        NearlyAllTransferLimit,
        [node10, node11, node12],
        Changes
    ),

    {CurrentRing4, UpdChanges} =
        lists:foldl(
            fun({riak_kv, Idx}, {R, Cs}) ->
                {Idx, _SndNode, RcvNode, [], awaiting} =
                    lists:keyfind(Idx, 1, Cs),
                {riak_core_ring:transfer_node(Idx, RcvNode, R),
                    lists:keydelete(Idx, 1, Cs)}
            end,
            {CurrentRing3, Changes},
            NearlyAllTransfersToNew
        ),
    NextTransfers =
        fetch_transfers_all_nodes(
            riak_core_ring:set_pending_changes(CurrentRing4, UpdChanges),
            4,
            NodeList
        ),
    ?assertMatch(4, length(NextTransfers)),
    NextTransfersDetails =
        lists:keysort(
            1,
            lists:map(
                fun({riak_kv, Idx}) -> lists:keyfind(Idx, 1, UpdChanges) end,
                NextTransfers
            )
        ),
    %% If a node has sent more than 3 vnodes as part of the first wave, it may
    %% now be treated as a joiner - a node with outstanding wants. So we
    %% cannot be fully deterministic about what to expect, as one joiner may
    %% still have wants, as well as non-joining nodes. Of the next 4 changes,
    %% probably all will be to non-joining nodes, but at least the second,
    %% third and fourth will be.
    lists:foreach(
        fun({_Idx, _SndNode, RcvNode, [], awaiting}) ->
            ?assert(not lists:member(RcvNode, [node10, node11, node12]))
        end,
        lists:sublist(NextTransfersDetails, 2, 3)
    ).


fetch_transfers_all_nodes(Ring, TransferLimit, NodeList) ->
    lists:foldl(
        fun(N, Acc) ->
            lists:append(
                Acc,
                select_candidate_transfers(
                    riak_core_ring:pending_changes(Ring),
                    [riak_kv],
                    Ring,
                    TransferLimit,
                    N)
            )
        end,
        [],
        NodeList
    ).


test_destinations(Transfers, ExpectedLength, ValidDestinations, Changes) ->
    ?assertMatch(ExpectedLength, length(Transfers)),
    lists:foreach(
        fun({riak_kv, Idx}) ->
            {Idx, _SndNode, RcvNode, [], awaiting} =
                lists:keyfind(Idx, 1, Changes),
            ?assert(lists:member(RcvNode, ValidDestinations))
        end,
        Transfers
    ).

-endif.
