Skip to content
48 changes: 42 additions & 6 deletions src/leveled_bookie.erl
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@
book_loglevel/2,
book_addlogs/2,
book_removelogs/2,
book_headstatus/1
book_headstatus/1,
book_status/1
]).

%% folding API
Expand Down Expand Up @@ -1316,6 +1317,24 @@ book_removelogs(Pid, ForcedLogs) ->
book_headstatus(Pid) ->
gen_server:call(Pid, head_status, infinity).

-spec book_status(pid()) -> proplists:proplist().
%% @doc
%% Return a proplist conteaining the following items:
%% * current size of the ledger cache;
%% * number of active journal files;
%% * average compaction score for the journal;
%% * current distribution of files across the ledger (e.g. count of files by level);
%% * current size of the penciller in-memory cache;
%% * penciller work backlog status;
%% * last merge time (penciller);
%% * last compaction time (journal);
%% * last compaction result (journal) e.g. files compacted and compaction score;
%% * ratio of metadata to object size (recent PUTs);
%% * PUT/GET/HEAD recent time/count metrics;
%% * mean level for recent fetches.
book_status(Pid) ->
gen_server:call(Pid, status, infinity).

%%%============================================================================
%%% gen_server callbacks
%%%============================================================================
Expand Down Expand Up @@ -1475,7 +1494,8 @@ handle_call(
State#state.cache_size,
State#state.cache_multiple,
Cache0,
State#state.penciller
State#state.penciller,
State#state.monitor
)
of
{ok, Cache} ->
Expand Down Expand Up @@ -1509,7 +1529,8 @@ handle_call({mput, ObjectSpecs, TTL}, From, State) when
State#state.cache_size,
State#state.cache_multiple,
Cache0,
State#state.penciller
State#state.penciller,
State#state.monitor
)
of
{ok, Cache} ->
Expand Down Expand Up @@ -1686,7 +1707,8 @@ handle_call({compact_journal, Timeout}, From, State) when
State#state.cache_size,
State#state.cache_multiple,
State#state.ledger_cache,
State#state.penciller
State#state.penciller,
State#state.monitor
)
of
{_, NewCache} ->
Expand Down Expand Up @@ -1740,6 +1762,8 @@ handle_call(return_actors, _From, State) ->
{reply, {ok, State#state.inker, State#state.penciller}, State};
handle_call(head_status, _From, State) ->
{reply, {State#state.head_only, State#state.head_lookup}, State};
handle_call(status, _From, State) ->
{reply, status(State), State};
handle_call(Msg, _From, State) ->
{reply, {unsupported_message, element(1, Msg)}, State}.

Expand Down Expand Up @@ -2877,7 +2901,11 @@ check_in_ledgercache(PK, Hash, Cache, loader) ->
end.

-spec maybepush_ledgercache(
pos_integer(), pos_integer(), ledger_cache(), pid()
pos_integer(),
pos_integer(),
ledger_cache(),
pid(),
leveled_monitor:monitor()
) ->
{ok | returned, ledger_cache()}.
%% @doc
Expand All @@ -2890,9 +2918,12 @@ check_in_ledgercache(PK, Hash, Cache, loader) ->
%% in the reply. Try again later when it isn't busy (and also potentially
%% implement a slow_offer state to slow down the pace at which PUTs are being
%% received)
maybepush_ledgercache(MaxCacheSize, MaxCacheMult, Cache, Penciller) ->
maybepush_ledgercache(
MaxCacheSize, MaxCacheMult, Cache, Penciller, {Monitor, _}
) ->
Tab = Cache#ledger_cache.mem,
CacheSize = ets:info(Tab, size),
leveled_monitor:add_stat(Monitor, {ledger_cache_size_update, CacheSize}),
TimeToPush = maybe_withjitter(CacheSize, MaxCacheSize, MaxCacheMult),
if
TimeToPush ->
Expand Down Expand Up @@ -3048,6 +3079,11 @@ maybelog_snap_timing({Pid, _StatsFreq}, BookieTime, PCLTime) when
maybelog_snap_timing(_Monitor, _, _) ->
ok.

status(#state{monitor = {no_monitor, 0}}) ->
#{};
status(#state{monitor = {Monitor, _}}) ->
leveled_monitor:get_bookie_status(Monitor).

%%%============================================================================
%%% Test
%%%============================================================================
Expand Down
10 changes: 10 additions & 0 deletions src/leveled_cdb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,8 @@ starting({call, From}, {open_writer, Filename}, State) ->
{next_state, writer, State0, [{reply, From, ok}, hibernate]};
starting({call, From}, {open_reader, Filename}, State) ->
leveled_log:save(State#state.log_options),
{Monitor, _} = State#state.monitor,
leveled_monitor:add_stat(Monitor, {n_active_journal_files_update, +1}),
leveled_log:log(cdb02, [Filename]),
{Handle, Index, LastKey} = open_for_readonly(Filename, false),
State0 = State#state{
Expand All @@ -504,6 +506,8 @@ starting({call, From}, {open_reader, Filename}, State) ->
{next_state, reader, State0, [{reply, From, ok}, hibernate]};
starting({call, From}, {open_reader, Filename, LastKey}, State) ->
leveled_log:save(State#state.log_options),
{Monitor, _} = State#state.monitor,
leveled_monitor:add_stat(Monitor, {n_active_journal_files_update, +1}),
leveled_log:log(cdb02, [Filename]),
{Handle, Index, LastKey} = open_for_readonly(Filename, LastKey),
State0 = State#state{
Expand Down Expand Up @@ -880,6 +884,8 @@ delete_pending(
) when
?IS_DEF(FN), ?IS_DEF(IO)
->
{Monitor, _} = State#state.monitor,
leveled_monitor:add_stat(Monitor, {n_active_journal_files_update, -1}),
leveled_log:log(cdb04, [FN, State#state.delete_point]),
close_pendingdelete(IO, FN, State#state.waste_path),
{stop, normal};
Expand All @@ -906,6 +912,10 @@ delete_pending(
),
{keep_state_and_data, [?DELETE_TIMEOUT]};
false ->
{Monitor, _} = State#state.monitor,
leveled_monitor:add_stat(
Monitor, {n_active_journal_files_update, -1}
),
leveled_log:log(cdb04, [FN, ManSQN]),
close_pendingdelete(IO, FN, State#state.waste_path),
{stop, normal}
Expand Down
46 changes: 36 additions & 10 deletions src/leveled_iclerk.erl
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ handle_cast(
{noreply, State#state{scored_files = [], scoring_state = ScoringState}};
handle_cast(
{score_filelist, [Entry | Tail]},
State = #state{scoring_state = ScoringState}
State = #state{scoring_state = ScoringState, cdb_options = CDBOpts}
) when
?IS_DEF(ScoringState)
->
Expand All @@ -379,7 +379,8 @@ handle_cast(
ScoringState#scoring_state.max_sqn,
?SAMPLE_SIZE,
?BATCH_SIZE,
State#state.reload_strategy
State#state.reload_strategy,
CDBOpts#cdb_options.monitor
);
{CachedScore, true, _ScoreOneIn} ->
% If caches are used roll the score towards the current score
Expand All @@ -394,7 +395,8 @@ handle_cast(
ScoringState#scoring_state.max_sqn,
?SAMPLE_SIZE,
?BATCH_SIZE,
State#state.reload_strategy
State#state.reload_strategy,
CDBOpts#cdb_options.monitor
),
(NewScore + CachedScore) / 2;
{CachedScore, false, _ScoreOneIn} ->
Expand Down Expand Up @@ -427,6 +429,11 @@ handle_cast(
{MaxRunLength, State#state.maxrunlength_compactionperc,
State#state.singlefile_compactionperc},
{BestRun0, Score} = assess_candidates(Candidates, ScoreParams),
{Monitor, _} = CDBopts#cdb_options.monitor,
leveled_monitor:add_stat(
Monitor,
{journal_last_compaction_result_update, {length(BestRun0), Score}}
),
leveled_log:log_timer(ic003, [Score, length(BestRun0)], SW),
case Score > 0.0 of
true ->
Expand Down Expand Up @@ -472,6 +479,12 @@ handle_cast(
->
FilesToDelete =
leveled_imanifest:find_persistedentries(PersistedSQN, ManifestAsList),
CDBopts = State#state.cdb_options,
{Monitor, _} = CDBopts#cdb_options.monitor,
leveled_monitor:add_stat(
Monitor,
{journal_last_compaction_time_update, os:system_time(millisecond)}
),
leveled_log:log(ic007, []),
ok = leveled_inker:ink_clerkcomplete(Ink, [], FilesToDelete),
{noreply, State};
Expand Down Expand Up @@ -594,7 +607,8 @@ schedule_compaction(CompactionHours, RunsPerDay, CurrentTS) ->
leveled_codec:sqn(),
non_neg_integer(),
non_neg_integer(),
leveled_codec:compaction_strategy()
leveled_codec:compaction_strategy(),
leveled_monitor:monitor()
) ->
float().
%% @doc
Expand All @@ -615,7 +629,8 @@ check_single_file(
MaxSQN,
SampleSize,
BatchSize,
ReloadStrategy
ReloadStrategy,
{Monitor, _}
) ->
FN = leveled_cdb:cdb_filename(CDB),
SW = os:timestamp(),
Expand All @@ -629,6 +644,7 @@ check_single_file(
MaxSQN,
ReloadStrategy
),
leveled_monitor:add_stat(Monitor, {avg_compaction_score_update, Score}),
safely_log_filescore(PositionList, FN, Score, SW),
Score.

Expand Down Expand Up @@ -1265,14 +1281,22 @@ check_single_file_test() ->
replaced
end
end,
Score1 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 4, RS),
Score1 = check_single_file(
CDB, LedgerFun1, LedgerSrv1, 9, 8, 4, RS, {no_monitor, 0}
),
?assertMatch(37.5, Score1),
LedgerFun2 = fun(_Srv, _Key, _ObjSQN) -> current end,
Score2 = check_single_file(CDB, LedgerFun2, LedgerSrv1, 9, 8, 4, RS),
Score2 = check_single_file(
CDB, LedgerFun2, LedgerSrv1, 9, 8, 4, RS, {no_monitor, 0}
),
?assertMatch(100.0, Score2),
Score3 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 3, RS),
Score3 = check_single_file(
CDB, LedgerFun1, LedgerSrv1, 9, 8, 3, RS, {no_monitor, 0}
),
?assertMatch(37.5, Score3),
Score4 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 4, 8, 4, RS),
Score4 = check_single_file(
CDB, LedgerFun1, LedgerSrv1, 4, 8, 4, RS, {no_monitor, 0}
),
?assertMatch(75.0, Score4),
ok = leveled_cdb:cdb_deletepending(CDB),
ok = leveled_cdb:cdb_destroy(CDB).
Expand Down Expand Up @@ -1417,7 +1441,9 @@ compact_empty_file_test() ->
{3, {o, "Bucket", "Key3", null}}
],
LedgerFun1 = fun(_Srv, _Key, _ObjSQN) -> replaced end,
Score1 = check_single_file(CDB2, LedgerFun1, LedgerSrv1, 9, 8, 4, RS),
Score1 = check_single_file(
CDB2, LedgerFun1, LedgerSrv1, 9, 8, 4, RS, {no_monitor, 0}
),
?assert((+0.0 =:= Score1) orelse (-0.0 =:= Score1)),
ok = leveled_cdb:cdb_deletepending(CDB2),
ok = leveled_cdb:cdb_destroy(CDB2).
Expand Down
Loading