diff --git a/priv/riak_kv.schema b/priv/riak_kv.schema
index c91bf311df..31ff02c2b2 100644
--- a/priv/riak_kv.schema
+++ b/priv/riak_kv.schema
@@ -30,6 +30,17 @@
   {default, passive}
 ]}.
 
+%% @doc Use hashtree tokens for anti-entropy throttling
+%% To hold up the vnode when there is a backlog of activity on the AAE store,
+%% the hashtree token bucket may be used to block the vnode every 90 puts until
+%% the PUT has been completed. This uses aae_ping with tictac_aae, and a full
+%% sync block with legacy anti-entropy.
+{mapping, "aae_tokenbucket", "riak_kv.aae_tokenbucket", [
+  {datatype, {flag, enabled, disabled}},
+  {default, enabled},
+  {commented, enabled}
+]}.
+
 %% @doc A path under which aae data files will be stored.
 {mapping, "tictacaae_dataroot", "riak_kv.tictacaae_dataroot", [
   {default, "$(platform_data_dir)/tictac_aae"},
@@ -69,7 +80,7 @@
 %% @doc Store heads in parallel key stores
 %% If running a parallel key store, the whole "head" object may be stored to
 %% allow for fold_heads queries to be run against the parallel store.
-%% Alternatively, the cost of the paralle key store can be reduced by storing
+%% Alternatively, the cost of the parallel key store can be reduced by storing
 %% only a minimal data set necessary for AAE and monitoring
 {mapping, "tictacaae_storeheads", "riak_kv.tictacaae_storeheads", [
   {datatype, {flag, enabled, disabled}},
@@ -79,23 +90,37 @@
 
 %% @doc Frequency to prompt exchange per vnode
 %% The number of milliseconds which the vnode must wait between self-pokes to
-%% maybe prompt the next exchange. Default is 2 minutes 30 seconds.
-%% Note if this is to be reduced below this value the riak_core
-%% vnode_inactivity_timeout should also be reduced or handoffs may be
-%% blocked. To be safe the vnode_inactivity_timeout must be < 0.5 * the
-%% tictacaae_exchangetick.
+%% maybe prompt the next exchange. Default is 4 minutes - with n=3 all
+%% partitions are checked every 30 minutes.
+%% Note that if this is to be reduced further the riak_core vnode_inactivity_timeout
+%% should also be reduced or handoffs may be blocked. To be safe the
+%% vnode_inactivity_timeout must be < 0.5 * the tictacaae_exchangetick.
 {mapping, "tictacaae_exchangetick", "riak_kv.tictacaae_exchangetick", [
   {datatype, integer},
-  {default, 150000},
+  {default, 240000},
   hidden
 ]}.
 
 %% @doc Frequency to prompt rebuild check per vnode
 %% The number of milliseconds which the vnode must wait between self-pokes to
-%% maybe prompt the next rebuild. Default is 30 minutes.
+%% maybe prompt the next rebuild. Default is 60 minutes.
+%% When a node is being re-introduced to a cluster following a long delay,
+%% increase this tick prior to the reintroduction. This will reduce the
+%% concurrency of some activity (e.g. handoffs and rebuilds).
 {mapping, "tictacaae_rebuildtick", "riak_kv.tictacaae_rebuildtick", [
   {datatype, integer},
-  {default, 1800000},
+  {default, 3600000},
+  hidden
+]}.
+
+%% @doc Max number of leaf IDs per exchange
+%% To control the length of time for each exchange, only a subset of the
+%% conflicting leaves will be compared on each exchange. If there are issues
+%% with query timeouts, this may be halved. Large backlogs may be reduced
+%% faster by doubling. There are 1M segments in a standard tree overall.
+{mapping, "tictacaae_maxresults", "riak_kv.tictacaae_maxresults", [
+  {datatype, integer},
+  {default, 256},
   hidden
 ]}.
 
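The schema changes above are operator-facing. As a rough sketch of how they might be set in riak.conf (values shown are the defaults from the mappings above; the tick and maxresults mappings are hidden, so they only take effect if added to the file explicitly):

```
## Token bucket throttling of vnode PUTs when AAE falls behind
aae_tokenbucket = enabled

## 4 minute exchange tick and 60 minute rebuild-check tick, in milliseconds.
## Keep riak_core vnode_inactivity_timeout below half the exchange tick.
tictacaae_exchangetick = 240000
tictacaae_rebuildtick = 3600000

## Halve if exchange queries time out; double to clear large backlogs faster.
tictacaae_maxresults = 256
```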
diff --git a/src/riak_kv_exchange_fsm.erl b/src/riak_kv_exchange_fsm.erl index 78aa72f33a..93e347865c 100644 --- a/src/riak_kv_exchange_fsm.erl +++ b/src/riak_kv_exchange_fsm.erl @@ -33,7 +33,8 @@ %% FSM states -export([prepare_exchange/2, update_trees/2, - key_exchange/2]). + key_exchange/2, + repair_consistent/1]). %% gen_fsm callbacks -export([init/1, handle_event/3, handle_sync_event/4, handle_info/3, diff --git a/src/riak_kv_get_fsm.erl b/src/riak_kv_get_fsm.erl index 58da747d9e..26f103d0ce 100644 --- a/src/riak_kv_get_fsm.erl +++ b/src/riak_kv_get_fsm.erl @@ -101,6 +101,8 @@ -define(DEFAULT_RT, head). -define(DEFAULT_NC, 0). -define(QUEUE_EMPTY_LOOPS, 8). +-define(MIN_REPAIRTIME_MS, 10000). +-define(MIN_REPAIRPAUSE_MS, 10). %% =================================================================== %% Public API @@ -641,23 +643,56 @@ finalize(StateData=#state{get_core = GetCore, trace = Trace, req_id = ReqID, end, {stop,normal,StateData}. - +-spec prompt_readrepair([{riak_core_ring:partition_id(), node()}]) -> + fun((list({{riak_object:bucket(), riak_object:key()}, + {vclock:vclock(), vclock:vclock()}})) -> ok). prompt_readrepair(VnodeList) -> + prompt_readrepair(VnodeList, + app_helper:get_env(riak_kv, log_readrepair, false)). + +prompt_readrepair(VnodeList, LogRepair) -> {ok, C} = riak:local_client(), FetchFun = fun({{B, K}, {_BlueClock, _PinkClock}}) -> - riak_client:get(B, K, C) + case riak_kv_util:consistent_object(B) of + true -> + riak_kv_exchange_fsm:repair_consistent({B, K}); + false -> + riak_client:get(B, K, C) + end end, - RehashFun = - fun({{B, K}, {_BlueClock, _PinkClock}}) -> - riak_kv_vnode:rehash(VnodeList, B, K) + LogFun = + fun({{B, K}, {BlueClock, PinkClock}}) -> + lager:info( + "Prompted read repair Bucket=~w Key=~w Clocks ~w ~w", + [B, K, BlueClock, PinkClock]) end, fun(RepairList) -> - lager:info("Repairing ~w keys between ~w", - [length(RepairList), VnodeList]), + SW = os:timestamp(), + RepairCount = length(RepairList), + lager:info("Repairing key_count=~w between ~w", + [RepairCount, VnodeList]), + Pause = + max(?MIN_REPAIRPAUSE_MS, + ?MIN_REPAIRTIME_MS div max(1, RepairCount)), + RehashFun = + fun({{B, K}, {_BlueClock, _PinkClock}}) -> + timer:sleep(Pause), + riak_kv_vnode:rehash(VnodeList, B, K) + end, lists:foreach(FetchFun, RepairList), - timer:sleep(1000), % Sleep for repairs to complete - lists:foreach(RehashFun, RepairList) + lists:foreach(RehashFun, RepairList), + case LogRepair of + true -> + lists:foreach(LogFun, RepairList); + false -> + ok + end, + lager:info("Repaired key_count=~w " ++ + "in repair_time=~w ms with pause_time=~w ms", + [RepairCount, + timer:now_diff(os:timestamp(), SW) div 1000, + RepairCount * Pause]) end. 
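The new pacing constants mean read repairs prompted by an exchange are smeared over time rather than rehashed in a single burst after a fixed one-second sleep. A minimal sketch of the arithmetic, using the same constants as above (module name is illustrative):

```erlang
-module(repair_pace).
-export([pause_ms/1, total_pause_ms/1]).

-define(MIN_REPAIRTIME_MS, 10000).
-define(MIN_REPAIRPAUSE_MS, 10).

%% Per-key sleep before each rehash: small repair lists are stretched over
%% roughly MIN_REPAIRTIME_MS in total, large lists fall back to a 10ms floor.
-spec pause_ms(non_neg_integer()) -> pos_integer().
pause_ms(RepairCount) ->
    max(?MIN_REPAIRPAUSE_MS, ?MIN_REPAIRTIME_MS div max(1, RepairCount)).

%% The pause_time reported in the completion log line above.
-spec total_pause_ms(non_neg_integer()) -> non_neg_integer().
total_pause_ms(RepairCount) ->
    RepairCount * pause_ms(RepairCount).

%% Examples: pause_ms(50) -> 200 (50 keys paced over ~10s),
%%           pause_ms(5000) -> 10 (large backlog paced at the 10ms floor).
```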
reply_fun({EndStateName, DeltaCount}) -> diff --git a/src/riak_kv_ttaaefs_manager.erl b/src/riak_kv_ttaaefs_manager.erl index 114fdd87c3..55bb98da61 100644 --- a/src/riak_kv_ttaaefs_manager.erl +++ b/src/riak_kv_ttaaefs_manager.erl @@ -782,7 +782,7 @@ take_next_workitem([], Wants, ScheduleStartTime, SlotInfo, SliceCount) -> take_next_workitem([NextAlloc|T], Wants, ScheduleStartTime, SlotInfo, SliceCount) -> {NodeNumber, NodeCount} = SlotInfo, - SliceSeconds = ?SECONDS_IN_DAY div SliceCount, + SliceSeconds = ?SECONDS_IN_DAY div max(SliceCount, 1), SlotSeconds = (NodeNumber - 1) * (SliceSeconds div max(NodeCount, 1)), {SliceNumber, NextAction} = NextAlloc, {Mega, Sec, _Micro} = ScheduleStartTime, diff --git a/src/riak_kv_vnode.erl b/src/riak_kv_vnode.erl index 5819e8bcba..2caf5e8fb7 100644 --- a/src/riak_kv_vnode.erl +++ b/src/riak_kv_vnode.erl @@ -152,8 +152,12 @@ :: list(riak_kv_entropy_manager:exchange()), tictac_exchangecount = 0 :: integer(), tictac_deltacount = 0 :: integer(), + tictac_exchangetime = 0 :: integer(), tictac_startqueue = os:timestamp() :: erlang:timestamp(), tictac_rebuilding = false :: erlang:timestamp()|false, + tictac_skiptick = 0 :: non_neg_integer(), + tictac_startup = true :: boolean(), + aae_tokenbucket = true :: boolean(), worker_pool_strategy = single :: none|single|dscp, vnode_pool_pid :: undefined|pid(), update_hook :: update_hook(), @@ -208,6 +212,8 @@ -define(MAX_AAE_QUEUE_TIME, 1000). %% Queue time in ms to prompt a sync ping. +-define(AAE_SKIP_COUNT, 10). +-define(AAE_LOADING_WAIT, 5000). -define(AF1_QUEUE, riak_core_node_worker_pool:af1()). %% Assured Forwarding - pool 1 @@ -320,6 +326,9 @@ maybe_start_aaecontroller(active, State=#state{mod=Mod, XTick = app_helper:get_env(riak_kv, tictacaae_exchangetick), RTick = app_helper:get_env(riak_kv, tictacaae_rebuildtick), + StepInitialTick = + app_helper:get_env(riak_kv, tictacaae_stepinitialtick, true), + StoreHead = app_helper:get_env(riak_kv, tictacaae_storeheads), ObjSplitFun = riak_object:aae_from_object_binary(StoreHead), @@ -330,35 +339,36 @@ maybe_start_aaecontroller(active, State=#state{mod=Mod, Preflists, RootPath, ObjSplitFun), - R = aae_controller:aae_rebuildtrees(AAECntrl, - Preflists, - fun preflistfun/2, - rebuildtrees_workerfun(Partition, - State), - true), - lager:info("AAE Controller started with pid ~w and rebuild state ~w", - [AAECntrl, R]), - Rebuilding = - case R of - ok -> - os:timestamp(); - skipped -> - false - end, + lager:info("AAE Controller started with pid=~w", [AAECntrl]), InitD = erlang:phash2(Partition, 256), - % Space out the initial poke to avoid over-coordination between vnodes - FirstRebuildDelay = (RTick div 256) * InitD, - FirstExchangeDelay = (XTick div 256) * InitD, + % Space out the initial poke to avoid over-coordination between vnodes, + % each of up to 256 vnodes will end on a different point in the slot, with + % the points wrapping every 256 vnodes (assuming coordinated restart) + FirstRebuildDelay = RTick + ((RTick div 256) * InitD), + FirstExchangeDelay = XTick + ((XTick div 256) * InitD), riak_core_vnode:send_command_after(FirstRebuildDelay, tictacaae_rebuildpoke), riak_core_vnode:send_command_after(FirstExchangeDelay, tictacaae_exchangepoke), + + InitalStep = + case StepInitialTick of + true -> + % Stops each vnode from re-filling the AAE work queue at the + % same time, creating a pause in AAE across the cluster if all + % nodes in the cluster were started concurrently + erlang:phash2(Partition, 8); + false -> + % During riak_test we set this to false + 0 + end, 
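The startup change above replaces the synchronous tree rebuild with delayed pokes: each vnode now waits a full tick plus a per-partition offset derived from erlang:phash2/2, so up to 256 vnodes land on distinct points within one tick. A small sketch of that spacing (module and function names are illustrative):

```erlang
-module(poke_stagger).
-export([first_poke_delay/2]).

%% Delay before a vnode's first poke, where TickMS is either the
%% tictacaae_exchangetick or the tictacaae_rebuildtick value in milliseconds.
-spec first_poke_delay(non_neg_integer(), pos_integer()) -> pos_integer().
first_poke_delay(Partition, TickMS) ->
    InitD = erlang:phash2(Partition, 256),   % stable per-partition value 0..255
    TickMS + ((TickMS div 256) * InitD).

%% With the default ticks, the first exchange poke lands between 240s and
%% ~479s after startup, and the first rebuild poke between 3600s and ~7186s.
```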
State#state{tictac_aae = true, aae_controller = AAECntrl, modstate = ModState, - tictac_rebuilding = Rebuilding}. + tictac_rebuilding = false, + tictac_skiptick = InitalStep}. -spec determine_aaedata_root(integer()) -> list(). @@ -379,16 +389,18 @@ preflistfun(Bucket, Key) -> riak_kv_util:get_index_n({Bucket, Key}). %% Function to be passed to return a response once an operation is complete tictac_returnfun(Partition, exchange) -> Vnode = {Partition, node()}, + StartTime = os:timestamp(), ReturnFun = fun(ExchangeResult) -> - ok = tictacexchange_complete(Vnode, ExchangeResult) + ok = tictacexchange_complete(Vnode, StartTime, ExchangeResult) end, ReturnFun; tictac_returnfun(Partition, RebuildType) -> Vnode = {Partition, node()}, + StartTime = os:timestamp(), ReturnFun = fun(ok) -> - ok = tictacrebuild_complete(Vnode, RebuildType) + ok = tictacrebuild_complete(Vnode, StartTime, RebuildType) end, ReturnFun. @@ -403,27 +415,71 @@ tictac_rebuild(B, K, V) -> Clock = element(1, riak_object:summary_from_binary(V)), {IndexN, Clock}. --spec rebuildtrees_workerfun(partition(), state()) -> fun(). %% @doc -%% A wrapper around the node_worker_pool to provide workers for tictac aae -%% folds. As the pool is a named pool started with riak_core, should be safe -%% to used in init of inidivdual vnodes. -rebuildtrees_workerfun(Partition, State) -> - fun(FoldFun, FinishFun) -> - Sender = self(), - ReturnFun = tictac_returnfun(Partition, trees), - FinishPlusReturnFun = - fun(WorkOutput) -> - FinishFun(WorkOutput), - ReturnFun(ok) +%% Queue a tictac tree rebuild. There are occasions when all vnodes queue this +%% at the same time, so important that the snapshot for the rebuild is taken +%% only when the fold is initiated. Otherwise the snapshot may expire whilst +%% sat on the queue +-spec queue_tictactreerebuild(pid(), partition(), boolean(), state()) -> ok. +queue_tictactreerebuild(AAECntrl, Partition, OnlyIfBroken, State) -> + Preflists = riak_kv_util:responsible_preflists(Partition), + Sender = self(), + ReturnFun = tictac_returnfun(Partition, trees), + FoldFun = + fun() -> + lager:info("Starting tree rebuild for partition=~w", [Partition]), + SW = os:timestamp(), + case when_loading_complete(AAECntrl, + Preflists, + fun preflistfun/2, + OnlyIfBroken) of + {ok, StoreFold, FinishFun} -> + Output = StoreFold(), + FinishFun(Output), + Duration = + timer:now_diff(os:timestamp(), SW) div (1000 * 1000), + lager:info("Tree rebuild complete for partition=~w" ++ + " in duration=~w seconds", + [Partition, Duration]); + skipped -> + lager:info("Tree rebuild skipped for partition=~w", + [Partition]) end, - Pool = select_queue(be_pool, State), - riak_core_vnode:queue_work(Pool, - {fold, FoldFun, FinishPlusReturnFun}, - Sender, - State#state.vnode_pool_pid) + ok + end, + JustReturnFun = + fun(ok) -> + ReturnFun(ok) + end, + Pool = select_queue(be_pool, State), + riak_core_vnode:queue_work(Pool, + {fold, FoldFun, JustReturnFun}, + Sender, + State#state.vnode_pool_pid). + +when_loading_complete(AAECntrl, Preflists, PreflistFun, OnlyIfBroken) -> + case is_process_alive(AAECntrl) of + true -> + R = aae_controller:aae_rebuildtrees(AAECntrl, + Preflists, PreflistFun, + OnlyIfBroken), + case R of + loading -> + timer:sleep(?AAE_LOADING_WAIT), + when_loading_complete(AAECntrl, + Preflists, + PreflistFun, + OnlyIfBroken); + _ -> + R + end; + _ -> + % May have queued a rebuild for a vnode aae controller for an + % exited vnode (e.g. one which has completed handoff) + skipped end. 
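queue_tictactreerebuild defers the snapshot until the fold actually runs, and when_loading_complete then waits politely while the controller is still loading. A generic sketch of that wait loop, with CheckFun standing in for the aae_controller:aae_rebuildtrees/4 call (names below are illustrative, not the real API):

```erlang
-module(wait_for_store).
-export([poll/3]).

%% Poll CheckFun until it stops returning loading, sleeping WaitMS between
%% attempts (the vnode uses ?AAE_LOADING_WAIT, i.e. 5000ms). If the owning
%% controller process has exited (e.g. after handoff), give up with skipped.
-spec poll(pid(), fun(() -> loading | term()), pos_integer()) ->
        skipped | term().
poll(Controller, CheckFun, WaitMS) ->
    case is_process_alive(Controller) of
        false ->
            skipped;
        true ->
            case CheckFun() of
                loading ->
                    timer:sleep(WaitMS),
                    poll(Controller, CheckFun, WaitMS);
                Result ->
                    Result
            end
    end.
```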
+ %% @doc Reveal the underlying module state for testing -spec(get_modstate(state()) -> {atom(), state()}). get_modstate(_State=#state{mod=Mod, modstate=ModState}) -> @@ -475,21 +531,28 @@ aae_send(Preflist) -> riak_kv_vnode_master) end. --spec tictacrebuild_complete({partition(), node()}, store|trees) -> ok. +-spec tictacrebuild_complete({partition(), node()}, + erlang:timestamp(), + store|trees) -> ok. %% @doc %% Inform the vnode that an aae rebuild is complete -tictacrebuild_complete(Vnode, ProcessType) -> +tictacrebuild_complete(Vnode, StartTime, ProcessType) -> riak_core_vnode_master:command(Vnode, - {rebuild_complete, ProcessType}, + {rebuild_complete, + ProcessType, + StartTime}, riak_kv_vnode_master). --spec tictacexchange_complete({partition(), node()}, +-spec tictacexchange_complete({partition(), node()}, + erlang:timestamp(), {atom(), non_neg_integer()}) -> ok. %% @doc %% Infor the vnode that an aae exchange is complete -tictacexchange_complete(Vnode, ExchangeResult) -> +tictacexchange_complete(Vnode, StartTime, ExchangeResult) -> riak_core_vnode_master:command(Vnode, - {exchange_complete, ExchangeResult}, + {exchange_complete, + ExchangeResult, + StartTime}, riak_kv_vnode_master). get(Preflist, BKey, ReqId) -> @@ -742,6 +805,8 @@ init([Index]) -> app_helper:get_env(riak_kv, tictacaae_active, passive), WorkerPoolStrategy = app_helper:get_env(riak_kv, worker_pool_strategy), + TokenBucket = + app_helper:get_env(riak_kv, aae_tokenbucket, true), MaxAAEQueueTime = app_helper:get_env(riak_kv, max_aae_queue_time, ?MAX_AAE_QUEUE_TIME), EnableNextGenReplSrc = @@ -776,6 +841,7 @@ init([Index]) -> worker_pool_strategy=WorkerPoolStrategy, update_hook=update_hook(), max_aae_queue_time=MaxAAEQueueTime, + aae_tokenbucket=TokenBucket, enable_nextgenreplsrc = EnableNextGenReplSrc, sizelimit_nextgenreplsrc = SizeLimitNextGenReplSrc}, try_set_vnode_lock_limit(Index), @@ -1016,41 +1082,47 @@ handle_command({fold_indexes, FoldIndexFun, Acc}, Sender, State=#state{mod=Mod, end; -handle_command({rebuild_complete, store}, _Sender, State) -> +handle_command({rebuild_complete, store, ST}, _Sender, State) -> %% If store rebuild complete - then need to rebuild trees AAECntrl = State#state.aae_controller, Partition = State#state.idx, - Preflists = riak_kv_util:responsible_preflists(Partition), - aae_controller:aae_rebuildtrees(AAECntrl, - Preflists, - fun preflistfun/2, - rebuildtrees_workerfun(Partition, State), - false), - lager:info("AAE Controller rebuild trees started with pid ~w", [AAECntrl]), + lager:info("AAE pid=~w partition=~w rebuild store complete " ++ + "in duration=~w seconds", + [AAECntrl, + Partition, + timer:now_diff(os:timestamp(), ST) div (1000 * 1000)]), + queue_tictactreerebuild(AAECntrl, Partition, false, State), + lager:info("AAE pid=~w rebuild trees queued", [AAECntrl]), {noreply, State}; -handle_command({rebuild_complete, trees}, _Sender, State) -> +handle_command({rebuild_complete, trees, _ST}, _Sender, State) -> % Rebuilding the trees now complete, so change the status of the % rebuilding state so other rebuilds may be prompted Partition = State#state.idx, case State#state.tictac_rebuilding of false -> lager:warning("Rebuild complete for Partition=~w but not expected", - [Partition]), + [Partition]), {noreply, State}; TS -> - RebuildTime = timer:now_diff(os:timestamp(), TS) / (1000 * 1000), - lager:info("Rebuild complete for " ++ - "Partition=~w in duration=~w seconds", - [Partition, RebuildTime]), + ProcessTime = timer:now_diff(os:timestamp(), TS) div (1000 * 1000), + 
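With the start time now threaded through the completion messages, the handlers can report wall-clock durations. The conversion used throughout is timer:now_diff/2 (microseconds) divided down to whole seconds; a tiny sketch of the pattern (module and function names are illustrative):

```erlang
-module(aae_timing).
-export([elapsed_seconds/1]).

%% Elapsed whole seconds since a time captured with os:timestamp(), the
%% pattern used by the rebuild/exchange completion logging above.
-spec elapsed_seconds(erlang:timestamp()) -> non_neg_integer().
elapsed_seconds(StartTime) ->
    timer:now_diff(os:timestamp(), StartTime) div (1000 * 1000).
```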
lager:info("Rebuild process for partition=~w complete in " ++ + "duration=~w seconds", [Partition, ProcessTime]), {noreply, State#state{tictac_rebuilding = false}} end; -handle_command({exchange_complete, {_EndState, DeltaCount}}, _Sender, State) -> +handle_command({exchange_complete, {_EndState, DeltaCount}, ST}, + _Sender, State) -> %% Record how many deltas were seen in the exchange + %% Revert the skip_count to 0 so that exchanges can be made at the next + %% prompt. XC = State#state.tictac_exchangecount + 1, DC = State#state.tictac_deltacount + DeltaCount, - {noreply, State#state{tictac_exchangecount = XC, tictac_deltacount = DC}}; + XT = State#state.tictac_exchangetime + timer:now_diff(os:timestamp(), ST), + {noreply, State#state{tictac_exchangecount = XC, + tictac_deltacount = DC, + tictac_exchangetime = XT, + tictac_skiptick = 0}}; handle_command({upgrade_hashtree, Node}, _, State=#state{hashtrees=HT}) -> %% Make sure we dont kick off an upgrade during a possible handoff @@ -1087,8 +1159,8 @@ handle_command(tictacaae_exchangepoke, _Sender, State) -> XTick = app_helper:get_env(riak_kv, tictacaae_exchangetick), riak_core_vnode:send_command_after(XTick, tictacaae_exchangepoke), Idx = State#state.idx, - case State#state.tictac_exchangequeue of - [] -> + case {State#state.tictac_exchangequeue, State#state.tictac_skiptick} of + {[], _} -> {ok, Ring} = riak_core_ring_manager:get_my_ring(), Exchanges = @@ -1106,17 +1178,20 @@ handle_command(tictacaae_exchangepoke, _Sender, State) -> ++ "exchanges expected=~w " ++ "exchanges completed=~w " ++ "total deltas=~w " + ++ "total exchange_time=~w seconds " ++ "loop duration=~w seconds (elapsed)", [Idx, length(Exchanges), State#state.tictac_exchangecount, State#state.tictac_deltacount, + State#state.tictac_exchangetime div (1000 * 1000), LoopDuration div (1000 * 1000)]), {noreply, State#state{tictac_exchangequeue = Exchanges, tictac_exchangecount = 0, tictac_deltacount = 0, + tictac_exchangetime = 0, tictac_startqueue = Now}}; - [{Local, Remote, {DocIdx, N}}|Rest] -> + {[{Local, Remote, {DocIdx, N}}|Rest], 0} -> PrimaryOnly = app_helper:get_env(riak_kv, tictacaae_primaryonly, true), % By default TictacAAE exchanges are run only between primary @@ -1133,41 +1208,78 @@ handle_command(tictacaae_exchangepoke, _Sender, State) -> false -> riak_core_apl:get_apl(PlLup, N, riak_kv) end, - case {lists:keyfind(Local, 1, PL), lists:keyfind(Remote, 1, PL)} of - {{Local, LN}, {Remote, RN}} -> - IndexN = {DocIdx, N}, - BlueList = - [{riak_kv_vnode:aae_send({Local, LN}), [IndexN]}], - PinkList = - [{riak_kv_vnode:aae_send({Remote, RN}), [IndexN]}], - RepairFun = - riak_kv_get_fsm:prompt_readrepair([{Local, LN}, - {Remote, RN}]), - ReplyFun = tictac_returnfun(Idx, exchange), - aae_exchange:start(BlueList, - PinkList, - RepairFun, - ReplyFun); - _ -> - lager:warning("Proposed exchange between ~w and ~w " ++ - "not currently supported within " ++ - "preflist for IndexN=~w possibly due to " ++ - "node failure", - [Local, Remote, {DocIdx, N}]) - end, + SkipCount = + case {lists:keyfind(Local, 1, PL), + lists:keyfind(Remote, 1, PL)} of + {{Local, LN}, {Remote, RN}} -> + IndexN = {DocIdx, N}, + BlueList = + [{riak_kv_vnode:aae_send({Local, LN}), [IndexN]}], + PinkList = + [{riak_kv_vnode:aae_send({Remote, RN}), [IndexN]}], + RepairFun = + riak_kv_get_fsm:prompt_readrepair([{Local, LN}, + {Remote, RN}]), + ReplyFun = tictac_returnfun(Idx, exchange), + ScanTimeout = ?AAE_SKIP_COUNT * XTick, + ExchangeOptions = + case app_helper:get_env(riak_kv, + tictacaae_maxresults) of + 
MR when is_integer(MR) -> + [{scan_timeout, ScanTimeout}, + {max_results, MR}]; + _ -> + [{scan_timeout, ScanTimeout}] + end, + aae_exchange:start(full, + BlueList, + PinkList, + RepairFun, + ReplyFun, + none, + ExchangeOptions), + ?AAE_SKIP_COUNT; + _ -> + lager:warning("Proposed exchange between ~w and ~w " ++ + "not currently supported within " ++ + "preflist for IndexN=~w possibly " ++ + "due to node failure", + [Local, Remote, {DocIdx, N}]), + 0 + end, ok = aae_controller:aae_ping(State#state.aae_controller, os:timestamp(), self()), - {noreply, State#state{tictac_exchangequeue = Rest}} + {noreply, State#state{tictac_exchangequeue = Rest, + tictac_skiptick = SkipCount}}; + {_, SkipCount} -> + lager:warning("Skipping a tick due to non_zero " ++ + "skip_count=~w", [SkipCount]), + {noreply, State#state{tictac_skiptick = max(0, SkipCount - 1)}} end; +handle_command(tictacaae_rebuildpoke, _Sender, State=#state{tictac_startup=TS}) + when TS == true -> + % On startup the first poke should check if the trees need rebuilding, e.g. + % as the tree was not persisted when shutdown. This won't rebuild unless + % the trees have been marked as broken. Trees are marked as broken in a + % non-empty store where trees do not exist. + RTick = app_helper:get_env(riak_kv, tictacaae_rebuildtick), + riak_core_vnode:send_command_after(RTick, tictacaae_rebuildpoke), + AAECntrl = State#state.aae_controller, + Partition = State#state.idx, + queue_tictactreerebuild(AAECntrl, Partition, true, State), + {noreply, State#state{tictac_rebuilding = os:timestamp(), + tictac_startup = false}}; + + handle_command(tictacaae_rebuildpoke, Sender, State) -> NRT = aae_controller:aae_nextrebuild(State#state.aae_controller), RTick = app_helper:get_env(riak_kv, tictacaae_rebuildtick), riak_core_vnode:send_command_after(RTick, tictacaae_rebuildpoke), TimeToRebuild = timer:now_diff(NRT, os:timestamp()), RebuildPending = State#state.tictac_rebuilding =/= false, - + case {TimeToRebuild < 0, RebuildPending} of {false, _} -> lager:info("No rebuild as next_rebuild=~w seconds in the future", @@ -1191,6 +1303,8 @@ handle_command(tictacaae_rebuildpoke, Sender, State) -> {noreply, State}; {true, false} -> % Next Rebuild Time is in the past - prompt a rebuild + lager:info("Prompting tictac_aae rebuild for controller=~w", + [State#state.aae_controller]), ReturnFun = tictac_returnfun(State#state.idx, store), State0 = State#state{tictac_rebuilding = os:timestamp()}, case aae_controller:aae_rebuildstore(State#state.aae_controller, @@ -3327,19 +3441,19 @@ fold_type_for_query(Query) -> maybe_enable_async_fold(AsyncFolding, Capabilities, Opts) -> AsyncBackend = lists:member(async_fold, Capabilities), options_for_folding_and_backend(Opts, - AsyncFolding andalso AsyncBackend, - async_fold). + AsyncFolding andalso AsyncBackend, + async_fold). -spec maybe_enable_snap_prefold(boolean(), list(), list()) -> list(). maybe_enable_snap_prefold(SnapFolding, Capabilities, Opts) -> SnapBackend = lists:member(snap_prefold, Capabilities), options_for_folding_and_backend(Opts, - SnapFolding andalso SnapBackend, - snap_prefold). + SnapFolding andalso SnapBackend, + snap_prefold). -spec options_for_folding_and_backend(list(), - UseAsyncFolding :: boolean(), - atom()) -> list(). + UseAsyncFolding :: boolean(), + atom()) -> list(). 
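The options passed to aae_exchange:start/7 tie the scan timeout to the same skip budget: a scan may run for up to ?AAE_SKIP_COUNT ticks, which matches how long the vnode will keep skipping pokes while it waits. A sketch of the options assembly (module and function names are illustrative; max_results comes from the new tictacaae_maxresults setting and may be unset on older configs):

```erlang
-module(exchange_opts).
-export([exchange_options/2]).

%% Build the aae_exchange options from the exchange tick (ms) and the
%% optional tictacaae_maxresults value read from application env.
-spec exchange_options(pos_integer(), term()) -> list().
exchange_options(XTickMS, MaxResults) ->
    ScanTimeout = 10 * XTickMS,           % ?AAE_SKIP_COUNT ticks
    case MaxResults of
        MR when is_integer(MR) ->
            [{scan_timeout, ScanTimeout}, {max_results, MR}];
        _ ->
            [{scan_timeout, ScanTimeout}]
    end.

%% exchange_options(240000, 256) ->
%%     [{scan_timeout, 2400000}, {max_results, 256}]
```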
options_for_folding_and_backend(Opts, true, async_fold) -> [async_fold | Opts]; options_for_folding_and_backend(Opts, true, snap_prefold) -> @@ -3482,8 +3596,14 @@ nextgenrepl(_B, _K, _Obj, _Size, _Coord, _Enabled, _Limit) -> %% @doc %% Update both the AAE controller (tictac aae) and old school hashtree aae %% if either or both are enabled. -aae_update(Bucket, Key, UpdObj, PrevObj, UpdObjBin, State) -> - case State#state.hashtrees of +aae_update(_Bucket, _Key, _UpdObj, _PrevObj, _UpdObjBin, + #state{hashtrees = HTs, tictac_aae = TAAE} = _State) + when HTs == undefined, TAAE == false -> + ok; +aae_update(Bucket, Key, UpdObj, PrevObj, UpdObjBin, + #state{hashtrees = HTs, tictac_aae = TAAE} = State) -> + Async = async_aae(State#state.aae_tokenbucket), + case HTs of undefined -> ok; Trees -> @@ -3494,9 +3614,9 @@ aae_update(Bucket, Key, UpdObj, PrevObj, UpdObjBin, State) -> _ -> UpdObj end, - update_hashtree(Bucket, Key, RObj, Trees) + update_hashtree(Bucket, Key, RObj, Trees, Async) end, - case {State#state.tictac_aae, PrevObj} of + case {TAAE, PrevObj} of {false, _} -> ok; {_, unchanged_no_old_object} -> @@ -3520,6 +3640,14 @@ aae_update(Bucket, Key, UpdObj, PrevObj, UpdObjBin, State) -> _ -> UpdObjBin end, + case Async of + true -> + ok; + false -> + aae_controller:aae_ping(State#state.aae_controller, + os:timestamp(), + self()) + end, aae_controller:aae_put(State#state.aae_controller, IndexN, Bucket, Key, @@ -3531,19 +3659,33 @@ aae_update(Bucket, Key, UpdObj, PrevObj, UpdObjBin, State) -> -spec aae_delete(binary(), binary(), old_object(), state()) -> ok. %% @doc %% Remove an item from the AAE store, where AAE has been enabled -aae_delete(Bucket, Key, PrevObj, State) -> - case State#state.hashtrees of +aae_delete(_Bucket, _Key, _PrevObj, + #state{hashtrees = HTs, tictac_aae = TAAE} = _State) + when HTs == undefined, TAAE == false -> + ok; +aae_delete(Bucket, Key, PrevObj, + #state{hashtrees = HTs, tictac_aae = TAAE} = State) -> + Async = async_aae(State#state.aae_tokenbucket), + case HTs of undefined -> ok; Trees -> - delete_from_hashtree(Bucket, Key, Trees) + delete_from_hashtree(Bucket, Key, Trees, Async) end, - case State#state.tictac_aae of + case TAAE of false -> ok; true -> PrevClock = get_clock(PrevObj), IndexN = riak_kv_util:get_index_n({Bucket, Key}), + case Async of + true -> + ok; + false -> + aae_controller:aae_ping(State#state.aae_controller, + os:timestamp(), + self()) + end, aae_controller:aae_put(State#state.aae_controller, IndexN, Bucket, Key, @@ -3551,6 +3693,21 @@ aae_delete(Bucket, Key, PrevObj, State) -> <<>>) end. +%% @doc +%% Normally should use an async aae call, unless using a token bucket when a +%% non-async call may be requested (false) each time the bucket is empty +-spec async_aae(boolean()) -> boolean(). +async_aae(false) -> + true; +async_aae(true) -> + case get_hashtree_token() of + true -> + true; + false -> + put(hashtree_tokens, max_hashtree_tokens()), + false + end. + -spec get_clock(old_object()) -> aae_controller:version_vector(). %% @doc @@ -3579,39 +3736,37 @@ maybe_old_object(OldObject) -> OldObject. --spec update_hashtree(binary(), binary(), riak_object:riak_object(), pid()) - -> ok. +-spec update_hashtree(binary(), binary(), riak_object:riak_object(), pid(), + boolean()) -> ok. %% @doc %% Update hashtree based AAE when enabled. %% Note that this requires an object copy - the object has been converted from %% a binary before being sent to another pid. Also, all information on the %% object is ignored other than that necessary to hash the object. 
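async_aae/1 centralises the token-bucket decision that previously lived inside update_hashtree/4 and delete_from_hashtree/3: the bucket sits in the vnode's process dictionary, and emptying it is the signal to do one blocking update (or, for tictac AAE, an aae_ping) before refilling. A self-contained sketch of the pattern; the token count and helper names are stand-ins (the schema comment above suggests roughly 90 puts per refill):

```erlang
-module(aae_token_bucket).
-export([async_allowed/1]).

-define(DEFAULT_TOKENS, 90).   % illustrative; see the aae_tokenbucket doc

%% true  -> proceed asynchronously (a token was available, or bucket disabled)
%% false -> the bucket just emptied; make this one update blocking and refill
-spec async_allowed(boolean()) -> boolean().
async_allowed(false) ->
    true;
async_allowed(true) ->
    case get(hashtree_tokens) of
        undefined ->
            put(hashtree_tokens, ?DEFAULT_TOKENS - 1),
            true;
        N when N > 0 ->
            put(hashtree_tokens, N - 1),
            true;
        _ ->
            put(hashtree_tokens, ?DEFAULT_TOKENS),
            false
    end.
```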
There is %% scope for greater efficiency here, even without moving to Tictac AAE -update_hashtree(Bucket, Key, RObj, Trees) -> +update_hashtree(Bucket, Key, RObj, Trees, Async) -> Items = [{object, {Bucket, Key}, RObj}], - case get_hashtree_token() of + case Async of true -> riak_kv_index_hashtree:async_insert(Items, [], Trees), ok; false -> riak_kv_index_hashtree:insert(Items, [], Trees), - put(hashtree_tokens, max_hashtree_tokens()), ok end. --spec delete_from_hashtree(binary(), binary(), pid()) -> ok. +-spec delete_from_hashtree(binary(), binary(), pid(), boolean()) -> ok. %% @doc %% Remove an object from the hashtree based AAE -delete_from_hashtree(Bucket, Key, Trees)-> +delete_from_hashtree(Bucket, Key, Trees, Async)-> Items = [{object, {Bucket, Key}}], - case get_hashtree_token() of + case Async of true -> riak_kv_index_hashtree:async_delete(Items, Trees), ok; false -> riak_kv_index_hashtree:delete(Items, Trees), - put(hashtree_tokens, max_hashtree_tokens()), ok end.
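With the Async flag decided once per update, the refill side effect no longer hides inside the hashtree helpers, and the same decision also gates the aae_ping for tictac AAE. A short sketch of how a caller inside riak_kv_vnode is expected to drive the refactored helpers (the wrapper name is hypothetical; update_hashtree/5 and async_aae/1 are the functions changed above):

```erlang
%% Hypothetical caller illustrating the intended flow after the refactor.
aae_insert(Bucket, Key, RObj, Trees, TokenBucketEnabled) ->
    %% One token-bucket decision per update; the refill now happens inside
    %% async_aae/1 rather than inside update_hashtree/5.
    Async = async_aae(TokenBucketEnabled),
    update_hashtree(Bucket, Key, RObj, Trees, Async).
```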