Skip to content

Commit

Permalink
Merge pull request #34 from esl/skip-check_do_not_overlap_failed-erro…
Browse files Browse the repository at this point in the history
…r-if-fully-connected

Do not log check_do_not_overlap_failed error, if the cluster is already joined
  • Loading branch information
chrzaszcz authored Oct 6, 2023
2 parents e3ad43f + 626e88e commit 9849494
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 34 deletions.
66 changes: 40 additions & 26 deletions src/cets_join.erl
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
-include_lib("kernel/include/logger.hrl").

-ifdef(TEST).
-export([check_could_reach_each_other/2]).
-export([check_could_reach_each_other/3]).
-endif.

-type lock_key() :: term().
Expand Down Expand Up @@ -77,7 +77,8 @@ join_loop(LockKey, Info, LocalPid, RemotePid, Start, JoinOpts) ->
%% overloaded or joining is already in progress on another node
?LOG_INFO(Info#{what => join_got_lock, after_time_ms => Diff}),
%% Do joining in a separate process to reduce GC
cets_long:run_spawn(Info, fun() -> join2(LocalPid, RemotePid, JoinOpts) end)
FF = handle_throw(fun() -> join2(Info, LocalPid, RemotePid, JoinOpts) end),
cets_long:run_spawn(Info, FF)
end,
LockRequest = {LockKey, self()},
%% Just lock all nodes, no magic here :)
Expand All @@ -94,8 +95,8 @@ join_loop(LockKey, Info, LocalPid, RemotePid, Start, JoinOpts) ->

%% Exchanges data and a list of servers.
%% Pauses new operations during the exchange.
-spec join2(server_pid(), server_pid(), join_opts()) -> ok.
join2(LocalPid, RemotePid, JoinOpts) ->
-spec join2(cets_long:log_info(), server_pid(), server_pid(), join_opts()) -> ok.
join2(Info, LocalPid, RemotePid, JoinOpts) ->
checkpoint(join_start, JoinOpts),
JoinRef = make_ref(),
%% Joining is a symmetrical operation here - both servers exchange information between each other.
Expand All @@ -105,7 +106,7 @@ join2(LocalPid, RemotePid, JoinOpts) ->
checkpoint(before_get_pids, JoinOpts),
LocPids = get_pids(LocalPid),
RemPids = get_pids(RemotePid),
check_pids(LocPids, RemPids, JoinOpts),
check_pids(Info, LocPids, RemPids, JoinOpts),
AllPids = LocPids ++ RemPids,
Paused = [{Pid, cets:pause(Pid)} || Pid <- AllPids],
%% Merges data from two partitions together.
Expand All @@ -118,8 +119,8 @@ join2(LocalPid, RemotePid, JoinOpts) ->
{ok, RemoteDump} = remote_or_local_dump(RemotePid),
%% Check that still fully connected after getting the dumps
%% and before making any changes
check_fully_connected(LocPids),
check_fully_connected(RemPids),
check_fully_connected(Info, LocPids),
check_fully_connected(Info, RemPids),
{LocalDump2, RemoteDump2} = maybe_apply_resolver(LocalDump, RemoteDump, ServerOpts),
RemF = fun(Pid) -> send_dump(Pid, LocPids, JoinRef, LocalDump2, JoinOpts) end,
LocF = fun(Pid) -> send_dump(Pid, RemPids, JoinRef, RemoteDump2, JoinOpts) end,
Expand Down Expand Up @@ -190,16 +191,16 @@ apply_resolver_for_sorted(LocalDump, RemoteDump, _F, _Pos, LocalAcc, RemoteAcc)
get_pids(Pid) ->
    %% The server itself plus every other server it reports via cets:other_pids/1
    OtherPids = cets:other_pids(Pid),
    ordsets:add_element(Pid, OtherPids).

-spec check_pids(cets:servers(), cets:servers(), join_opts()) -> ok.
check_pids(LocPids, RemPids, JoinOpts) ->
check_do_not_overlap(LocPids, RemPids),
-spec check_pids(cets_long:log_info(), cets:servers(), cets:servers(), join_opts()) -> ok.
check_pids(Info, LocPids, RemPids, JoinOpts) ->
check_do_not_overlap(Info, LocPids, RemPids),
checkpoint(before_check_fully_connected, JoinOpts),
check_fully_connected(LocPids),
check_fully_connected(RemPids),
check_could_reach_each_other(LocPids, RemPids).
check_fully_connected(Info, LocPids),
check_fully_connected(Info, RemPids),
check_could_reach_each_other(Info, LocPids, RemPids).

-spec check_could_reach_each_other(cets:servers(), cets:servers()) -> ok.
check_could_reach_each_other(LocPids, RemPids) ->
-spec check_could_reach_each_other(cets_long:log_info(), cets:servers(), cets:servers()) -> ok.
check_could_reach_each_other(Info, LocPids, RemPids) ->
LocNodes = lists:usort(lists:map(fun node/1, LocPids)),
RemNodes = lists:usort(lists:map(fun node/1, RemPids)),
Pairs = lists:usort([
Expand All @@ -221,20 +222,23 @@ check_could_reach_each_other(LocPids, RemPids) ->
[] ->
ok;
_ ->
?LOG_ERROR(#{
?LOG_ERROR(Info#{
what => check_could_reach_each_other_failed,
node_pairs_not_connected => NotConnected
}),
error(check_could_reach_each_other_failed)
end.

-spec check_do_not_overlap(cets:servers(), cets:servers()) -> ok.
check_do_not_overlap(LocPids, RemPids) ->
-spec check_do_not_overlap(cets_long:log_info(), cets:servers(), cets:servers()) -> ok.
check_do_not_overlap(_Info, Pids, Pids) ->
%% Same pids, looks like cluster is fully connected, just exit
throw(skip_join_when_pids_are_the_same);
check_do_not_overlap(Info, LocPids, RemPids) ->
case ordsets:intersection(LocPids, RemPids) of
[] ->
ok;
Overlap ->
?LOG_ERROR(#{
?LOG_ERROR(Info#{
what => check_do_not_overlap_failed,
local_servers => LocPids,
remote_servers => RemPids,
Expand All @@ -243,16 +247,26 @@ check_do_not_overlap(LocPids, RemPids) ->
error(check_do_not_overlap_failed)
end.

%% Wraps the zero-arity fun F into another zero-arity fun that treats the
%% `skip_join_when_pids_are_the_same' throw as a successful no-op.
%% check_do_not_overlap/3 throws it when both sides already have the same
%% list of pids, so the wrapped fun returns `ok' instead of letting the
%% throw escape to the caller.
%% Any other exception raised by F propagates unchanged, and a normal
%% return value of F is returned as is.
-spec handle_throw(fun(() -> Result)) -> fun(() -> Result | ok).
handle_throw(F) ->
    fun() ->
        try
            F()
        catch
            throw:skip_join_when_pids_are_the_same ->
                ok
        end
    end.

%% Checks that other_pids lists match for all nodes
%% If they are not matching - the node removal process could be in progress
-spec check_fully_connected(cets:servers()) -> ok.
check_fully_connected(Pids) ->
-spec check_fully_connected(cets_long:log_info(), cets:servers()) -> ok.
check_fully_connected(Info, Pids) ->
Lists = [get_pids(Pid) || Pid <- Pids],
case lists:usort([Pids | Lists]) of
[_] ->
check_same_join_ref(Pids);
check_same_join_ref(Info, Pids);
UniqueLists ->
?LOG_ERROR(#{
?LOG_ERROR(Info#{
what => check_fully_connected_failed,
expected_pids => Pids,
server_lists => Lists,
Expand All @@ -263,14 +277,14 @@ check_fully_connected(Pids) ->

%% Check if all nodes have the same join_ref
%% If not - we don't want to continue joining
-spec check_same_join_ref(cets:servers()) -> ok.
check_same_join_ref(Pids) ->
-spec check_same_join_ref(cets_long:log_info(), cets:servers()) -> ok.
check_same_join_ref(Info, Pids) ->
Refs = [pid_to_join_ref(Pid) || Pid <- Pids],
case lists:usort(Refs) of
[_] ->
ok;
UniqueRefs ->
?LOG_ERROR(#{
?LOG_ERROR(Info#{
what => check_same_join_ref_failed,
refs => lists:zip(Pids, Refs),
unique_refs => UniqueRefs
Expand Down
111 changes: 103 additions & 8 deletions test/cets_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,9 @@ cases() ->

only_for_logger_cases() ->
[
run_tracked_logged_check_logger
run_tracked_logged_check_logger,
logs_are_printed_when_join_fails_because_servers_overlap,
join_done_already_while_waiting_for_lock_so_do_nothing
].

seq_cases() ->
Expand Down Expand Up @@ -844,6 +846,26 @@ join_fails_because_servers_overlap(Config) ->
{error, check_do_not_overlap_failed} =
cets_join:join(lock_name(Config), #{}, Pid1, Pid2, #{}).

%% Same scenario as the join_fails_because_servers_overlap testcase, but here we check the logging.
%% We check that `?LOG_ERROR(Info#{what => check_do_not_overlap_failed})' is called
%% and that the report carries the `log_ref' passed in the join info.
logs_are_printed_when_join_fails_because_servers_overlap(Config) ->
    LogRef = make_ref(),
    logger_debug_h:start(#{id => ?FUNCTION_NAME}),
    try
        #{pids := [Pid1, Pid2, Pid3]} = given_3_servers(Config),
        %% Give both servers Pid3 as their other server, so the join is
        %% expected to fail with check_do_not_overlap_failed
        set_other_servers(Pid1, [Pid3]),
        set_other_servers(Pid2, [Pid3]),
        {error, check_do_not_overlap_failed} =
            cets_join:join(lock_name(Config), #{log_ref => LogRef}, Pid1, Pid2, #{}),
        %% Match on LogRef to ignore logs produced by other testcases
        receive
            {log, ?FUNCTION_NAME, #{
                level := error,
                msg := {report, #{what := check_do_not_overlap_failed, log_ref := LogRef}}
            }} ->
                ok
        after 5000 ->
            ct:fail(timeout)
        end
    after
        %% Remove the handler installed above, even when an assertion fails,
        %% so it does not stay registered after the testcase
        logger:remove_handler(?FUNCTION_NAME)
    end.

remote_ops_are_ignored_if_join_ref_does_not_match(Config) ->
{ok, Pid1} = start_local(make_name(Config, 1)),
{ok, Pid2} = start_local(make_name(Config, 2)),
Expand Down Expand Up @@ -883,6 +905,40 @@ join_retried_if_lock_is_busy(Config) ->
end),
receive_message(before_retry).

%% Checks that a join of servers which have been joined already by a concurrent
%% join returns ok and logs no warnings or errors.
%% Two joins of the same pair (Pid1, Pid3) are started. The first one is held at
%% the join_start checkpoint until the second one has been spawned
%% (presumably the second join is waiting for the lock at that time).
join_done_already_while_waiting_for_lock_so_do_nothing(Config) ->
logger_debug_h:start(#{id => ?FUNCTION_NAME}),
Me = self(),
#{pids := [Pid1, Pid2, Pid3, Pid4]} = given_n_servers(Config, 4, #{}),
Lock = lock_name(Config),
%% Prepare two separate pairs: Pid1 with Pid2 and Pid3 with Pid4
ok = cets_join:join(Lock, #{}, Pid1, Pid2, #{}),
ok = cets_join:join(Lock, #{}, Pid3, Pid4, #{}),
%% The log_ref is only used to recognise the logs produced by this testcase
LogRef = make_ref(),
Info = #{log_ref => LogRef},
%% F1 reports join_start to this process and blocks the joining process
%% until continue_joining is sent; F2 ignores every checkpoint
F1 = send_join_start_back_and_wait_for_continue_joining(),
F2 = fun(_) -> ok end,
%% Get the lock in a separate process
spawn_link(fun() ->
ok = cets_join:join(Lock, Info, Pid1, Pid3, #{checkpoint_handler => F1}),
Me ! first_join_returns
end),
%% The first join has reached the join_start checkpoint and is paused there
JoinPid = receive_message_with_arg(join_start),
spawn_link(fun() ->
ok = cets_join:join(Lock, Info, Pid1, Pid3, #{checkpoint_handler => F2}),
Me ! second_join_returns
end),
%% Let the paused first join continue
JoinPid ! continue_joining,
%% At this point our first join would finish, after that our second join should exit too.
receive_message(first_join_returns),
receive_message(second_join_returns),
%% Ensure all logs are received by removing the handler, it is a sync operation.
%% (we do not expect any logs anyway).
logger:remove_handler(?FUNCTION_NAME),
%% Ensure there is nothing logged, we use log_ref to ignore logs from other tests.
%% The counterexample, where logging is expected, is
%% the logs_are_printed_when_join_fails_because_servers_overlap testcase.
assert_nothing_is_logged(?FUNCTION_NAME, LogRef).

send_dump_contains_already_added_servers(Config) ->
%% Check that even if we have already added server in send_dump, nothing crashes
{ok, Pid1} = start_local(make_name(Config, 1)),
Expand Down Expand Up @@ -1654,7 +1710,7 @@ check_could_reach_each_other_fails(_Config) ->
?assertException(
error,
check_could_reach_each_other_failed,
cets_join:check_could_reach_each_other([self()], [bad_node_pid()])
cets_join:check_could_reach_each_other(#{}, [self()], [bad_node_pid()])
).

%% Cases to improve code coverage
Expand Down Expand Up @@ -1727,15 +1783,16 @@ run_tracked_logged(_Config) ->

run_tracked_logged_check_logger(_Config) ->
logger_debug_h:start(#{id => ?FUNCTION_NAME}),
LogRef = make_ref(),
F = fun() -> timer:sleep(5000) end,
%% Run it in a separate process, so we can check logs in the test process
%% Overwrite default five seconds interval with 10 milliseconds
spawn_link(fun() -> cets_long:run_tracked(#{report_interval => 10}, F) end),
spawn_link(fun() -> cets_long:run_tracked(#{report_interval => 10, log_ref => LogRef}, F) end),
%% Exit test after first log event
receive
{log, ?FUNCTION_NAME, #{
level := warning,
msg := {report, #{what := long_task_progress}}
msg := {report, #{what := long_task_progress, log_ref := LogRef}}
}} ->
ok
after 5000 ->
Expand Down Expand Up @@ -1973,6 +2030,12 @@ receive_message(M) ->
after 5000 -> error({receive_message_timeout, M})
end.

%% Waits up to five seconds for a `{Tag, Arg}' message and returns its Arg.
%% Fails with `{receive_message_with_arg_timeout, Tag}' if nothing arrives in time.
receive_message_with_arg(Tag) ->
    TimeoutMs = 5000,
    receive
        {Tag, Payload} ->
            Payload
    after TimeoutMs ->
        erlang:error({receive_message_with_arg_timeout, Tag})
    end.

%% Shortcut for make_name(Config, 1).
make_name(Config) ->
make_name(Config, 1).

Expand Down Expand Up @@ -2023,10 +2086,18 @@ given_3_servers(Config) ->
given_3_servers(Config, #{}).

given_3_servers(Config, Opts) ->
{ok, Pid1} = start_local(T1 = make_name(Config, 1), Opts),
{ok, Pid2} = start_local(T2 = make_name(Config, 2), Opts),
{ok, Pid3} = start_local(T3 = make_name(Config, 3), Opts),
#{pids => [Pid1, Pid2, Pid3], tabs => [T1, T2, T3]}.
given_n_servers(Config, 3, Opts).

%% Starts N local servers named make_name(Config, 1) .. make_name(Config, N),
%% each one with the options Opts.
%% Returns a map with the started pids (`pids') and their names (`tabs'),
%% both in index order.
given_n_servers(Config, N, Opts) ->
    StartOne = fun(Tab) ->
        {ok, Pid} = start_local(Tab, Opts),
        Pid
    end,
    Indexes = lists:seq(1, N),
    Tabs = [make_name(Config, Index) || Index <- Indexes],
    Pids = [StartOne(Tab) || Tab <- Tabs],
    #{pids => Pids, tabs => Tabs}.

stopped_pid() ->
%% Get a pid for a stopped process
Expand Down Expand Up @@ -2076,3 +2147,27 @@ bad_node_pid_binary() ->
%% Pid <0.90.0> on badnode@localhost
<<131, 88, 100, 0, 17, 98, 97, 100, 110, 111, 100, 101, 64, 108, 111, 99, 97, 108, 104, 111,
115, 116, 0, 0, 0, 90, 0, 0, 0, 0, 100, 206, 70, 92>>.

%% Fails the testcase if the mailbox already holds a warning or error report
%% from the handler LogHandlerId whose report map contains this LogRef.
%% Does not wait: returns ok immediately when no such message is queued.
assert_nothing_is_logged(LogHandlerId, LogRef) ->
    receive
        {log, LogHandlerId, #{level := Severity, msg := {report, #{log_ref := LogRef}}}} when
            Severity =:= warning orelse Severity =:= error
        ->
            ct:fail(got_logging_but_should_not)
    after 0 ->
        ok
    end.

%% Builds a checkpoint handler (see the `checkpoint_handler' join option)
%% that pauses a join at its join_start checkpoint.
%% On join_start the handler sends `{join_start, JoinerPid}' to the process
%% that created the handler and then blocks until it receives
%% `continue_joining'. All other checkpoints are ignored.
%% The wait is limited to five seconds, so the joining process does not
%% hang forever if the test process never sends `continue_joining';
%% in that case the handler fails with `continue_joining_timeout'.
send_join_start_back_and_wait_for_continue_joining() ->
    Me = self(),
    fun
        (join_start) ->
            Me ! {join_start, self()},
            receive
                continue_joining ->
                    ok
            after 5000 ->
                error(continue_joining_timeout)
            end;
        (_) ->
            ok
    end.

0 comments on commit 9849494

Please sign in to comment.