Skip to content

Commit

Permalink
queue stats logging refinements
Browse files Browse the repository at this point in the history
* New tunable, queue_manager_log_frequency, setting the interval,
  in seconds, between logging of queue stats in replrtq_{snk,src}.

* New tunable, queue_manager_log_suppress_zero_stats, optionally to
  avoid logging of zero stats in replrtq_src and overflow_queue.

* Logging in replrtq_* is now done with queue_name in lager:info metadata,
  to allow operators to filter such entries into separate lager sinks.
  • Loading branch information
hmmr committed Nov 5, 2023
1 parent e80a069 commit 06afdaa
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 33 deletions.
15 changes: 14 additions & 1 deletion priv/riak_kv.schema
Original file line number Diff line number Diff line change
Expand Up @@ -1411,6 +1411,19 @@
{commented, 900}
]}.

%% @doc Periodically log replrtq queue stats (queue size, reap/delete
%% attempts and aborts) at this interval (seconds)
{mapping, "queue_manager_log_frequency", "riak_kv.queue_manager_log_frequency", [
{datatype, integer},
{default, 30}
]}.

%% @doc Suppress logging of queue stats when all items are 0
{mapping, "queue_manager_log_suppress_zero_stats", "riak_kv.queue_manager_log_suppress_zero_stats", [
{datatype, flag},
{default, off}
]}.

%% @doc Enable the `recalc` compaction strategy within the leveled backend in
%% riak. The default (when disabled) is `retain`, but this will leave
%% uncollected garbage within the, journal.
Expand Down Expand Up @@ -1508,4 +1521,4 @@
{mapping, "handoff_deletes", "riak_kv.handoff_deletes", [
{datatype, {flag, enabled, disabled}},
{default, disabled}
]}.
]}.
47 changes: 27 additions & 20 deletions src/riak_kv_overflow_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -105,32 +105,39 @@ new(Priorities, FilePath, QueueLimit, OverflowLimit) ->
non_neg_integer(), non_neg_integer(),
overflowq()) -> overflowq().
log(Type, JobID, Attempts, Aborts, Queue) ->
QueueLengths =
lists:foldl(fun({P, L}, Acc) ->
[Acc, io_lib:format("queue_p~w=~w ", [P, L])]
{QueueLengths, QLCount} =
lists:foldl(fun({P, L}, {AccS, AccN}) ->
{[AccS, io_lib:format("queue_p~w=~w ", [P, L])], AccN + L}
end,
"Queue lengths ",
{"Queue lengths ", 0},
Queue#overflowq.mqueue_lengths),
OverflowLengths =
lists:foldl(fun({P, L}, Acc) ->
[Acc, io_lib:format("overflow_p~w=~w ", [P, L])]
{OverflowLengths, OLCount} =
lists:foldl(fun({P, L}, {AccS, AccN}) ->
{[AccS, io_lib:format("overflow_p~w=~w ", [P, L])], AccN + L}
end,
"Overflow lengths ",
{"Overflow lengths ", 0},
Queue#overflowq.overflow_lengths),
DiscardCounts =
lists:foldl(fun({P, L}, Acc) ->
[Acc, io_lib:format("discard_p~w=~w ", [P, L])]
{DiscardCounts, DCCount} =
lists:foldl(fun({P, L}, {AccS, AccN}) ->
{[AccS, io_lib:format("discard_p~w=~w ", [P, L])], AccN}
end,
"Discard counts ",
{"Discard counts ", 0},
Queue#overflowq.overflow_discards),

_ = lager:info(lists:flatten(["~p job_id=~p has ",
"attempts=~w aborts=~w ",
QueueLengths,
OverflowLengths,
DiscardCounts]),
[Type, JobID, Attempts, Aborts]),

case app_helper:get_env(riak_kv, queue_manager_log_suppress_zero_stats, false) of
true when QLCount == 0,
OLCount == 0,
DCCount == 0,
Attempts == 0,
Aborts == 0 ->
ok;
_ ->
lager:info(lists:flatten(["~p job_id=~p has attempts=~w aborts=~w ",
QueueLengths,
OverflowLengths,
DiscardCounts]),
[Type, JobID, Attempts, Aborts])
end,
ResetDiscards =
lists:map(fun({P, _L}) -> {P, 0} end,
Queue#overflowq.overflow_discards),
Expand Down Expand Up @@ -579,4 +586,4 @@ underover_overflow_test() ->
close(RootPath, FlowQ15).


-endif.
-endif.
12 changes: 8 additions & 4 deletions src/riak_kv_replrtq_snk.erl
Original file line number Diff line number Diff line change
Expand Up @@ -415,10 +415,12 @@ handle_cast({requeue_work, WorkItem}, State) ->

handle_info(timeout, State) ->
prompt_work(),
erlang:send_after(?LOG_TIMER_SECONDS * 1000, self(), log_stats),
LogFreq = app_helper:get_env(riak_kv, queue_manager_log_frequency, ?LOG_TIMER_SECONDS),
erlang:send_after(LogFreq * 1000, self(), log_stats),
{noreply, State};
handle_info(log_stats, State) ->
erlang:send_after(?LOG_TIMER_SECONDS * 1000, self(), log_stats),
LogFreq = app_helper:get_env(riak_kv, queue_manager_log_frequency, ?LOG_TIMER_SECONDS),
erlang:send_after(LogFreq * 1000, self(), log_stats),
SinkWork0 =
case State#state.enabled of
true ->
Expand Down Expand Up @@ -854,7 +856,8 @@ log_mapfun({QueueName, Iteration, SinkWork}) ->
{replmod_time, RT},
{modified_time, MTS, MTM, MTH, MTD, MTL}}
= SinkWork#sink_work.queue_stats,
lager:info("Queue=~w success_count=~w error_count=~w" ++
lager:info([{queue_name, QueueName}],
"Queue=~w success_count=~w error_count=~w" ++
" mean_fetchtime_ms=~s" ++
" mean_pushtime_ms=~s" ++
" mean_repltime_ms=~s" ++
Expand All @@ -868,7 +871,8 @@ log_mapfun({QueueName, Iteration, SinkWork}) ->
end,
PeerDelays =
lists:foldl(FoldPeerInfoFun, "", SinkWork#sink_work.peer_list),
lager:info("Queue=~w has peer delays of~s", [QueueName, PeerDelays]),
lager:info([{queue_name, QueueName}],
"Queue=~w has peer delays of~s", [QueueName, PeerDelays]),
{QueueName, Iteration, SinkWork#sink_work{queue_stats = ?ZERO_STATS}}.

-spec log_queue_addition(
Expand Down
18 changes: 10 additions & 8 deletions src/riak_kv_replrtq_src.erl
Original file line number Diff line number Diff line change
Expand Up @@ -325,11 +325,7 @@ init([FilePath]) ->
end,
QO = lists:map(MapToQOverflow, QFM),
QC = lists:map(MaptoQCache, QFM),
LogFreq =
app_helper:get_env(
riak_kv,
replrtq_logfrequency,
?LOG_TIMER_SECONDS * 1000),
LogFreq = 1000 * app_helper:get_env(riak_kv, queue_manager_log_frequency, ?LOG_TIMER_SECONDS),
erlang:send_after(LogFreq, self(), log_queue),

{ok, #state{queue_filtermap = QFM,
Expand Down Expand Up @@ -670,9 +666,15 @@ handle_info(log_queue, State) ->
lists:map(
MapFun,
[?FLD_PRIORITY, ?AAE_PRIORITY, ?RTQ_PRIORITY]),
lager:info(
"QueueName=~w has queue sizes p1=~w p2=~w p3=~w",
[QueueName, P1L, P2L, P3L])
case app_helper:get_env(riak_kv, queue_manager_log_suppress_zero_stats, false) of
true when P1L == 0, P2L == 0, P3L == 0 ->
ok;
_ ->
lager:info(
[{queue_name, QueueName}],
"QueueName=~w has queue sizes p1=~w p2=~w p3=~w",
[QueueName, P1L, P2L, P3L])
end
end,
lists:foreach(LogFun, State#state.queue_filtermap),
erlang:send_after(State#state.log_frequency_in_ms, self(), log_queue),
Expand Down

0 comments on commit 06afdaa

Please sign in to comment.