From fbb53630645e53af053228d526caa3c86f304066 Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Thu, 19 Jan 2023 11:11:30 +0000
Subject: [PATCH] Read repair - configure to repair primary only (#1844)

* Read repair - configure to repair primary only

By default, the behaviour should be unchanged. However, it is now
configurable to read repair primary vnodes only - fallback vnodes will not
be repaired on failing GETs; they will only receive new PUTs.

See schema change for more details.

* get_option returns value not {K, V}

* Add ability to suspend AAE

* Add logging of read repairs

Initially to troubleshoot in test - but perhaps of generic use.

* Handle handoff put through standard put code

Rather than replicate piecemeal the standard PUT code within
do_diffobj_put/3, instead use do_handoff_put/3 with the standard
prepare_put/2 and perform_put/3 functions used in normal PUTs.

The effect of this is that any optimisations in the normal PUT workflow
will now automatically be used for handoffs. Of particular relevance at
this point is the HEAD (not GET) before PUT optimisation available with
the leveled backend. If there are large objects, and objects which already
exist in the receiving vnode are to be handed off (such as in hinted
handoff), then this increases efficiency.

Some spec improvements to help with editors that do not like the fun()
type.

Some indent reductions to improve readability.

* Make HookReason part of PutArgs

This allows the same code to be used for both handoff and put.

* Revert defaulting of properties

As riak_core has been updated to ensure bucket types are exchanged prior
to the join committing.

* Add helpful operator functions to riak_client

To make recovery of nodes easier, adding some helper functions to
riak_client.

* Update branches

Remove legacy thumbs

* Update rebar.config
---
 .thumbs.yml              |  11 --
 priv/riak_kv.schema      |  32 ++++-
 rebar.config             |   8 +-
 src/riak_client.erl      |  54 +++++---
 src/riak_kv_get_core.erl |  13 +-
 src/riak_kv_get_fsm.erl  |  50 +++++++-
 src/riak_kv_stat.erl     |  27 ++--
 src/riak_kv_vnode.erl    | 268 ++++++++++++++++++---------------
 8 files changed, 262 insertions(+), 201 deletions(-)
 delete mode 100644 .thumbs.yml

diff --git a/.thumbs.yml b/.thumbs.yml
deleted file mode 100644
index f28fff7577..0000000000
--- a/.thumbs.yml
+++ /dev/null
@@ -1,11 +0,0 @@
-minimum_reviewers: 2
-merge: true
-build_steps:
-  - make clean
-  - make deps
-  - make compile
-  - make test
-  - make xref
-  - make dialyzer
-org_mode: true
-timeout: 1800
diff --git a/priv/riak_kv.schema b/priv/riak_kv.schema
index f19835e887..c8c6f2c1cb 100644
--- a/priv/riak_kv.schema
+++ b/priv/riak_kv.schema
@@ -1461,7 +1461,35 @@
 %% converted to an overflow queue as part of this release. Instead use
 %% `replrtq_overflow_limit` to control the queue size, including on-disk size.
 {mapping, "replrtq_srcqueuelimit", "riak_kv.replrtq_srcqueuelimit", [
-    {datatype, integer},
-    {default, 300000},
+  {datatype, integer},
+  {default, 300000},
   hidden
 ]}.
+
+%% @doc Choose to read repair to primary vnodes only
+%% When fallback vnodes are elected, read repair will by default repair
+%% any data missing from the fallback vnode - i.e. every GET while the
+%% fallback is in play will lead to a PUT to add the requested object to the
+%% fallback vnode, as the fallback by default starts empty.
+%% If the expectation is that failed vnodes are replaced quickly, as would be
+%% possible in a Cloud scenario, this may not be desirable. Read repair to
+%% fallbacks reduces throughput in failure scenarios, and the hinted
+%% handoffs following recovery are then impaired by historic data which is
+%% already present in the recovered node, but has to be handed off along
+%% with the fresh updates received since the failure.
+%% When fallback vnodes are expected to be in place for a long period, the
+%% default setting of read repairing fallbacks may be preferred, as it will
+%% provide additional data resilience, and potentially improved performance
+%% where the same objects are repeatedly fetched.
+{mapping, "read_repair_primaryonly", "riak_kv.read_repair_primaryonly", [
+  {datatype, {flag, enabled, disabled}},
+  {default, disabled}
+]}.
+
+%% @doc If a read discovers keys to be repaired, should each repaired key
+%% be logged
+{mapping, "read_repair_log", "riak_kv.read_repair_log", [
+  {datatype, {flag, enabled, disabled}},
+  {default, disabled},
+  hidden
+]}.
\ No newline at end of file
diff --git a/rebar.config b/rebar.config
index 0f9baeaafa..c37a83b06f 100644
--- a/rebar.config
+++ b/rebar.config
@@ -42,16 +42,16 @@
 ]}.

 {deps, [
-    {riak_core, {git, "https://github.com/basho/riak_core.git", {tag, "riak_kv-3.0.12"}}},
+    {riak_core, {git, "https://github.com/basho/riak_core.git", {branch, "develop-3.0"}}},
     {sidejob, {git, "https://github.com/basho/sidejob.git", {tag, "2.1.0"}}},
     {bitcask, {git, "https://github.com/basho/bitcask.git", {tag, "2.1.0"}}},
     {redbug, {git, "https://github.com/massemanet/redbug", {tag, "1.2.2"}}},
     {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.2"}}},
     {sext, {git, "https://github.com/uwiger/sext.git", {tag, "1.4.1"}}},
-    {riak_pipe, {git, "https://github.com/basho/riak_pipe.git", {tag, "riak_kv-3.0.12"}}},
+    {riak_pipe, {git, "https://github.com/basho/riak_pipe.git", {branch, "develop-3.0"}}},
     {riak_dt, {git, "https://github.com/basho/riak_dt.git", {tag, "riak_kv-3.0.0"}}},
-    {riak_api, {git, "https://github.com/basho/riak_api.git", {tag, "riak_kv-3.0.12"}}},
+    {riak_api, {git, "https://github.com/basho/riak_api.git", {branch, "develop-3.0"}}},
     {hyper, {git, "https://github.com/basho/hyper", {tag, "1.1.0"}}},
-    {kv_index_tictactree, {git, "https://github.com/martinsumner/kv_index_tictactree.git", {tag, "1.0.4"}}},
+    {kv_index_tictactree, {git, "https://github.com/martinsumner/kv_index_tictactree.git", {branch, "develop-3.0"}}},
     {riakhttpc, {git, "https://github.com/basho/riak-erlang-http-client", {tag, "3.0.10"}}}
 ]}.
diff --git a/src/riak_client.erl b/src/riak_client.erl
index 7c18dd9bf0..9a0e441df1 100644
--- a/src/riak_client.erl
+++ b/src/riak_client.erl
@@ -50,7 +50,9 @@
 -export([ensemble/1]).
 -export([fetch/2, push/4]).
 -export([membership_request/1, replrtq_reset_all_peers/1, replrtq_reset_all_workercounts/2]).
+-export([tictacaae_suspend_node/0, tictacaae_resume_node/0]).
 -export([remove_node_from_coverage/0, reset_node_for_coverage/0]).
+-export([repair_node/0]).

 -compile({no_auto_import,[put/2]}).
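As an illustrative aside (not part of the patch): the two new cuttlefish
settings surface in riak.conf as read_repair_primaryonly and read_repair_log,
and land in the riak_kv application environment as booleans. A minimal sketch,
assuming only the app_helper calls already used elsewhere in this patch, of
how the flags can be inspected (or temporarily toggled for testing) from a
console; the function name is invented, and riak.conf remains the supported
way to set the values persistently:

    %% Editor's illustration only
    read_repair_settings() ->
        PrimaryOnly = app_helper:get_env(riak_kv, read_repair_primaryonly, false),
        LogRepairs = app_helper:get_env(riak_kv, read_repair_log, false),
        {PrimaryOnly, LogRepairs}.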
%% @type default_timeout() = 60000 @@ -901,13 +903,14 @@ aae_fold(Query) -> aae_fold(Query, {?MODULE, [Node, _ClientId]}) -> Me = self(), ReqId = mk_reqid(), - TimeOut = ?DEFAULT_FOLD_TIMEOUT, + TimeOut = + app_helper:get_env( + riak_kv, riak_client_aaefold_timeout, ?DEFAULT_FOLD_TIMEOUT), Q0 = riak_kv_clusteraae_fsm:convert_fold(Query), case riak_kv_clusteraae_fsm:is_valid_fold(Q0) of true -> - riak_kv_clusteraae_fsm_sup:start_clusteraae_fsm(Node, - [{raw, ReqId, Me}, - [Q0, TimeOut]]), + riak_kv_clusteraae_fsm_sup:start_clusteraae_fsm( + Node, [{raw, ReqId, Me}, [Q0, TimeOut]]), wait_for_fold_results(ReqId, TimeOut); false -> {error, "Invalid AAE fold definition"} @@ -929,9 +932,8 @@ ttaaefs_fullsync(WorkItem) -> -spec ttaaefs_fullsync(riak_kv_ttaaefs_manager:work_item(), integer()) -> ok. ttaaefs_fullsync(WorkItem, SecsTimeout) -> ReqId = mk_reqid(), - riak_kv_ttaaefs_manager:process_workitem(WorkItem, - ReqId, - os:timestamp()), + riak_kv_ttaaefs_manager:process_workitem( + WorkItem, ReqId, os:timestamp()), wait_for_reqid(ReqId, SecsTimeout * 1000). %% @doc @@ -941,22 +943,42 @@ ttaaefs_fullsync(WorkItem, SecsTimeout) -> erlang:timestamp()) -> ok. ttaaefs_fullsync(WorkItem, SecsTimeout, Now) -> ReqId = mk_reqid(), - riak_kv_ttaaefs_manager:process_workitem(WorkItem, - ReqId, - Now), + riak_kv_ttaaefs_manager:process_workitem(WorkItem, ReqId, Now), wait_for_reqid(ReqId, SecsTimeout * 1000). +-spec repair_node() -> ok. +repair_node() -> + {ok, Ring} = riak_core_ring_manager:get_my_ring(), + NodeToRepair = node(), + PartitionsToRepair = + lists:filtermap( + fun({P, Node}) -> + case Node of + NodeToRepair -> + {true, P}; + _ -> + false + end + end, + riak_core_ring:all_owners(Ring)), + [riak_kv_vnode:repair(P) || P <- PartitionsToRepair], + ok. + +-spec tictacaae_suspend_node() -> ok. +tictacaae_suspend_node() -> + application:set_env(riak_kv, tictacaae_suspend, true). + +-spec tictacaae_resume_node() -> ok. +tictacaae_resume_node() -> + application:set_env(riak_kv, tictacaae_suspend, false). -spec participate_in_coverage(boolean()) -> ok. participate_in_coverage(Participate) -> F = - fun(Ring, _) -> + fun(R, _) -> {new_ring, - riak_core_ring:update_member_meta(node(), - Ring, - node(), - participate_in_coverage, - Participate)} + riak_core_ring:update_member_meta( + node(), R, node(), participate_in_coverage, Participate)} end, {ok, _FinalRing} = riak_core_ring_manager:ring_trans(F, undefined), ok. diff --git a/src/riak_kv_get_core.erl b/src/riak_kv_get_core.erl index 43e49df9b2..ffd8df5756 100644 --- a/src/riak_kv_get_core.erl +++ b/src/riak_kv_get_core.erl @@ -37,10 +37,15 @@ {error, any()} | {fetch, list()}. -type repair_reason() :: notfound | outofdate. --type final_action() :: nop | - {read_repair, [{non_neg_integer() | repair_reason()}], riak_object:riak_object()} | - {delete_repair, [{non_neg_integer() | repair_reason()}], riak_object:riak_object()} | - delete. +-type final_action() :: + nop | + {read_repair, + [{non_neg_integer(), repair_reason()}], + riak_object:riak_object()} | + {delete_repair, + [{non_neg_integer(), repair_reason()}], + riak_object:riak_object()} | + delete. -type idxresult() :: {non_neg_integer(), result()}. -type idx_type() :: [{non_neg_integer, 'primary' | 'fallback'}]. diff --git a/src/riak_kv_get_fsm.erl b/src/riak_kv_get_fsm.erl index 1a1ad42f21..8dd1937fa8 100644 --- a/src/riak_kv_get_fsm.erl +++ b/src/riak_kv_get_fsm.erl @@ -689,12 +689,30 @@ roll_d100() -> -endif. 
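As an illustrative aside (not part of the patch): the operator helpers added
to riak_client above are all zero-arity and intended for use from a console on
the node being recovered. A usage sketch - the node name is invented, and the
ok returns follow from the implementations above:

    (riak@node1)1> riak_client:tictacaae_suspend_node().
    ok
    (riak@node1)2> riak_client:repair_node().
    ok
    (riak@node1)3> riak_client:tictacaae_resume_node().
    ok

repair_node/0 queues riak_kv_vnode:repair/1 for every partition the local node
owns; tictacaae_suspend_node/0 sets the riak_kv tictacaae_suspend flag, which
the vnode checks both when scheduling its own exchanges and when serving aae
requests, until tictacaae_resume_node/0 clears it.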
%% Issue read repairs for any vnodes that are out of date -read_repair(Indices, RepairObj, +read_repair(GetCoreIndices, RepairObj, #state{req_id = ReqId, starttime = StartTime, preflist2 = Sent, bkey = BKey, crdt_op = CrdtOp, bucket_props = BucketProps, trace = Trace}) -> - RepairPreflist = [{Idx, Node} || {{Idx, Node}, _Type} <- Sent, - get_option(Idx, Indices) /= undefined], + RepairPreflist = + lists:filtermap( + fun({{Idx, Node}, Type}) -> + read_repair_index({{Idx, Node}, Type}, GetCoreIndices) + end, + Sent), + DocIdxList = + lists:map( + fun({{Idx, Node}, _Type, Reason}) -> + case app_helper:get_env(riak_kv, read_repair_log, false) of + true -> + lager:info( + "Read repair of ~p on ~w ~w for reason ~w", + [BKey, Idx, Node, Reason]); + false -> + ok + end, + {Idx, Node} + end, + RepairPreflist), case Trace of true -> Ps = preflist_for_tracing(RepairPreflist), @@ -702,11 +720,33 @@ read_repair(Indices, RepairObj, _ -> ok end, - riak_kv_vnode:readrepair(RepairPreflist, BKey, RepairObj, ReqId, + riak_kv_vnode:readrepair(DocIdxList, BKey, RepairObj, ReqId, StartTime, [{returnbody, false}, {bucket_props, BucketProps}, {crdt_op, CrdtOp}]), - ok = riak_kv_stat:update({read_repairs, Indices, Sent}). + ok = riak_kv_stat:update({read_repairs, RepairPreflist}). + +-spec read_repair_index( + {{non_neg_integer(), node()}, primary|fallback}, + list({non_neg_integer(), outofdate|notfound})) -> + boolean()| + {true, + {{non_neg_integer(), node()}, + primary|fallback, + outofdate|notfound}}. +read_repair_index({{Idx, Node}, Type}, Indices) -> + case get_option(Idx, Indices) of + undefined -> + false; + Reason -> + RRP = app_helper:get_env(riak_kv, read_repair_primaryonly, false), + case {RRP, Type} of + {true, fallback} -> + false; + _ -> + {true, {{Idx, Node}, Type, Reason}} + end + end. get_option(Name, Options) -> get_option(Name, Options, undefined). diff --git a/src/riak_kv_stat.erl b/src/riak_kv_stat.erl index 403a4e770f..f21679aa20 100644 --- a/src/riak_kv_stat.erl +++ b/src/riak_kv_stat.erl @@ -268,9 +268,9 @@ do_update({index_fsm_time, Microsecs, ResultCount}) -> ok = exometer:update([P, ?APP, index, fsm, complete], 1), ok = exometer:update([P, ?APP, index, fsm, results], ResultCount), ok = exometer:update([P, ?APP, index, fsm, time], Microsecs); -do_update({read_repairs, Indices, Preflist}) -> +do_update({read_repairs, Preflist}) -> ok = exometer:update([?PFX, ?APP, node, gets, read_repairs], 1), - do_repairs(Indices, Preflist); + do_repairs(Preflist); do_update({tictac_aae, ExchangeState}) -> ok = exometer:update([?PFX, ?APP, node, tictacaae, ExchangeState], 1); do_update({tictac_aae, ExchangeType, RepairCount}) -> @@ -507,19 +507,18 @@ do_stages(Path, [{Stage, Time}|Stages]) -> do_stages(Path, Stages). %% create dimensioned stats for read repairs. -%% The indexes are from get core [{Index, Reason::notfound|outofdate}] -%% preflist is a preflist of [{{Index, Node}, Type::primary|fallback}] -do_repairs(Indices, Preflist) -> +%% The preflist has been filtered to remove those that will not be subject to +%% repair +do_repairs(Preflist) -> Pfx = riak_core_stat:prefix(), - lists:foreach(fun({{Idx, Node}, Type}) -> - case proplists:get_value(Idx, Indices) of - undefined -> - ok; - Reason -> - create_or_update([Pfx, ?APP, node, gets, read_repairs, Node, Type, Reason], 1, spiral) - end - end, - Preflist). + lists:foreach( + fun({{_Idx, Node}, Type, Reason}) -> + create_or_update( + [Pfx, ?APP, node, gets, read_repairs, Node, Type, Reason], + 1, + spiral) + end, + Preflist). 
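To make the shape of the new repair data concrete, a worked example (indexes
and node names invented): suppose the GET was sent to one primary and one
fallback vnode, and riak_kv_get_core reports both as notfound.

    %% Sent    = [{{Idx1, 'dev1@node'}, primary}, {{Idx2, 'dev2@node'}, fallback}]
    %% Indices = [{Idx1, notfound}, {Idx2, notfound}]
    %%
    %% With read_repair_primaryonly disabled (the default), read_repair_index/2
    %% keeps both entries, carrying the reason through to the stats update:
    %%   [{{Idx1, 'dev1@node'}, primary, notfound},
    %%    {{Idx2, 'dev2@node'}, fallback, notfound}]
    %%
    %% With read_repair_primaryonly enabled, the fallback entry is dropped, so
    %% only the primary receives the repair PUT and only it is counted in
    %% do_repairs/1:
    %%   [{{Idx1, 'dev1@node'}, primary, notfound}]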
%% for dynamically created / dimensioned stats %% that can't be registered at start up diff --git a/src/riak_kv_vnode.erl b/src/riak_kv_vnode.erl index 377eebed22..1fc0a18469 100644 --- a/src/riak_kv_vnode.erl +++ b/src/riak_kv_vnode.erl @@ -252,7 +252,8 @@ is_index=false :: boolean(), %% set if the b/end supports indexes crdt_op = undefined :: undefined | term(), %% if set this is a crdt operation hash_ops = no_hash_ops, - sync_on_write = undefined :: undefined | atom() + sync_on_write = undefined :: undefined | atom(), + reason = riak_kv_update_hook:reason() }). -type putargs() :: #putargs{}. @@ -520,7 +521,9 @@ test_vnode(I) -> riak_core_vnode:start_link(riak_kv_vnode, I, infinity). --spec aae_send(tuple()) -> fun(). +-spec aae_send( + tuple()) -> + fun((term(), list(riak_core_apl:preflist()), blue|pink) -> ok). %% @doc %% Return a function which will send an aae request to a given vnode, and can %% prompt the response to be received by sender @@ -911,53 +914,40 @@ handle_command({aae, AAERequest, IndexNs, Colour}, Sender, State) -> fun(R) -> riak_core_vnode:reply(Sender, {reply, R, Colour}) end, - case State#state.tictac_aae of + MaybeSuspend = app_helper:get_env(riak_kv, tictacaae_suspend, false), + case State#state.tictac_aae and (not MaybeSuspend) of false -> ReturnFun(not_supported); true -> Cntrl = State#state.aae_controller, case AAERequest of fetch_root -> - aae_controller:aae_mergeroot(Cntrl, - IndexNs, - ReturnFun); + aae_controller:aae_mergeroot( + Cntrl, IndexNs, ReturnFun); {fetch_branches, BranchIDs} -> - aae_controller:aae_mergebranches(Cntrl, - IndexNs, - BranchIDs, - ReturnFun); + aae_controller:aae_mergebranches( + Cntrl, IndexNs, BranchIDs, ReturnFun); {fetch_clocks, SegmentIDs} -> IndexNFun = fun(B, K) -> riak_kv_util:get_index_n({B, K}) end, - aae_controller:aae_fetchclocks(Cntrl, - IndexNs, - SegmentIDs, - ReturnFun, - IndexNFun); + aae_controller:aae_fetchclocks( + Cntrl, IndexNs, SegmentIDs, ReturnFun, IndexNFun); {fetch_clocks, SegmentIDs, MR} -> IndexNFun = fun(B, K) -> riak_kv_util:get_index_n({B, K}) end, ModifiedLimiter = aaefold_setmodifiedlimiter(MR), - aae_controller:aae_fetchclocks(Cntrl, - IndexNs, - all, - SegmentIDs, - ModifiedLimiter, - ReturnFun, - IndexNFun); + aae_controller:aae_fetchclocks( + Cntrl, IndexNs, all, SegmentIDs, + ModifiedLimiter, ReturnFun, IndexNFun); {fetch_clocks_range, Bucket, KR, SF, MR} -> IndexNFun = fun(B, K) -> riak_kv_util:get_index_n({B, K}) end, RangeLimiter = aaefold_setrangelimiter(Bucket, KR), ModifiedLimiter = aaefold_setmodifiedlimiter(MR), SegmentIDs = element(2, SF), - aae_controller:aae_fetchclocks(Cntrl, - IndexNs, - RangeLimiter, - SegmentIDs, - ModifiedLimiter, - ReturnFun, - IndexNFun) + aae_controller:aae_fetchclocks( + Cntrl, IndexNs, RangeLimiter, SegmentIDs, + ModifiedLimiter, ReturnFun, IndexNFun) end end, @@ -1191,7 +1181,19 @@ handle_command(tictacaae_exchangepoke, _Sender, State) -> XTick = app_helper:get_env(riak_kv, tictacaae_exchangetick), riak_core_vnode:send_command_after(XTick, tictacaae_exchangepoke), Idx = State#state.idx, - case {State#state.tictac_exchangequeue, State#state.tictac_skiptick} of + CurrentSkipCount = + case State#state.tictac_skiptick of + 0 -> + case app_helper:get_env(riak_kv, tictacaae_suspend, false) of + true -> + 1; + _ -> + 0 + end; + SC when SC > 0 -> + SC + end, + case {State#state.tictac_exchangequeue, CurrentSkipCount} of {[], _} -> {ok, Ring} = riak_core_ring_manager:get_my_ring(), @@ -2371,16 +2373,16 @@ handle_handoff_data(BinObj, State) -> try {BKey, Val} = 
decode_binary_object(BinObj), {B, K} = BKey, - case do_diffobj_put(BKey, riak_object:from_binary(B, K, Val), - State) of + case do_handoff_put( + BKey, riak_object:from_binary(B, K, Val), State) of {ok, State2} -> {reply, ok, State2}; {error, Reason, State2} -> {reply, {error, Reason}, State2} end catch Error:Reason2 -> - lager:warning("Unreadable object discarded in handoff: ~p:~p", - [Error, Reason2]), + lager:warning( + "Unreadable object discarded in handoff: ~p:~p", [Error, Reason2]), {reply, ok, State} end. @@ -2701,18 +2703,22 @@ do_put(Sender, {Bucket, _Key}=BKey, RObj, ReqID, StartTime, Options, State) -> Coord = proplists:get_value(coord, Options, false), SyncOnWrite = proplists:get_value(sync_on_write, Options, undefined), CRDTOp = proplists:get_value(counter_op, Options, proplists:get_value(crdt_op, Options, undefined)), - PutArgs = #putargs{returnbody=proplists:get_value(returnbody,Options,false) orelse Coord, - coord=Coord, - lww=proplists:get_value(last_write_wins, BProps, false), - bkey=BKey, - robj=RObj, - reqid=ReqID, - bprops=BProps, - starttime=StartTime, - readrepair = ReadRepair, - prunetime=PruneTime, - crdt_op = CRDTOp, - sync_on_write = SyncOnWrite}, + PutArgs = + #putargs{ + returnbody = + proplists:get_value(returnbody,Options,false) orelse Coord, + coord=Coord, + lww=proplists:get_value(last_write_wins, BProps, false), + bkey=BKey, + robj=RObj, + reqid=ReqID, + bprops=BProps, + starttime=StartTime, + readrepair = ReadRepair, + prunetime=PruneTime, + crdt_op = CRDTOp, + sync_on_write = SyncOnWrite, + reason = put}, {PrepPutRes, UpdPutArgs, State2} = prepare_put(State, PutArgs), {Reply, UpdState} = perform_put(PrepPutRes, State2, UpdPutArgs), riak_core_vnode:reply(Sender, Reply), @@ -2823,39 +2829,31 @@ prepare_blind_put(Coord, RObj, VId, StartTime, PutArgs, State) -> {{true, {ObjToStore, unknown_no_old_object}}, PutArgs#putargs{is_index = false}, State}. -prepare_read_before_write_put(#state{mod = Mod, - modstate = ModState, - md_cache = MDCache}=State, - #putargs{bkey={Bucket, Key}=BKey, - robj=RObj, - coord=Coord}=PutArgs, - IndexBackend, IsSearchable) -> - {CacheClock, CacheData} = maybefetch_clock_and_indexdata(MDCache, - BKey, - Mod, - ModState, - Coord, - IsSearchable), +prepare_read_before_write_put( + #state{mod = Mod, modstate = ModState, md_cache = MDCache}=State, + #putargs{bkey={Bucket, Key}=BKey, robj=RObj, coord=Coord}=PutArgs, + IndexBackend, + IsSearchable) -> + {CacheClock, CacheData} = + maybefetch_clock_and_indexdata( + MDCache, BKey, Mod, ModState, Coord, IsSearchable), {GetReply, RequiresGet} = case CacheClock of not_found -> {not_found, false}; _ -> - ReqGet = determine_requires_get(CacheClock, - RObj, - IsSearchable), - {get_old_object_or_fake(ReqGet, - Bucket, Key, - Mod, ModState, - CacheClock), + ReqGet = + determine_requires_get(CacheClock, RObj, IsSearchable), + {get_old_object_or_fake( + ReqGet, Bucket, Key, Mod, ModState, CacheClock), ReqGet} end, case GetReply of not_found -> prepare_put_new_object(State, PutArgs, IndexBackend); {ok, OldObj} -> - prepare_put_existing_object(State, PutArgs, OldObj, - IndexBackend, CacheData, RequiresGet) + prepare_put_existing_object( + State, PutArgs, OldObj, IndexBackend, CacheData, RequiresGet) end. 
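The read-before-write path above is the one that handoff PUTs now also travel
(see do_handoff_put/3 later in this diff). A simplified sketch of the decision
it encodes - an editor's restatement with the real helper calls replaced by
plain values, not the vnode code itself:

    %% Clock    - result of the metadata-cache/HEAD lookup (not_found | vclock)
    %% NeedsGet - what determine_requires_get/3 decides; with a head-capable
    %%            backend such as leveled this is often false, so no full GET
    %%            of the old object is needed before the PUT.
    decide(not_found, _NeedsGet) -> prepare_new_object;
    decide(_Clock, true)         -> fetch_old_object_then_prepare;
    decide(_Clock, false)        -> prepare_against_cached_clock_only.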
prepare_put_existing_object(#state{idx =Idx} = State, @@ -3016,7 +3014,8 @@ perform_put({true, {_Obj, _OldObj}=Objects}, coord=Coord, index_specs=IndexSpecs, readrepair=ReadRepair, - sync_on_write=SyncOnWrite}) -> + sync_on_write=SyncOnWrite, + reason=HookReason}) -> case ReadRepair of true -> MaxCheckFlag = no_max_check; @@ -3038,13 +3037,15 @@ perform_put({true, {_Obj, _OldObj}=Objects}, false end, {Reply, State2} = - actual_put(BKey, Objects, IndexSpecs, RB, ReqID, MaxCheckFlag, - {Coord, Sync}, State), + actual_put( + BKey, Objects, IndexSpecs, RB, ReqID, MaxCheckFlag, + {Coord, Sync}, HookReason, State), {Reply, State2}. actual_put(BKey, {Obj, OldObj}, IndexSpecs, RB, ReqID, State) -> - actual_put(BKey, {Obj, OldObj}, IndexSpecs, RB, ReqID, do_max_check, - {false, false}, State). + actual_put( + BKey, {Obj, OldObj}, IndexSpecs, RB, ReqID, do_max_check, + {false, false}, put, State). actual_put(BKey={Bucket, Key}, {Obj, OldObj}, @@ -3052,6 +3053,7 @@ actual_put(BKey={Bucket, Key}, RB, ReqID, MaxCheckFlag, {Coord, Sync}, + HookReason, State=#state{idx=Idx, mod=Mod, modstate=ModState, @@ -3065,7 +3067,8 @@ actual_put(BKey={Bucket, Key}, State#state.enable_nextgenreplsrc, State#state.sizelimit_nextgenreplsrc), maybe_cache_object(BKey, Obj, State), - maybe_update(UpdateHook, {Obj, maybe_old_object(OldObj)}, put, Idx), + maybe_update( + UpdateHook, {Obj, maybe_old_object(OldObj)}, HookReason, Idx), Reply = case RB of true -> {dw, Idx, Obj, ReqID}; @@ -3095,13 +3098,16 @@ do_reformat({Bucket, Key}=BKey, State=#state{mod=Mod, modstate=ModState}) -> %% to the desired version, to reformat, all we need to do %% is submit a new write ST = riak_core_util:moment(), - PutArgs = #putargs{hash_ops = update, - returnbody = false, - bkey = BKey, - index_specs = [], - coord = false, - lww = false, - starttime = ST}, + PutArgs = + #putargs{ + hash_ops = update, + returnbody = false, + bkey = BKey, + index_specs = [], + coord = false, + lww = false, + starttime = ST, + reason = put}, case perform_put({true, {RObj, unchanged_no_old_object}}, State, PutArgs) of {{fail, _, Reason}, UpdState} -> Reply = {error, Reason}; @@ -3529,7 +3535,10 @@ do_fold(Fun, Acc0, Sender, ReqOpts, State=#state{async_folding=AsyncFolding, {reply, ER, State} end. --spec maybe_use_fold_heads(list(), list(), atom()) -> fun(). +-type foldfun() :: + fun((riak_kv_backend:fold_objects_fun(), any(), list(), term()) -> + {ok, any()}|{async, fun(() -> any())}|{error, term()}). +-spec maybe_use_fold_heads(list(), list(), atom()) -> foldfun(). 
%% @private %% If the fold can potential service requests through headers of objects alone, %% then the fold_heads function can be used on the backend if it suppports that @@ -3607,69 +3616,38 @@ do_get_vclock({Bucket, Key}, Mod, ModState) -> %% @private %% upon receipt of a handoff datum, there is no client FSM -do_diffobj_put({Bucket, Key}=BKey, DiffObj, - StateData=#state{mod=Mod, - modstate=ModState, - idx=Idx, - update_hook=UpdateHook}) -> - StartTS = os:timestamp(), - {ok, Capabilities} = Mod:capabilities(Bucket, ModState), - IndexBackend = lists:member(indexes, Capabilities), - maybe_cache_evict(BKey, StateData), - case do_get_object(Bucket, Key, Mod, ModState) of - {error, not_found, _UpdModState} -> - case IndexBackend of - true -> - IndexSpecs = riak_object:index_specs(DiffObj); - false -> - IndexSpecs = [] - end, - {_, State2, DiffObj2} = maybe_new_actor_epoch(DiffObj, StateData), - case encode_and_put(DiffObj2, Mod, Bucket, Key, - IndexSpecs, ModState, no_max_check, false) of - {{ok, UpdModState}, EncodedVal} -> - aae_update(Bucket, Key, - DiffObj2, confirmed_no_old_object, EncodedVal, - StateData), - update_index_write_stats(IndexBackend, IndexSpecs), - update_vnode_stats(vnode_put, Idx, StartTS), - maybe_update(UpdateHook, {DiffObj, no_old_object}, handoff, Idx), - {ok, State2#state{modstate=UpdModState}}; - {{error, Reason, UpdModState}, _Val} -> - {error, Reason, State2#state{modstate=UpdModState}} - end; - {ok, OldObj, _UpdModState} -> - %% Merge handoff values with the current - possibly - %% discarding if out of date. - {IsNewEpoch, ActorId, State2} = maybe_new_key_epoch(false, StateData, OldObj, DiffObj), - case put_merge(false, false, OldObj, DiffObj, {IsNewEpoch, ActorId}, undefined) of - {oldobj, _} -> - {ok, State2}; - {newobj, NewObj} -> - {ok, AMObj} = enforce_allow_mult(NewObj, OldObj, riak_core_bucket:get_bucket(Bucket)), - case IndexBackend of - true -> - IndexSpecs = riak_object:diff_index_specs(AMObj, OldObj); - false -> - IndexSpecs = [] - end, - case encode_and_put(AMObj, Mod, Bucket, Key, - IndexSpecs, ModState, - no_max_check, false) of - {{ok, UpdModState}, EncodedVal} -> - aae_update(Bucket, Key, - AMObj, OldObj, EncodedVal, - StateData), - update_index_write_stats(IndexBackend, IndexSpecs), - update_vnode_stats(vnode_put, Idx, StartTS), - maybe_update(UpdateHook, - {AMObj, maybe_old_object(OldObj)}, - handoff, Idx), - {ok, State2#state{modstate=UpdModState}}; - {{error, Reason, UpdModState}, _Val} -> - {error, Reason, State2#state{modstate=UpdModState}} - end - end +-spec do_handoff_put( + {riak_object:bucket(), riak_object:key()}, + riak_object:riak_object(), + state()) -> {error, term(), state()}|{ok, state()}. +do_handoff_put({Bucket, _Key}=BKey, HandoffObj, State) -> + BProps = riak_core_bucket:get_bucket(Bucket), + PutArgs = + #putargs{ + returnbody = false, + coord = false, + lww = proplists:get_value(last_write_wins, BProps, false), + bkey = BKey, + robj = HandoffObj, + reqid = erlang:phash2({self(), os:timestamp()}), + bprops = BProps, + starttime = riak_core_util:moment(), + readrepair = false, + prunetime = undefined, + crdt_op = undefined, + % crdt_op is used only to increment stats counters for CRDT + % changes. 
These stats were not incremented when using the + % do_diffobj_put/3, and so this is undefined to maintain + % that behaviour + sync_on_write = undefined, + reason = handoff}, + {PrepPutRes, UpdPutArgs, State2} = prepare_put(State, PutArgs), + {Reply, UpdState} = perform_put(PrepPutRes, State2, UpdPutArgs), + case Reply of + {fail, _Idx, Reason} -> + {error, Reason, UpdState}; + _ -> + {ok, UpdState} end.
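A closing summary of how the new reason field flows, derived from the call
sites in this diff (editor's illustration):

    %% do_put/7         -> #putargs{reason = put}     -> maybe_update(Hook, Objs, put, Idx)
    %% do_reformat/2    -> #putargs{reason = put}     -> maybe_update(Hook, Objs, put, Idx)
    %% do_handoff_put/3 -> #putargs{reason = handoff} -> maybe_update(Hook, Objs, handoff, Idx)
    %%
    %% perform_put/3 passes the reason through to actual_put/9, so update hooks
    %% continue to see handoff for handed-off objects, exactly as they did when
    %% do_diffobj_put/3 invoked maybe_update/4 directly.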