Skip to content

Commit

Permalink
Read repair - configure to repair primary only (#1844)
Browse files Browse the repository at this point in the history
* Read repair - configure to repair primary only

By default, the behaviour should be unchanged. However, it is now configurable to read repair primary vnodes only - fallback vnodes will not be repaired on failing GETs; they will only receive new PUTs.

See schema change for more details.

* get_option returns value not {K, V}

* Add ability to suspend AAE

* Add logging of read repairs

Initially to troubleshoot in test - but perhaps of generic use.

* Handle handoff put through standard put code

Rather than replicate piecemeal the standard PUT code within do_diffobj_put/3, instead use do_handoff_put/3 and use standard prepare_put/2 and perform_put/3 functions used in normal PUTs.

The effect of this is that any optimisations in the normal PUT workflow will now automatically be used for handoffs.  Of particular relevance at this point is the HEAD (not GET) before PUT optimisation available with leveled backend.  If there are large objects, and objects which already exist in the receiving vnode are to be handed off (such as in hinted handoff), then this increases efficiency.

Some spec improvements to help with some editors that do not like fun() type.  Some indent reductions to improve readability.

* Make HookReason part of PutArgs

This allows the same code to be used for both handoff and put.

* Revert defaulting of properties

As riak_core updated to ensure bucket types are exchanged prior to join committing

* Add helpful operator functions to riak_client

To make recovery of nodes easier, adding some helper functions to riak_client.

* Update branches

Remove legacy thumbs

* Update rebar.config
  • Loading branch information
martinsumner authored Jan 19, 2023
1 parent 1e19c32 commit fbb5363
Show file tree
Hide file tree
Showing 8 changed files with 262 additions and 201 deletions.
11 changes: 0 additions & 11 deletions .thumbs.yml

This file was deleted.

32 changes: 30 additions & 2 deletions priv/riak_kv.schema
Original file line number Diff line number Diff line change
Expand Up @@ -1461,7 +1461,35 @@
%% converted to an overflow queue as part of this release. Instead use
%% `replrtq_overflow_limit` to control the queue size, including on-disk size.
{mapping, "replrtq_srcqueuelimit", "riak_kv.replrtq_srcqueuelimit", [
{datatype, integer},
{default, 300000},
{datatype, integer},
{default, 300000},
hidden
]}.

%% @doc Choose to read repair to primary vnodes only
%% When fallback vnodes are elected, then read repair will by default repair
%% any missing data from the vnode - i.e. every GET while the fallback is in
%% play will lead to a PUT to add the requested object to the fallback vnode,
%% as the fallback by default starts empty.
%% If the expectation is that failed vnodes are replaced quickly, as would be
%% possible in a Cloud scenario, this may not be desirable. Read repair to
%% fallbacks reduces throughput in failure scenarios, and then the hinted
%% handoffs following recovery are impaired by the historic data which is
%% already in the recovered node, and has to be handed off as well as the
%% fresh updates received since the failure.
%% When fallback vnodes are expected to be in place for a long period, the
%% default setting of read repairing fallbacks may be preferred, as it will
%% provide additional data resilience, and potentially improved performance
%% where the same objects are repeatedly fetched.
{mapping, "read_repair_primaryonly", "riak_kv.read_repair_primaryonly", [
{datatype, {flag, enabled, disabled}},
{default, disabled}
]}.

%% @doc If reads discover keys to be repaired, should each key
%% that is repaired be logged?
{mapping, "read_repair_log", "riak_kv.read_repair_log", [
{datatype, {flag, enabled, disabled}},
{default, disabled},
hidden
]}.
8 changes: 4 additions & 4 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,16 @@
]}.

{deps, [
{riak_core, {git, "https://github.com/basho/riak_core.git", {tag, "riak_kv-3.0.12"}}},
{riak_core, {git, "https://github.com/basho/riak_core.git", {branch, "develop-3.0"}}},
{sidejob, {git, "https://github.com/basho/sidejob.git", {tag, "2.1.0"}}},
{bitcask, {git, "https://github.com/basho/bitcask.git", {tag, "2.1.0"}}},
{redbug, {git, "https://github.com/massemanet/redbug", {tag, "1.2.2"}}},
{recon, {git, "https://github.com/ferd/recon", {tag, "2.5.2"}}},
{sext, {git, "https://github.com/uwiger/sext.git", {tag, "1.4.1"}}},
{riak_pipe, {git, "https://github.com/basho/riak_pipe.git", {tag, "riak_kv-3.0.12"}}},
{riak_pipe, {git, "https://github.com/basho/riak_pipe.git", {branch, "develop-3.0"}}},
{riak_dt, {git, "https://github.com/basho/riak_dt.git", {tag, "riak_kv-3.0.0"}}},
{riak_api, {git, "https://github.com/basho/riak_api.git", {tag, "riak_kv-3.0.12"}}},
{riak_api, {git, "https://github.com/basho/riak_api.git", {branch, "develop-3.0"}}},
{hyper, {git, "https://github.com/basho/hyper", {tag, "1.1.0"}}},
{kv_index_tictactree, {git, "https://github.com/martinsumner/kv_index_tictactree.git", {tag, "1.0.4"}}},
{kv_index_tictactree, {git, "https://github.com/martinsumner/kv_index_tictactree.git", {branch, "develop-3.0"}}},
{riakhttpc, {git, "https://github.com/basho/riak-erlang-http-client", {tag, "3.0.10"}}}
]}.
54 changes: 38 additions & 16 deletions src/riak_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@
-export([ensemble/1]).
-export([fetch/2, push/4]).
-export([membership_request/1, replrtq_reset_all_peers/1, replrtq_reset_all_workercounts/2]).
-export([tictacaae_suspend_node/0, tictacaae_resume_node/0]).
-export([remove_node_from_coverage/0, reset_node_for_coverage/0]).
-export([repair_node/0]).

-compile({no_auto_import,[put/2]}).
%% @type default_timeout() = 60000
Expand Down Expand Up @@ -901,13 +903,14 @@ aae_fold(Query) ->
aae_fold(Query, {?MODULE, [Node, _ClientId]}) ->
Me = self(),
ReqId = mk_reqid(),
TimeOut = ?DEFAULT_FOLD_TIMEOUT,
TimeOut =
app_helper:get_env(
riak_kv, riak_client_aaefold_timeout, ?DEFAULT_FOLD_TIMEOUT),
Q0 = riak_kv_clusteraae_fsm:convert_fold(Query),
case riak_kv_clusteraae_fsm:is_valid_fold(Q0) of
true ->
riak_kv_clusteraae_fsm_sup:start_clusteraae_fsm(Node,
[{raw, ReqId, Me},
[Q0, TimeOut]]),
riak_kv_clusteraae_fsm_sup:start_clusteraae_fsm(
Node, [{raw, ReqId, Me}, [Q0, TimeOut]]),
wait_for_fold_results(ReqId, TimeOut);
false ->
{error, "Invalid AAE fold definition"}
Expand All @@ -929,9 +932,8 @@ ttaaefs_fullsync(WorkItem) ->
-spec ttaaefs_fullsync(riak_kv_ttaaefs_manager:work_item(), integer()) -> ok.
ttaaefs_fullsync(WorkItem, SecsTimeout) ->
ReqId = mk_reqid(),
riak_kv_ttaaefs_manager:process_workitem(WorkItem,
ReqId,
os:timestamp()),
riak_kv_ttaaefs_manager:process_workitem(
WorkItem, ReqId, os:timestamp()),
wait_for_reqid(ReqId, SecsTimeout * 1000).

%% @doc
Expand All @@ -941,22 +943,42 @@ ttaaefs_fullsync(WorkItem, SecsTimeout) ->
erlang:timestamp()) -> ok.
ttaaefs_fullsync(WorkItem, SecsTimeout, Now) ->
ReqId = mk_reqid(),
riak_kv_ttaaefs_manager:process_workitem(WorkItem,
ReqId,
Now),
riak_kv_ttaaefs_manager:process_workitem(WorkItem, ReqId, Now),
wait_for_reqid(ReqId, SecsTimeout * 1000).

%% @doc Request a repair of every partition owned by the local node.
%% Ownership is taken from the ring returned by
%% riak_core_ring_manager:get_my_ring/0; riak_kv_vnode:repair/1 is then
%% called once for each partition whose owner is node(). The results of the
%% individual repair calls are discarded and ok is always returned.
-spec repair_node() -> ok.
repair_node() ->
    {ok, Ring} = riak_core_ring_manager:get_my_ring(),
    LocalNode = node(),
    IsLocallyOwned =
        fun({_Partition, Owner}) ->
            Owner =:= LocalNode
        end,
    LocalOwnership =
        lists:filter(IsLocallyOwned, riak_core_ring:all_owners(Ring)),
    lists:foreach(
        fun({Partition, _Owner}) ->
            riak_kv_vnode:repair(Partition)
        end,
        LocalOwnership),
    ok.

%% @doc Set the riak_kv application environment flag `tictacaae_suspend' to
%% true on the local node.
%% NOTE(review): presumably this flag is consulted by the tictac AAE
%% machinery to pause exchanges; the reader of the flag is not visible here.
-spec tictacaae_suspend_node() -> ok.
tictacaae_suspend_node() ->
    Suspend = true,
    ok = application:set_env(riak_kv, tictacaae_suspend, Suspend).

%% @doc Set the riak_kv application environment flag `tictacaae_suspend' to
%% false on the local node.
%% NOTE(review): presumably this flag is consulted by the tictac AAE
%% machinery to resume exchanges; the reader of the flag is not visible here.
-spec tictacaae_resume_node() -> ok.
tictacaae_resume_node() ->
    Suspend = false,
    ok = application:set_env(riak_kv, tictacaae_suspend, Suspend).

-spec participate_in_coverage(boolean()) -> ok.
participate_in_coverage(Participate) ->
F =
fun(Ring, _) ->
fun(R, _) ->
{new_ring,
riak_core_ring:update_member_meta(node(),
Ring,
node(),
participate_in_coverage,
Participate)}
riak_core_ring:update_member_meta(
node(), R, node(), participate_in_coverage, Participate)}
end,
{ok, _FinalRing} = riak_core_ring_manager:ring_trans(F, undefined),
ok.
Expand Down
13 changes: 9 additions & 4 deletions src/riak_kv_get_core.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,15 @@
{error, any()} |
{fetch, list()}.
-type repair_reason() :: notfound | outofdate.
-type final_action() :: nop |
{read_repair, [{non_neg_integer() | repair_reason()}], riak_object:riak_object()} |
{delete_repair, [{non_neg_integer() | repair_reason()}], riak_object:riak_object()} |
delete.
-type final_action() ::
nop |
{read_repair,
[{non_neg_integer(), repair_reason()}],
riak_object:riak_object()} |
{delete_repair,
[{non_neg_integer(), repair_reason()}],
riak_object:riak_object()} |
delete.
-type idxresult() :: {non_neg_integer(), result()}.
%% List of {Index, Type} pairs, where Type records whether the entry is a
%% primary or a fallback. The first element must be written as the built-in
%% type non_neg_integer(); without the parentheses it denotes the literal
%% atom 'non_neg_integer', which no real index value would ever match.
-type idx_type() :: [{non_neg_integer(), 'primary' | 'fallback'}].

Expand Down
50 changes: 45 additions & 5 deletions src/riak_kv_get_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -689,24 +689,64 @@ roll_d100() ->
-endif.

%% Issue read repairs for any vnodes that are out of date
read_repair(Indices, RepairObj,
read_repair(GetCoreIndices, RepairObj,
#state{req_id = ReqId, starttime = StartTime,
preflist2 = Sent, bkey = BKey, crdt_op = CrdtOp,
bucket_props = BucketProps, trace = Trace}) ->
RepairPreflist = [{Idx, Node} || {{Idx, Node}, _Type} <- Sent,
get_option(Idx, Indices) /= undefined],
RepairPreflist =
lists:filtermap(
fun({{Idx, Node}, Type}) ->
read_repair_index({{Idx, Node}, Type}, GetCoreIndices)
end,
Sent),
DocIdxList =
lists:map(
fun({{Idx, Node}, _Type, Reason}) ->
case app_helper:get_env(riak_kv, read_repair_log, false) of
true ->
lager:info(
"Read repair of ~p on ~w ~w for reason ~w",
[BKey, Idx, Node, Reason]);
false ->
ok
end,
{Idx, Node}
end,
RepairPreflist),
case Trace of
true ->
Ps = preflist_for_tracing(RepairPreflist),
?DTRACE(?C_GET_FSM_RR, [], Ps);
_ ->
ok
end,
riak_kv_vnode:readrepair(RepairPreflist, BKey, RepairObj, ReqId,
riak_kv_vnode:readrepair(DocIdxList, BKey, RepairObj, ReqId,
StartTime, [{returnbody, false},
{bucket_props, BucketProps},
{crdt_op, CrdtOp}]),
ok = riak_kv_stat:update({read_repairs, Indices, Sent}).
ok = riak_kv_stat:update({read_repairs, RepairPreflist}).

%% Decide whether a single preflist entry should be read repaired, in the
%% {true, Value} | false form expected by lists:filtermap/2.
%%
%% The entry's index is looked up in Indices via get_option/2; when that
%% returns undefined the entry is dropped. Otherwise the looked-up value is
%% the repair reason, and the entry is kept - extended with that reason -
%% unless the riak_kv setting read_repair_primaryonly is true and the entry
%% is a fallback.
-spec read_repair_index(
    {{non_neg_integer(), node()}, primary|fallback},
    list({non_neg_integer(), outofdate|notfound})) ->
        boolean()|
            {true,
                {{non_neg_integer(), node()},
                    primary|fallback,
                    outofdate|notfound}}.
read_repair_index({{Idx, _Node} = IdxNode, Type}, Indices) ->
    case get_option(Idx, Indices) of
        undefined ->
            false;
        Reason ->
            PrimaryOnly =
                app_helper:get_env(riak_kv, read_repair_primaryonly, false),
            %% Only the exact combination of the setting being true and the
            %% vnode being a fallback suppresses the repair.
            SkipFallback =
                (PrimaryOnly =:= true) andalso (Type =:= fallback),
            case SkipFallback of
                true ->
                    false;
                false ->
                    {true, {IdxNode, Type, Reason}}
            end
    end.

get_option(Name, Options) ->
get_option(Name, Options, undefined).
Expand Down
27 changes: 13 additions & 14 deletions src/riak_kv_stat.erl
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,9 @@ do_update({index_fsm_time, Microsecs, ResultCount}) ->
ok = exometer:update([P, ?APP, index, fsm, complete], 1),
ok = exometer:update([P, ?APP, index, fsm, results], ResultCount),
ok = exometer:update([P, ?APP, index, fsm, time], Microsecs);
do_update({read_repairs, Indices, Preflist}) ->
do_update({read_repairs, Preflist}) ->
ok = exometer:update([?PFX, ?APP, node, gets, read_repairs], 1),
do_repairs(Indices, Preflist);
do_repairs(Preflist);
do_update({tictac_aae, ExchangeState}) ->
ok = exometer:update([?PFX, ?APP, node, tictacaae, ExchangeState], 1);
do_update({tictac_aae, ExchangeType, RepairCount}) ->
Expand Down Expand Up @@ -507,19 +507,18 @@ do_stages(Path, [{Stage, Time}|Stages]) ->
do_stages(Path, Stages).

%% create dimensioned stats for read repairs.
%% The indexes are from get core [{Index, Reason::notfound|outofdate}]
%% preflist is a preflist of [{{Index, Node}, Type::primary|fallback}]
do_repairs(Indices, Preflist) ->
%% The preflist has been filtered to remove those that will not be subject to
%% repair
do_repairs(Preflist) ->
Pfx = riak_core_stat:prefix(),
lists:foreach(fun({{Idx, Node}, Type}) ->
case proplists:get_value(Idx, Indices) of
undefined ->
ok;
Reason ->
create_or_update([Pfx, ?APP, node, gets, read_repairs, Node, Type, Reason], 1, spiral)
end
end,
Preflist).
lists:foreach(
fun({{_Idx, Node}, Type, Reason}) ->
create_or_update(
[Pfx, ?APP, node, gets, read_repairs, Node, Type, Reason],
1,
spiral)
end,
Preflist).

%% for dynamically created / dimensioned stats
%% that can't be registered at start up
Expand Down
Loading

0 comments on commit fbb5363

Please sign in to comment.