Skip to content

Commit

Permalink
Merge pull request #40 from esl/mu-fix-status-tables
Browse files Browse the repository at this point in the history
Fix cets_status improper missing/unknown tables
  • Loading branch information
pawlooss1 committed Nov 13, 2023
2 parents 5fb3db1 + e28f49e commit 2ca3105
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 15 deletions.
73 changes: 59 additions & 14 deletions src/cets_status.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@

%% RPC callbacks
-export([get_local_table_to_other_nodes_map_from_disco/1]).
-ignore_xref([status/1, get_local_table_to_other_nodes_map_from_disco/1]).
%% Export for debugging/testing
-export([gather_data/1, format_data/1]).

-ignore_xref([
status/1, get_local_table_to_other_nodes_map_from_disco/1, gather_data/1, format_data/1
]).

-include_lib("kernel/include/logger.hrl").

Expand Down Expand Up @@ -39,19 +44,56 @@
conflict_tables := [table_name()]
}.

-type status_data() :: #{
this_node := node(),
online_nodes := [node()],
system_info := cets_discovery:system_info(),
available_nodes := [node()],
local_table_to_other_nodes_map := tab_nodes_map(),
node_to_tab_nodes_map := node_to_tab_nodes_map()
}.

%% Gathers the status data for Disco and formats it into info().
-spec status(disco_name()) -> info().
status(Disco) when is_atom(Disco) ->
    %% Gathering and formatting are two separate steps
    %% to simplify testing/debugging.
    format_data(gather_data(Disco)).

%% Collects the input for format_data/1:
%% the local node name, the online and available nodes,
%% the discovery system info and the table-to-nodes maps
%% (for local tables and as reported for the available nodes).
-spec gather_data(disco_name()) -> status_data().
gather_data(Disco) ->
    ThisNode = node(),
    %% The system info must contain the tables key, it is used below
    #{tables := Tables} = SystemInfo = cets_discovery:system_info(Disco),
    OnlineNodes = [ThisNode | nodes()],
    AvailNodes = available_nodes(Disco, OnlineNodes),
    LocalTabNodes = get_local_table_to_other_nodes_map(Tables),
    NodeToTabNodes = get_node_to_tab_nodes_map(AvailNodes, Disco),
    #{
        this_node => ThisNode,
        online_nodes => OnlineNodes,
        system_info => SystemInfo,
        available_nodes => AvailNodes,
        local_table_to_other_nodes_map => LocalTabNodes,
        node_to_tab_nodes_map => NodeToTabNodes
    }.

%% Formats gathered data.
%% Contains pure functions logic.
-spec format_data(status_data()) -> info().
format_data(#{
this_node := ThisNode,
online_nodes := OnlineNodes,
system_info := Info,
available_nodes := AvailNodes,
local_table_to_other_nodes_map := Expected,
node_to_tab_nodes_map := OtherTabNodes
}) ->
%% The node lists could not match for different nodes
%% because they are updated periodically
#{unavailable_nodes := UnNodes, nodes := DiscoNodes, tables := Tables} =
Info = cets_discovery:system_info(Disco),
#{unavailable_nodes := UnNodes, nodes := DiscoNodes, tables := Tables} = Info,
DiscoNodesSorted = lists:sort(DiscoNodes),
OnlineNodes = [node() | nodes()],
AvailNodes = available_nodes(Disco, OnlineNodes),
NoDiscoNodes = remote_nodes_without_disco(DiscoNodesSorted, AvailNodes, OnlineNodes),
Expected = get_local_table_to_other_nodes_map(Tables),
OtherTabNodes = get_node_to_tab_nodes_map(AvailNodes, Disco),
JoinedNodes = joined_nodes(Expected, OtherTabNodes),
JoinedNodes = joined_nodes(ThisNode, Expected, OtherTabNodes),
AllTables = all_tables(Expected, OtherTabNodes),
{UnknownTables, NodesWithUnknownTables} = unknown_tables(OtherTabNodes, Tables, AllTables),
{MissingTables, NodesWithMissingTables} = missing_tables(OtherTabNodes, Tables),
Expand Down Expand Up @@ -96,9 +138,9 @@ get_node_to_tab_nodes_map(AvailNodes, Disco) ->

%% Nodes that have our local tables running (but could also have some unknown tables).
%% All joined nodes replicate data between each other.
-spec joined_nodes(tab_nodes_map(), node_to_tab_nodes_map()) -> [node()].
joined_nodes(Expected, OtherTabNodes) ->
ExpectedTables = maps:keys(Expected),
-spec joined_nodes(node(), tab_nodes_map(), node_to_tab_nodes_map()) -> [node()].
joined_nodes(ThisNode, Expected, OtherTabNodes) ->
ExpectedTables = maps_keys_sorted(Expected),
OtherJoined = maps:fold(
fun(Node, TabNodes, Acc) ->
case maps:with(ExpectedTables, TabNodes) =:= Expected of
Expand All @@ -109,7 +151,7 @@ joined_nodes(Expected, OtherTabNodes) ->
[],
OtherTabNodes
),
lists:sort([node() | OtherJoined]).
lists:sort([ThisNode | OtherJoined]).

unknown_tables(OtherTabNodes, Tables, AllTables) ->
UnknownTables = ordsets:subtract(AllTables, Tables),
Expand All @@ -130,7 +172,7 @@ unknown_tables(OtherTabNodes, Tables, AllTables) ->
missing_tables(OtherTabNodes, LocalTables) ->
Zip = maps:fold(
fun(Node, TabNodes, Acc) ->
RemoteTables = maps:keys(TabNodes),
RemoteTables = maps_keys_sorted(TabNodes),
MissingTables = ordsets:subtract(LocalTables, RemoteTables),
case MissingTables of
[] -> Acc;
Expand Down Expand Up @@ -171,7 +213,7 @@ conflict_tables(Expected, OtherTabNodes) ->
%% Returns the ordered set of table names found as keys in Expected
%% or in any of the per-node maps stored in OtherTabNodes.
-spec all_tables(tab_nodes_map(), node_to_tab_nodes_map()) -> [table_name()].
all_tables(Expected, OtherTabNodes) ->
    TableNodesVariants = [Expected | maps:values(OtherTabNodes)],
    %% ordsets:union/1 expects ordered lists, while maps:keys/1 gives
    %% no ordering guarantee, so the keys of each map are sorted first.
    %% TableVariants is bound only once: matching it against an unsorted
    %% key list as well would fail with badmatch when the orders differ.
    TableVariants = lists:map(fun maps_keys_sorted/1, TableNodesVariants),
    ordsets:union(TableVariants).

%% Returns nodes for each table hosted on node()
Expand Down Expand Up @@ -201,3 +243,6 @@ discovery_works(#{last_get_nodes_result := {ok, _}}) ->
true;
discovery_works(_) ->
false.

%% Returns the keys of Map as a sorted list.
%% maps:keys/1 alone does not guarantee any particular order.
-spec maps_keys_sorted(map()) -> [term()].
maps_keys_sorted(Map) ->
    Keys = maps:keys(Map),
    lists:sort(Keys).
23 changes: 22 additions & 1 deletion test/cets_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ cases() ->
long_call_to_unknown_name_throws_pid_not_found,
send_leader_op_throws_noproc,
pinfo_returns_value,
pinfo_returns_undefined
pinfo_returns_undefined,
format_data_does_not_return_table_duplicates
].

only_for_logger_cases() ->
Expand Down Expand Up @@ -2207,6 +2208,10 @@ join_interrupted_when_ping_crashes(Config) ->
?assertMatch({error, {task_failed, ping_all_failed, #{}}}, Res),
meck:unload().

%% Passes the stored status data to cets_status:format_data/1 and checks that
%% it reports no remote unknown tables and no remote nodes with missing tables.
format_data_does_not_return_table_duplicates(Config) ->
    Data = test_data_for_duplicate_missing_table_in_status(Config),
    Status = cets_status:format_data(Data),
    ?assertMatch(
        #{remote_unknown_tables := [], remote_nodes_with_missing_tables := []},
        Status
    ).

%% Helper functions

receive_all_logs(Id) ->
Expand Down Expand Up @@ -2484,3 +2489,19 @@ receive_all_logs_with_log_ref(LogHandlerId, LogRef) ->
after 5000 ->
ct:fail({timeout, receive_all_logs_with_log_ref})
end.

%% Reads the status data term from status_data.txt in the suite data_dir.
%% The data was gathered after a Helm update
%% with cets_status:gather_data(mongoose_cets_discovery).
test_data_for_duplicate_missing_table_in_status(Config) ->
    %% Create the atoms in non-sorted order before the file is parsed:
    %% maps:keys returns keys in the atom-creation order (and not sorted).
    %% The compiler is smart and would optimize list_to_atom("literal_string"),
    %% so each string goes through a module call to disable this optimization.
    lists:foreach(
        fun(TableName) ->
            _ = list_to_atom(?MODULE:return_same(TableName))
        end,
        ["cets_external_component", "cets_bosh"]
    ),
    DataDir = proplists:get_value(data_dir, Config),
    Path = filename:join(DataDir, "status_data.txt"),
    {ok, [StatusData]} = file:consult(Path),
    StatusData.

%% Identity function: returns its argument unchanged.
%% Called through ?MODULE so that the compiler cannot
%% evaluate list_to_atom/1 on a literal string at compile time.
return_same(Value) ->
    Value.
52 changes: 52 additions & 0 deletions test/cets_SUITE_data/status_data.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#{
system_info =>
#{
nodes =>
[
'[email protected]',
'[email protected]'
],
tables => [cets_bosh, cets_external_component],
unavailable_nodes => []
},
available_nodes =>
[
'[email protected]',
'[email protected]'
],
local_table_to_other_nodes_map =>
#{
cets_external_component =>
[
'[email protected]',
'[email protected]'
],
cets_bosh =>
[
'[email protected]',
'[email protected]'
]
},
node_to_tab_nodes_map =>
#{
'[email protected]' =>
#{
cets_external_component =>
[
'[email protected]',
'[email protected]'
],
cets_bosh =>
[
'[email protected]',
'[email protected]'
]
}
},
online_nodes =>
[
'[email protected]',
'[email protected]'
],
this_node => '[email protected]'
}.

0 comments on commit 2ca3105

Please sign in to comment.