From f1aad0b11f4ae0e4ff1d0f229a12489683035cde Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Thu, 19 Oct 2023 16:21:55 +0100
Subject: [PATCH] Prioritisation support on selecting from pending changes

Always send changes first to nodes that currently hold fewer vnodes. Try to
populate joining nodes before shuffling, and avoid temporary excesses caused
by shuffling.
---
 priv/riak_core.schema           |  17 ++
 src/riak_core_vnode_manager.erl | 280 ++++++++++++++++++++++++++++++--
 2 files changed, 286 insertions(+), 11 deletions(-)

diff --git a/priv/riak_core.schema b/priv/riak_core.schema
index 4c799a263..a09dc61c1 100644
--- a/priv/riak_core.schema
+++ b/priv/riak_core.schema
@@ -45,12 +45,29 @@
 %% low value for the handoff_batch_threshold_count (e.g. 200), then raise the
 %% handoff_timeout. Further, if using the leveled backend in Riak KV
 %% investigate raising the the backend_pause.
+%% Note that there is an additional cluster_transfer_limit configuration option
+%% which may prevent this limit from being reached.
 {mapping, "transfer_limit", "riak_core.handoff_concurrency", [
   {datatype, integer},
   {default, 2},
   {commented, 2}
 ]}.
 
+%% @doc Number of concurrent cluster-wide prompted transfers allowed.
+%% Ownership handoffs may be prompted by vnode inactivity, or by the vnode
+%% manager's scheduled tick activity. Should the vnodes never become
+%% inactive, only the manager will prompt handoffs, and this option acts as
+%% a cluster-wide limit on concurrent handoffs prompted by the management
+%% tick. Once the number of ongoing or blocked handoffs in the cluster
+%% reaches this limit (regardless of what prompted those handoffs), the
+%% management tick on all nodes will be blocked from prompting further
+%% transfers.
+{mapping, "cluster_transfer_limit", "riak_core.forced_ownership_handoff", [
+  {datatype, integer},
+  {default, 8},
+  {commented, 8}
+]}.
+
 %% @doc Handoff batch threshold count
 %% The maximum number of objects allowed in a single handoff batch. If there
 %% are issues with handoff timeouts, then the first change should be to reduce
diff --git a/src/riak_core_vnode_manager.erl b/src/riak_core_vnode_manager.erl
index d2c0874b0..32eb98d73 100644
--- a/src/riak_core_vnode_manager.erl
+++ b/src/riak_core_vnode_manager.erl
@@ -558,22 +558,95 @@ schedule_management_timer() ->
                       10000),
     erlang:send_after(ManagementTick, ?MODULE, management_tick).
 
+
+-type transfer() ::
+    {non_neg_integer(), node(), node(), [module()], awaiting | complete}.
+
+-spec trigger_ownership_handoff(
+    list(transfer()),
+    list(module()),
+    riak_core_ring:riak_core_ring(),
+    #state{}) -> ok.
 trigger_ownership_handoff(Transfers, Mods, Ring, State) ->
+    Limit =
+        app_helper:get_env(
+            riak_core, forced_ownership_handoff, ?DEFAULT_OWNERSHIP_TRIGGER),
+    Awaiting =
+        select_candidate_transfers(Transfers, Mods, Ring, Limit, node()),
+    _ = [maybe_trigger_handoff(Mod, Idx, State) || {Mod, Idx} <- Awaiting],
+    ok.
+
+-spec select_candidate_transfers(
+    list(transfer()),
+    list(module()),
+    riak_core_ring:riak_core_ring(),
+    non_neg_integer(),
+    node()) -> list({module(), non_neg_integer()}).
+select_candidate_transfers(Transfers, Mods, Ring, Limit, ThisNode) ->
     IsResizing = riak_core_ring:is_resizing(Ring),
-    Throttle = limit_ownership_handoff(Transfers, IsResizing),
-    Awaiting = [{Mod, Idx} || {Idx, Node, _, CMods, S} <- Throttle,
+    Throttle =
+        limit_ownership_handoff(
+            Limit,
+            order_transfers(Transfers, Mods, Ring, IsResizing),
+            IsResizing),
+    [{Mod, Idx} || {Idx, Node, _, CMods, S} <- Throttle,
                               Mod <- Mods,
                               S =:= awaiting,
-                              Node =:= node(),
-                              not lists:member(Mod, CMods)],
-    _ = [maybe_trigger_handoff(Mod, Idx, State) || {Mod, Idx} <- Awaiting],
-    ok.
+                              Node =:= ThisNode,
+                              not lists:member(Mod, CMods)].
+
+-spec order_transfers(
+    list(transfer()),
+    list(module()),
+    riak_core_ring:riak_core_ring(),
+    boolean()) -> list(transfer()).
+order_transfers(Transfers, _Mods, _Ring, true) ->
+    Transfers;
+order_transfers(Transfers, Mods, Ring, false) ->
+    Members = riak_core_ring:all_members(Ring),
+    Owners = riak_core_ring:all_owners(Ring),
+    MinVnodes = length(Owners) div length(Members),
+    MinWants =
+        lists:map(
+            fun(N) -> {N, MinVnodes - length(owned_partitions(Owners, N))} end,
+            Members
+        ),
+    PriorityTransfers =
+        lists:foldl(
+            fun({N, NC}, Acc) ->
+                case NC > 0 of
+                    true ->
+                        CandidateTs = receiving_transfers(Transfers, N, Mods),
+                        lists:sublist(CandidateTs, NC) ++ Acc;
+                    false ->
+                        Acc
+                end
+            end,
+            [],
+            MinWants
+        ),
+    lists:keysort(1, PriorityTransfers)
+        ++ lists:keysort(1, Transfers -- PriorityTransfers).
+
+-spec receiving_transfers(
+    list(transfer()), node(), list(module())) -> list(transfer()).
+receiving_transfers(Transfers, RcvNode, Mods) ->
+    lists:filter(
+        fun({_Idx, _SndNode, TrgNode, CMods, S}) ->
+            case {TrgNode, lists:subtract(Mods, CMods), S} of
+                {RcvNode, ToDoMods, awaiting} when length(ToDoMods) > 0 ->
+                    true;
+                _ ->
+                    false
+            end
+        end,
+        Transfers
+    ).
 
-limit_ownership_handoff(Transfers, IsResizing) ->
-    Limit = app_helper:get_env(riak_core,
-                               forced_ownership_handoff,
-                               ?DEFAULT_OWNERSHIP_TRIGGER),
-    limit_ownership_handoff(Limit, Transfers, IsResizing).
+%% Return the list of partitions owned by a node
+-spec owned_partitions(list({integer(), node()}), node()) -> list(integer()).
+owned_partitions(Owners, Node) ->
+    [P || {P, Owner} <- Owners, Owner =:= Node].
 
 limit_ownership_handoff(Limit, Transfers, false) ->
     lists:sublist(Transfers, Limit);
@@ -1058,3 +1131,188 @@ register_vnode_stats(Mod, Index, Pid) ->
 
 unregister_vnode_stats(Mod, Index) ->
     riak_core_stat:unregister_vnode_stats(Mod, Index).
+
+
+%% ===================================================================
+%% Unit tests
+%% ===================================================================
+-ifdef(TEST).
+
+order_transfer_test_() ->
+    {timeout, 60, fun order_transfer_tester/0}.
+
+order_transfer_tester() ->
+    N1 = node1,
+    N1Loc = loc1,
+    TargetN = 4,
+    RingSize = 256,
+    InitJoiningNodes =
+        [{node2, loc2},
+         {node3, loc3},
+         {node4, loc1},
+         {node5, loc2},
+         {node6, loc3},
+         {node7, loc1},
+         {node8, loc2},
+         {node9, loc1}],
+
+    Params = [{target_n_val, TargetN}, {target_location_n_val, 3}],
+    R1 =
+        riak_core_ring:set_node_location(
+            N1,
+            N1Loc,
+            riak_core_ring:fresh(RingSize, N1)),
+
+    RClaimInit =
+        lists:foldl(
+            fun({N, L}, AccR) ->
+                AccR0 = riak_core_ring:add_member(N1, AccR, N),
+                riak_core_ring:set_node_location(N, L, AccR0)
+            end,
+            R1,
+            InitJoiningNodes),
+    %% Use the entry for ?MODULE here:
+    CurrentRing0 =
+        riak_core_membership_claim:claim(
+            RClaimInit,
+            {riak_core_membership_claim, default_wants_claim},
+            {riak_core_claim_swapping, choose_claim_v4, Params}),
+
+    CurrentRing1 =
+        lists:foldl(
+            fun(N, R) -> riak_core_ring:set_member(node1, R, N, valid) end,
+            CurrentRing0,
+            [node2, node3, node4, node5, node6, node7, node8, node9]
+        ),
+
+    NextRingClaimInit =
+        lists:foldl(
+            fun({N, L}, AccR) ->
+                AccR0 = riak_core_ring:add_member(N1, AccR, N),
+                riak_core_ring:set_node_location(N, L, AccR0)
+            end,
+            CurrentRing1,
+            [{node10, loc3}, {node11, loc4}, {node12, loc4}]),
+
+    NextRing0 =
+        riak_core_membership_claim:claim(
+            NextRingClaimInit,
+            {riak_core_membership_claim, default_wants_claim},
+            {riak_core_claim_swapping, choose_claim_v4, Params}),
+
+    CurrentRing2 =
+        lists:foldl(
+            fun(N, R) -> riak_core_ring:set_member(node1, R, N, valid) end,
+            NextRingClaimInit,
+            [node10, node11, node12]
+        ),
+
+    Changes =
+        lists:map(
+            fun({{Idx, CN}, {Idx, FN}}) ->
+                {Idx, CN, FN, [], awaiting}
+            end,
+            lists:filter(
+                fun({Current, Future}) -> Current =/= Future end,
+                lists:zip(
+                    riak_core_ring:all_owners(CurrentRing1),
+                    riak_core_ring:all_owners(NextRing0))
+            )
+        ),
+
+    NodeList =
+        [node1, node2, node3, node4, node5, node6, node7, node8, node9,
+         node10, node11, node12],
+
+    CurrentRing3 = riak_core_ring:set_pending_changes(CurrentRing2, Changes),
+    TransferLimit = 4,
+    FirstTransfers =
+        fetch_transfers_all_nodes(CurrentRing3, TransferLimit, NodeList),
+
+    test_destinations(
+        FirstTransfers, TransferLimit, [node10, node11, node12], Changes),
+
+    MinVnodes = RingSize div length(NodeList),
+    NearlyAllTransferLimit = (MinVnodes * 3) - 1,
+    NearlyAllTransfersToNew =
+        fetch_transfers_all_nodes(
+            CurrentRing3, NearlyAllTransferLimit, NodeList),
+
+    test_destinations(
+        NearlyAllTransfersToNew,
+        NearlyAllTransferLimit,
+        [node10, node11, node12],
+        Changes
+    ),
+
+    {CurrentRing4, UpdChanges} =
+        lists:foldl(
+            fun({riak_kv, Idx}, {R, Cs}) ->
+                {Idx, _SndNode, RcvNode, [], awaiting} =
+                    lists:keyfind(Idx, 1, Cs),
+                {riak_core_ring:transfer_node(Idx, RcvNode, R),
+                    lists:keydelete(Idx, 1, Cs)}
+            end,
+            {CurrentRing3, Changes},
+            NearlyAllTransfersToNew
+        ),
+    NextTransfers =
+        fetch_transfers_all_nodes(
+            riak_core_ring:set_pending_changes(CurrentRing4, UpdChanges),
+            4,
+            NodeList
+        ),
+    ?assertMatch(4, length(NextTransfers)),
+    NextTransfersDetails =
+        lists:keysort(
+            1,
+            lists:map(
+                fun({riak_kv, Idx}) -> lists:keyfind(Idx, 1, UpdChanges) end,
+                NextTransfers
+            )
+        ),
+    %% If a node has sent more than 3 vnodes as part of the first wave, it
+    %% may now be treated like a joiner - a node with outstanding wants. So
+    %% we cannot be fully deterministic in what to expect, as one joiner may
+    %% still have wants, as well as non-joining nodes.
+    %% In the next 4 changes, probably all will be to non-joining nodes, but
+    %% at least the second, third and fourth will be.
+    lists:foreach(
+        fun({_Idx, _SndNode, RcvNode, [], awaiting}) ->
+            ?assert(not lists:member(RcvNode, [node10, node11, node12]))
+        end,
+        lists:sublist(NextTransfersDetails, 2, 3)
+    ).
+
+
+fetch_transfers_all_nodes(Ring, TransferLimit, NodeList) ->
+    lists:foldl(
+        fun(N, Acc) ->
+            lists:append(
+                Acc,
+                select_candidate_transfers(
+                    riak_core_ring:pending_changes(Ring),
+                    [riak_kv],
+                    Ring,
+                    TransferLimit,
+                    N)
+            )
+        end,
+        [],
+        NodeList
+    ).
+
+
+test_destinations(Transfers, ExpectedLength, ValidDestinations, Changes) ->
+    ?assertMatch(ExpectedLength, length(Transfers)),
+    lists:foreach(
+        fun({riak_kv, Idx}) ->
+            {Idx, _SndNode, RcvNode, [], awaiting} =
+                lists:keyfind(Idx, 1, Changes),
+            ?assert(lists:member(RcvNode, ValidDestinations))
+        end,
+        Transfers
+    ).
+
+-endif.
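
For reference, a minimal riak.conf sketch of the option added in priv/riak_core.schema above. The option name, its mapping to riak_core.forced_ownership_handoff, and the default of 8 come from the schema change in this patch; the value of 12 is purely illustrative, and transfer_limit is shown only to contrast the existing per-node setting with the new cluster-wide one.

## Cluster-wide cap on ownership transfers prompted by the management tick
## (maps to the riak_core.forced_ownership_handoff application variable;
## default is 8).
cluster_transfer_limit = 12

## Per-node handoff concurrency remains governed separately by transfer_limit.
transfer_limit = 2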