Skip to content

Commit

Permalink
Merge pull request #35 from esl/prune-unavailable-nodes-list
Browse files Browse the repository at this point in the history
Ensure that unavailable_nodes list is a subset of discovered_nodes
  • Loading branch information
chrzaszcz authored Oct 23, 2023
2 parents f715c4a + 8fcd64b commit f131ac9
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 32 deletions.
3 changes: 2 additions & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
]},
{test, [
{deps, [
{logger_debug_h, "0.1.0"}
{logger_debug_h, "0.1.0"},
{meck, "0.9.2"}
]},
{plugins, [
{rebar3_codecov, "0.6.0"}
Expand Down
14 changes: 10 additions & 4 deletions src/cets_discovery.erl
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@
-type state() :: #{
phase := initial | regular,
results := [join_result()],
nodes := [node()],
nodes := ordsets:ordset(node()),
%% The nodes that returned pang, sorted
unavailable_nodes := [node()],
unavailable_nodes := ordsets:ordset(node()),
tables := [atom()],
backend_module := module(),
backend_state := state(),
Expand Down Expand Up @@ -269,8 +269,9 @@ handle_get_nodes_result(Result, BackendState, State) ->
set_nodes({error, _Reason}, State) ->
State;
set_nodes({ok, Nodes}, State) ->
ping_not_connected_nodes(Nodes),
try_joining(State#{nodes := Nodes}).
Nodes2 = lists:usort(Nodes),
ping_not_connected_nodes(Nodes2),
prune_unavailable_nodes_if_needed(try_joining(State#{nodes := Nodes2})).

%% Called when:
%% - a list of connected nodes changes (i.e. nodes() call result)
Expand Down Expand Up @@ -304,6 +305,11 @@ handle_joining_finished(Results, State = #{should_retry_join := Retry}) ->
State2
end.

-spec prune_unavailable_nodes_if_needed(state()) -> state().
prune_unavailable_nodes_if_needed(State = #{nodes := Nodes, unavailable_nodes := UnNodes}) ->
%% Unavailable nodes is a subset of discovered nodes
State#{unavailable_nodes := ordsets:intersection(Nodes, UnNodes)}.

-spec ping_not_connected_nodes([node()]) -> ok.
ping_not_connected_nodes(Nodes) ->
Self = self(),
Expand Down
99 changes: 72 additions & 27 deletions test/cets_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,10 @@ cases() ->
status_available_nodes,
status_available_nodes_do_not_contain_nodes_with_stopped_disco,
status_unavailable_nodes,
status_unavailable_nodes_is_subset_of_discovery_nodes,
status_joined_nodes,
status_discovery_works,
status_discovered_nodes,
status_remote_nodes_without_disco,
status_remote_nodes_with_unknown_tables,
status_remote_nodes_with_missing_nodes,
Expand Down Expand Up @@ -1309,19 +1311,45 @@ status_unavailable_nodes(Config) ->
ok = cets_discovery:wait_for_ready(DiscoName, 5000),
?assertMatch(#{unavailable_nodes := ['badnode@localhost']}, cets_status:status(DiscoName)).

status_unavailable_nodes_is_subset_of_discovery_nodes(Config) ->
Node1 = node(),
GetFn1 = fun(State) -> {{ok, [Node1, 'badnode@localhost']}, State} end,
GetFn2 = fun(State) -> {{ok, [Node1]}, State} end,
%% Setup meck
BackendModule = make_name(Config, disco_backend),
meck:new(BackendModule, [non_strict]),
meck:expect(BackendModule, init, fun(_Opts) -> undefined end),
meck:expect(BackendModule, get_nodes, GetFn1),
DiscoName = disco_name(Config),
Disco = start_disco(Node1, #{
name => DiscoName, backend_module => BackendModule
}),
%% Disco needs at least one table to start calling get_nodes function
Tab = make_name(Config),
{ok, _} = start(Node1, Tab),
cets_discovery:add_table(Disco, Tab),
ok = cets_discovery:wait_for_ready(DiscoName, 5000),
?assertMatch(#{unavailable_nodes := ['badnode@localhost']}, cets_status:status(DiscoName)),
%% Remove badnode from disco
meck:expect(BackendModule, get_nodes, GetFn2),
%% Force check.
Disco ! check,
%% The unavailable_nodes list is updated
CondF = fun() -> maps:get(unavailable_nodes, cets_status:status(DiscoName)) end,
cets_test_wait:wait_until(CondF, []).

status_joined_nodes(Config) ->
Node1 = node(),
#{ct2 := Node2} = proplists:get_value(nodes, Config),
F = fun(State) ->
{{ok, [Node1, Node2]}, State}
end,
DiscoName = disco_name(Config),
Disco1 = start_disco(Node1, #{
DiscoOpts = #{
name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F
}),
Disco2 = start_disco(Node2, #{
name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F
}),
},
Disco1 = start_disco(Node1, DiscoOpts),
Disco2 = start_disco(Node2, DiscoOpts),
Tab = make_name(Config),
{ok, _} = start(Node1, Tab),
{ok, _} = start(Node2, Tab),
Expand All @@ -1340,12 +1368,11 @@ status_discovery_works(Config) ->
{{ok, [Node1, Node2]}, State}
end,
DiscoName = disco_name(Config),
Disco1 = start_disco(Node1, #{
DiscoOpts = #{
name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F
}),
Disco2 = start_disco(Node2, #{
name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F
}),
},
Disco1 = start_disco(Node1, DiscoOpts),
Disco2 = start_disco(Node2, DiscoOpts),
Tab = make_name(Config),
{ok, _} = start(Node1, Tab),
{ok, _} = start(Node2, Tab),
Expand All @@ -1355,6 +1382,24 @@ status_discovery_works(Config) ->
ok = cets_discovery:wait_for_ready(DiscoName, 5000),
?assertMatch(#{discovery_works := true}, cets_status:status(DiscoName)).

status_discovered_nodes(Config) ->
Node1 = node(),
#{ct2 := Node2} = proplists:get_value(nodes, Config),
F = fun(State) ->
{{ok, [Node1, Node2]}, State}
end,
DiscoName = disco_name(Config),
Disco = start_disco(Node1, #{
name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F
}),
Tab = make_name(Config),
{ok, _} = start(Node1, Tab),
{ok, _} = start(Node2, Tab),
%% Add table using pids (i.e. no need to do RPCs here)
cets_discovery:add_table(Disco, Tab),
ok = cets_discovery:wait_for_ready(DiscoName, 5000),
?assertMatch(#{discovered_nodes := [Node1, Node2]}, cets_status:status(DiscoName)).

status_remote_nodes_without_disco(Config) ->
Node1 = node(),
#{ct2 := Node2} = proplists:get_value(nodes, Config),
Expand All @@ -1378,12 +1423,11 @@ status_remote_nodes_with_unknown_tables(Config) ->
{{ok, [Node1, Node2]}, State}
end,
DiscoName = disco_name(Config),
Disco1 = start_disco(Node1, #{
DiscoOpts = #{
name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F
}),
Disco2 = start_disco(Node2, #{
name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F
}),
},
Disco1 = start_disco(Node1, DiscoOpts),
Disco2 = start_disco(Node2, DiscoOpts),
Tab1 = make_name(Config, 1),
Tab2 = make_name(Config, 2),
%% Node1 does not have Tab2
Expand Down Expand Up @@ -1413,12 +1457,11 @@ status_remote_nodes_with_missing_nodes(Config) ->
{{ok, [Node1, Node2]}, State}
end,
DiscoName = disco_name(Config),
Disco1 = start_disco(Node1, #{
name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F
}),
Disco2 = start_disco(Node2, #{
DiscoOpts = #{
name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F
}),
},
Disco1 = start_disco(Node1, DiscoOpts),
Disco2 = start_disco(Node2, DiscoOpts),
Tab1 = make_name(Config, 1),
Tab2 = make_name(Config, 2),
%% Node2 does not have Tab2
Expand Down Expand Up @@ -1447,12 +1490,11 @@ status_conflict_nodes(Config) ->
{{ok, [Node1, Node2]}, State}
end,
DiscoName = disco_name(Config),
Disco1 = start_disco(Node1, #{
name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F
}),
Disco2 = start_disco(Node2, #{
DiscoOpts = #{
name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F
}),
},
Disco1 = start_disco(Node1, DiscoOpts),
Disco2 = start_disco(Node2, DiscoOpts),
Tab1 = make_name(Config, 1),
Tab2 = make_name(Config, 2),
{ok, _} = start(Node1, Tab1),
Expand Down Expand Up @@ -2051,9 +2093,12 @@ receive_message_with_arg(Tag) ->
make_name(Config) ->
make_name(Config, 1).

make_name(Config, Num) ->
make_name(Config, Num) when is_integer(Num) ->
Testcase = proplists:get_value(testcase, Config),
list_to_atom(atom_to_list(Testcase) ++ "_" ++ integer_to_list(Num));
make_name(Config, Atom) when is_atom(Atom) ->
Testcase = proplists:get_value(testcase, Config),
list_to_atom(atom_to_list(Testcase) ++ "_" ++ integer_to_list(Num)).
list_to_atom(atom_to_list(Testcase) ++ "_" ++ atom_to_list(Atom)).

lock_name(Config) ->
Testcase = proplists:get_value(testcase, Config),
Expand Down

0 comments on commit f131ac9

Please sign in to comment.