From 162cb24035eb600f220d26c9e42e0717584c26d2 Mon Sep 17 00:00:00 2001
From: Mikhail Uvarov
Date: Thu, 9 Nov 2023 13:11:11 +0100
Subject: [PATCH 01/41] Improve net_adm:ping/1

Do not call net_kernel to connect to a node if its name is not resolvable

Do not disconnect from a node if ping fails
---
 src/cets_discovery.erl |  2 +-
 src/cets_ping.erl      | 44 ++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 45 insertions(+), 1 deletion(-)
 create mode 100644 src/cets_ping.erl

diff --git a/src/cets_discovery.erl b/src/cets_discovery.erl
index f39ab5cf..2eb54b04 100644
--- a/src/cets_discovery.erl
+++ b/src/cets_discovery.erl
@@ -315,7 +315,7 @@ ping_not_connected_nodes(Nodes) ->
     Self = self(),
     NotConNodes = Nodes -- [node() | nodes()],
     [
-        spawn(fun() -> Self ! {ping_result, Node, net_adm:ping(Node)} end)
+        spawn(fun() -> Self ! {ping_result, Node, cets_ping:ping(Node)} end)
     || Node <- lists:sort(NotConNodes)
     ],
     ok.
diff --git a/src/cets_ping.erl b/src/cets_ping.erl
new file mode 100644
index 00000000..bed41489
--- /dev/null
+++ b/src/cets_ping.erl
@@ -0,0 +1,44 @@
+-module(cets_ping).
+-export([ping/1]).
+ping(Node) when is_atom(Node) ->
+    %% It is important to understand that the initial setup for dist connections
+    %% is done by the single net_kernel process.
+    %% It calls net_kernel:setup, which calls inet_tcp_dist, which calls
+    %% erl_epmd:address_please/3, which does a DNS request.
+    %% If DNS is slow, the net_kernel process becomes busy.
+    %% And if we have a lot of offline nodes in the CETS discovery table,
+    %% we would call net_kernel for each node (even though we would most likely
+    %% receive {error, nxdomain} from erl_epmd:address_please/3).
+    %% So we do a DNS lookup here first and try to connect only if it succeeds.
+    case lists:member(Node, nodes()) of
+        true ->
+            pong;
+        false ->
+            case dist_util:split_node(Node) of
+                {node, _Name, Host} ->
+                    case {inet:getaddr(Host, inet6), inet:getaddr(Host, inet)} of
+                        {{error, _}, {error, _}} ->
+                            pang;
+                        _ ->
+                            ping_without_disconnect(Node)
+                    end;
+                _ ->
+                    pang
+            end
+    end.
+
+%% Like net_adm:ping/1, but without disconnect_node
+%% (disconnect_node is not atomic and could introduce more chaos)
+ping_without_disconnect(Node) ->
+    Msg = {is_auth, node()},
+    Dst = {net_kernel, Node},
+    try gen:call(Dst, '$gen_call', Msg, infinity) of
+        {ok, yes} ->
+            pong;
+        _ ->
+            % erlang:disconnect_node(Node),
+            pang
+    catch
+        _:_ ->
+            pang
+    end.
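For illustration, a minimal usage sketch of the new cets_ping:ping/1 (the node names below are hypothetical):

    %% A reachable node behaves like net_adm:ping/1:
    pong = cets_ping:ping('other@reachable.host'),
    %% An unresolvable hostname fails both inet:getaddr/2 calls and
    %% short-circuits to pang without ever touching net_kernel:
    pang = cets_ping:ping('other@unresolvable.host').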
From 18e119cf71b10471ab24fab9c4d0cea410a52161 Mon Sep 17 00:00:00 2001
From: Mikhail Uvarov
Date: Fri, 10 Nov 2023 11:04:02 +0100
Subject: [PATCH 02/41] Rewrite ping logic - use connect_node instead

Add a ping_pairs function.

ping_pairs does not use net_adm:ping anymore.
---
 src/cets_join.erl | 11 +----------
 src/cets_ping.erl | 50 +++++++++++++++++++++++++++++++++++------------
 2 files changed, 38 insertions(+), 23 deletions(-)

diff --git a/src/cets_join.erl b/src/cets_join.erl
index 28a01394..ad449d6d 100644
--- a/src/cets_join.erl
+++ b/src/cets_join.erl
@@ -207,16 +207,7 @@ check_could_reach_each_other(Info, LocPids, RemPids) ->
         {min(LocNode, RemNode), max(LocNode, RemNode)}
      || LocNode <- LocNodes, RemNode <- RemNodes, LocNode =/= RemNode
     ]),
-    Results =
-        [
-            {Node1, Node2,
-                cets_long:run_tracked(
-                    #{task => ping_node, node1 => Node1, node2 => Node2}, fun() ->
-                        rpc:call(Node1, net_adm, ping, [Node2], 10000)
-                    end
-                )}
-         || {Node1, Node2} <- Pairs
-        ],
+    Results = cets_ping:ping_pairs(Pairs),
     NotConnected = [X || {_Node1, _Node2, Res} = X <- Results, Res =/= pong],
     case NotConnected of
         [] ->
diff --git a/src/cets_ping.erl b/src/cets_ping.erl
index bed41489..6fb86608 100644
--- a/src/cets_ping.erl
+++ b/src/cets_ping.erl
@@ -1,5 +1,5 @@
 -module(cets_ping).
--export([ping/1]).
+-export([ping/1, ping_pairs/1]).
 ping(Node) when is_atom(Node) ->
     %% It is important to understand that the initial setup for dist connections
     %% is done by the single net_kernel process.
@@ -20,25 +20,49 @@ ping(Node) when is_atom(Node) ->
                         {{error, _}, {error, _}} ->
                             pang;
                         _ ->
-                            ping_without_disconnect(Node)
+                            connect_ping(Node)
                     end;
                 _ ->
                     pang
             end
     end.
 
-%% Like net_adm:ping/1, but without disconnect_node
-%% (disconnect_node is not atomic and could introduce more chaos)
-ping_without_disconnect(Node) ->
-    Msg = {is_auth, node()},
-    Dst = {net_kernel, Node},
-    try gen:call(Dst, '$gen_call', Msg, infinity) of
-        {ok, yes} ->
+connect_ping(Node) ->
+    %% We could use net_adm:ping/1, but it does things we don't want:
+    %% - it disconnects the node on pang
+    %%   (which could disconnect an already connected node because of race conditions);
+    %% - it calls the net_kernel gen_server of the remote server,
+    %%   which could be busy doing something else,
+    %%   which means a slower response time.
+    case net_kernel:connect_node(Node) of
+        true ->
             pong;
         _ ->
-            % erlang:disconnect_node(Node),
-            pang
-    catch
-        _:_ ->
             pang
     end.
+
+-spec ping_pairs([{node(), node()}]) -> [{node(), node(), pong | Reason :: term()}].
+ping_pairs(Pairs) ->
+    %% We could use rpc:multicall(Nodes, cets_ping, ping, Args),
+    %% but that would mean a higher chance of nodes trying to contact each other.
+    ping_pairs_stop_on_pang(Pairs).
+
+ping_pairs_stop_on_pang([{Node1, Node2} | Pairs]) ->
+    F = fun() -> rpc:call(Node1, cets_ping, ping, [Node2], 10000) end,
+    Info = #{task => ping_node, node1 => Node1, node2 => Node2},
+    Res = cets_long:run_tracked(Info, F),
+    case Res of
+        pong ->
+            [{Node1, Node2, pong} | ping_pairs_stop_on_pang(Pairs)];
+        Other ->
+            %% We do not need to ping the rest of the nodes -
+            %% one node returning pang is enough to cancel the join.
+            %% We exit early and save some time
+            %% (connect_node to a dead node could be time-consuming).
+            [{Node1, Node2, Other} | fail_pairs(Pairs, skipped)]
+    end;
+ping_pairs_stop_on_pang([]) ->
+    [].
+
+fail_pairs(Pairs, Reason) ->
+    [{Node1, Node2, Reason} || {Node1, Node2} <- Pairs].
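A sketch of the resulting ping_pairs/1 behaviour (node names hypothetical): as soon as one pair fails, the remaining pairs are not pinged at all and are reported as skipped:

    %% 'a@h1' and 'b@h1' are alive, 'c@h1' is down:
    [{'a@h1', 'b@h1', pong},
     {'a@h1', 'c@h1', pang},
     {'b@h1', 'c@h1', skipped}] =
        cets_ping:ping_pairs([{'a@h1', 'b@h1'}, {'a@h1', 'c@h1'}, {'b@h1', 'c@h1'}]).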
From 96cf985f684d6632184833beeefae1db9d70b12e Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Fri, 10 Nov 2023 15:26:15 +0100 Subject: [PATCH 03/41] Do check_could_reach_each_other before check_fully_connected Do pings before sending any messages to new nodes --- src/cets_join.erl | 4 ++-- test/cets_SUITE.erl | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/cets_join.erl b/src/cets_join.erl index ad449d6d..95b63387 100644 --- a/src/cets_join.erl +++ b/src/cets_join.erl @@ -195,9 +195,9 @@ get_pids(Pid) -> check_pids(Info, LocPids, RemPids, JoinOpts) -> check_do_not_overlap(Info, LocPids, RemPids), checkpoint(before_check_fully_connected, JoinOpts), + check_could_reach_each_other(Info, LocPids, RemPids), check_fully_connected(Info, LocPids), - check_fully_connected(Info, RemPids), - check_could_reach_each_other(Info, LocPids, RemPids). + check_fully_connected(Info, RemPids). -spec check_could_reach_each_other(cets_long:log_info(), cets:servers(), cets:servers()) -> ok. check_could_reach_each_other(Info, LocPids, RemPids) -> diff --git a/test/cets_SUITE.erl b/test/cets_SUITE.erl index 2ea8cf76..c527354f 100644 --- a/test/cets_SUITE.erl +++ b/test/cets_SUITE.erl @@ -2087,8 +2087,7 @@ joining_not_fully_connected_node_is_not_allowed(Config) -> %% Pid5 and Pid3 could contact each other. %% Pid3 could contact Pid1 (they are joined). %% But Pid5 cannot contact Pid1. - {error, - {task_failed, {{nodedown, Node1}, {gen_server, call, [_, other_servers, infinity]}}, _}} = + {error, {task_failed, check_could_reach_each_other_failed, _}} = rpc(Peer5, cets_join, join, [lock_name(Config), #{}, Pid5, Pid3]), %% Still connected cets:insert(Pid1, {r1}), From d6e617eac6dfdd741388d06564497117aef0b82b Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Tue, 14 Nov 2023 10:07:01 +0100 Subject: [PATCH 04/41] Report transaction retry on info level, not error --- src/cets_join.erl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/cets_join.erl b/src/cets_join.erl index 95b63387..b63974c5 100644 --- a/src/cets_join.erl +++ b/src/cets_join.erl @@ -84,10 +84,12 @@ join_loop(LockKey, Info, LocalPid, RemotePid, Start, JoinOpts) -> %% Just lock all nodes, no magic here :) Nodes = [node() | nodes()], Retries = 1, + %% global could abort the transaction when one of the nodes goes down. + %% It could usually abort it during startup or update. case global:trans(LockRequest, F, Nodes, Retries) of aborted -> checkpoint(before_retry, JoinOpts), - ?LOG_ERROR(Info#{what => join_retry, reason => lock_aborted}), + ?LOG_INFO(Info#{what => join_retry, reason => lock_aborted}), join_loop(LockKey, Info, LocalPid, RemotePid, Start, JoinOpts); Result -> Result From 3ad38f3a67417de9b43b0b93575005158cfcebd1 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Tue, 14 Nov 2023 10:28:45 +0100 Subject: [PATCH 05/41] Log nodeup/nodedown events with warning level --- src/cets_discovery.erl | 41 ++++++++++++++++++++++++++++++++++------- 1 file changed, 34 insertions(+), 7 deletions(-) diff --git a/src/cets_discovery.erl b/src/cets_discovery.erl index 2eb54b04..bb339264 100644 --- a/src/cets_discovery.erl +++ b/src/cets_discovery.erl @@ -90,8 +90,10 @@ join_status := not_running | running, should_retry_join := boolean(), timer_ref := reference() | undefined, - pending_wait_for_ready := [gen_server:from()] + pending_wait_for_ready := [gen_server:from()], + nodeup_timestamps := #{node() => milliseconds()} }. +-type milliseconds() :: integer(). 
%% Backend could define its own options -type opts() :: #{name := atom(), _ := _}. @@ -159,7 +161,7 @@ init(Opts) -> BackendState = Mod:init(Opts), %% Changes phase from initial to regular (affects the check interval) erlang:send_after(timer:minutes(5), self(), enter_regular_phase), - {ok, #{ + State = #{ phase => initial, results => [], nodes => [], @@ -174,8 +176,13 @@ init(Opts) -> join_status => not_running, should_retry_join => false, timer_ref => undefined, - pending_wait_for_ready => [] - }}. + pending_wait_for_ready => [], + nodeup_timestamps => #{} + }, + %% Set initial timestamps because we would not receive nodeup events for + %% already connected nodes + State2 = lists:foldl(fun remember_nodeup_timestamp/2, State, nodes()), + {ok, State2}. -spec handle_call(term(), from(), state()) -> {reply, term(), state()} | {noreply, state()}. handle_call(get_tables, _From, State = #{tables := Tables}) -> @@ -216,12 +223,15 @@ handle_info(check, State) -> handle_info({handle_check_result, Result, BackendState}, State) -> {noreply, handle_get_nodes_result(Result, BackendState, State)}; handle_info({nodeup, Node}, State) -> + ?LOG_WARNING(#{what => nodeup, remote_node => Node}), State2 = remove_node_from_unavailable_list(Node, State), - {noreply, try_joining(State2)}; -handle_info({nodedown, _Node}, State) -> + {noreply, remember_nodeup_timestamp(Node, try_joining(State2))}; +handle_info({nodedown, Node}, State) -> + {NodeUpTime, State2} = remove_nodeup_timestamp(Node, State), + ?LOG_WARNING(#{what => nodedown, remote_node => Node, connected_for_milliseconds => NodeUpTime}), %% Do another check to update unavailable_nodes list self() ! check, - {noreply, State}; + {noreply, State2}; handle_info({joining_finished, Results}, State) -> {noreply, handle_joining_finished(Results, State)}; handle_info({ping_result, Node, Result}, State) -> @@ -454,3 +464,20 @@ has_join_result_for(Node, Table, #{results := Results}) -> -spec handle_system_info(state()) -> system_info(). handle_system_info(State) -> State#{verify_ready => verify_ready(State)}. + +remember_nodeup_timestamp(Node, State = #{nodeup_timestamps := Map}) -> + Time = erlang:system_time(millisecond), + Map2 = Map#{Node => Time}, + State#{nodeup_timestamps := Map2}. + +remove_nodeup_timestamp(Node, State = #{nodeup_timestamps := Map}) -> + StartTime = maps:get(Node, Map, unknown), + NodeUpTime = calculate_uptime(StartTime), + Map2 = maps:remove(Node, State), + {NodeUpTime, Map2}. + +calculate_uptime(unknown) -> + unknown; +calculate_uptime(StartTime) -> + Time = erlang:system_time(millisecond), + Time - StartTime. From 7acbabc70c2ed09524f3697b3ea58250617cd723 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Tue, 14 Nov 2023 11:33:40 +0100 Subject: [PATCH 06/41] Report downtime duration --- src/cets_discovery.erl | 45 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 37 insertions(+), 8 deletions(-) diff --git a/src/cets_discovery.erl b/src/cets_discovery.erl index bb339264..eb725fa3 100644 --- a/src/cets_discovery.erl +++ b/src/cets_discovery.erl @@ -91,7 +91,8 @@ should_retry_join := boolean(), timer_ref := reference() | undefined, pending_wait_for_ready := [gen_server:from()], - nodeup_timestamps := #{node() => milliseconds()} + nodeup_timestamps := #{node() => milliseconds()}, + nodedown_timestamps := #{node() => milliseconds()} }. -type milliseconds() :: integer(). 
@@ -177,7 +178,8 @@ init(Opts) ->
         should_retry_join => false,
         timer_ref => undefined,
         pending_wait_for_ready => [],
-        nodeup_timestamps => #{}
+        nodeup_timestamps => #{},
+        nodedown_timestamps => #{}
     },
     %% Set initial timestamps because we would not receive nodeup events for
     %% already connected nodes
@@ -223,12 +225,17 @@ handle_info(check, State) ->
 handle_info({handle_check_result, Result, BackendState}, State) ->
     {noreply, handle_get_nodes_result(Result, BackendState, State)};
 handle_info({nodeup, Node}, State) ->
-    ?LOG_WARNING(#{what => nodeup, remote_node => Node}),
-    State2 = remove_node_from_unavailable_list(Node, State),
-    {noreply, remember_nodeup_timestamp(Node, try_joining(State2))};
+    {NodeDownTime, State2} = handle_nodeup(Node, State),
+    ?LOG_WARNING(#{
+        what => nodeup, remote_node => Node, downtime_millisecond_duration => NodeDownTime
+    }),
+    State3 = remove_node_from_unavailable_list(Node, State2),
+    {noreply, try_joining(State3)};
 handle_info({nodedown, Node}, State) ->
-    {NodeUpTime, State2} = remove_nodeup_timestamp(Node, State),
-    ?LOG_WARNING(#{what => nodedown, remote_node => Node, connected_for_milliseconds => NodeUpTime}),
+    {NodeUpTime, State2} = handle_nodedown(Node, State),
+    ?LOG_WARNING(#{
+        what => nodedown, remote_node => Node, connected_millisecond_duration => NodeUpTime
+    }),
     %% Do another check to update unavailable_nodes list
     self() ! check,
     {noreply, State2};
@@ -465,19 +472,41 @@ has_join_result_for(Node, Table, #{results := Results}) ->
 handle_system_info(State) ->
     State#{verify_ready => verify_ready(State)}.
 
+handle_nodedown(Node, State) ->
+    State2 = remember_nodedown_timestamp(Node, State),
+    remove_nodeup_timestamp(Node, State2).
+
+handle_nodeup(Node, State) ->
+    State2 = remember_nodeup_timestamp(Node, State),
+    {get_downtime(Node, State2), State2}.
+
 remember_nodeup_timestamp(Node, State = #{nodeup_timestamps := Map}) ->
     Time = erlang:system_time(millisecond),
     Map2 = Map#{Node => Time},
     State#{nodeup_timestamps := Map2}.
 
+remember_nodedown_timestamp(Node, State = #{nodedown_timestamps := Map}) ->
+    Time = erlang:system_time(millisecond),
+    Map2 = Map#{Node => Time},
+    State#{nodedown_timestamps := Map2}.
+
 remove_nodeup_timestamp(Node, State = #{nodeup_timestamps := Map}) ->
     StartTime = maps:get(Node, Map, unknown),
     NodeUpTime = calculate_uptime(StartTime),
     Map2 = maps:remove(Node, State),
-    {NodeUpTime, Map2}.
+    {NodeUpTime, State#{nodeup_timestamps := Map2}}.
 
 calculate_uptime(unknown) ->
     unknown;
 calculate_uptime(StartTime) ->
     Time = erlang:system_time(millisecond),
     Time - StartTime.
+
+get_downtime(Node, #{nodedown_timestamps := Map}) ->
+    case maps:get(Node, Map, unknown) of
+        unknown ->
+            unknown;
+        WentDown ->
+            Time = erlang:system_time(millisecond),
+            Time - WentDown
+    end.

From bca320619a432004918f2239f85229917c3af72b Mon Sep 17 00:00:00 2001
From: Mikhail Uvarov
Date: Tue, 14 Nov 2023 12:02:38 +0100
Subject: [PATCH 07/41] Use undefined instead of unknown for timestamps in logs

---
 src/cets_discovery.erl | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/src/cets_discovery.erl b/src/cets_discovery.erl
index eb725fa3..929a7f0d 100644
--- a/src/cets_discovery.erl
+++ b/src/cets_discovery.erl
@@ -491,21 +491,21 @@ remember_nodedown_timestamp(Node, State = #{nodedown_timestamps := Map}) ->
     State#{nodedown_timestamps := Map2}.
remove_nodeup_timestamp(Node, State = #{nodeup_timestamps := Map}) -> - StartTime = maps:get(Node, Map, unknown), + StartTime = maps:get(Node, Map, undefined), NodeUpTime = calculate_uptime(StartTime), Map2 = maps:remove(Node, State), {NodeUpTime, State#{nodeup_timestamps := Map2}}. -calculate_uptime(unknown) -> - unknown; +calculate_uptime(undefined) -> + undefined; calculate_uptime(StartTime) -> Time = erlang:system_time(millisecond), Time - StartTime. get_downtime(Node, #{nodedown_timestamps := Map}) -> - case maps:get(Node, Map, unknown) of - unknown -> - unknown; + case maps:get(Node, Map, undefined) of + undefined -> + undefined; WentDown -> Time = erlang:system_time(millisecond), Time - WentDown From df696522361832a8329e56c38e6a180ea932d590 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Tue, 14 Nov 2023 12:18:07 +0100 Subject: [PATCH 08/41] Skip undefined timestamps in logs --- src/cets_discovery.erl | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/src/cets_discovery.erl b/src/cets_discovery.erl index 929a7f0d..f7a62c1e 100644 --- a/src/cets_discovery.erl +++ b/src/cets_discovery.erl @@ -226,16 +226,20 @@ handle_info({handle_check_result, Result, BackendState}, State) -> {noreply, handle_get_nodes_result(Result, BackendState, State)}; handle_info({nodeup, Node}, State) -> {NodeDownTime, State2} = handle_nodeup(Node, State), - ?LOG_WARNING(#{ - what => nodeup, remote_node => Node, downtime_millisecond_duration => NodeDownTime - }), + ?LOG_WARNING( + set_defined(downtime_millisecond_duration, NodeDownTime, #{ + what => nodeup, remote_node => Node + }) + ), State3 = remove_node_from_unavailable_list(Node, State2), {noreply, try_joining(State3)}; handle_info({nodedown, Node}, State) -> {NodeUpTime, State2} = handle_nodedown(Node, State), - ?LOG_WARNING(#{ - what => nodedown, remote_node => Node, connected_millisecond_duration => NodeUpTime - }), + ?LOG_WARNING( + set_defined(connected_millisecond_duration, NodeUpTime, #{ + what => nodedown, remote_node => Node + }) + ), %% Do another check to update unavailable_nodes list self() ! check, {noreply, State2}; @@ -510,3 +514,8 @@ get_downtime(Node, #{nodedown_timestamps := Map}) -> Time = erlang:system_time(millisecond), Time - WentDown end. + +set_defined(_Key, undefined, Map) -> + Map; +set_defined(Key, Value, Map) -> + Map#{Key => Value}. 
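The set_defined/3 helper introduced above keeps undefined durations out of the log report instead of logging literal undefined values. A small sketch of its effect (the duration value is made up):

    %% The key is added only when the value is defined:
    #{what := nodeup} =
        set_defined(downtime_millisecond_duration, undefined, #{what => nodeup}),
    #{downtime_millisecond_duration := 250} =
        set_defined(downtime_millisecond_duration, 250, #{what => nodeup}).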
From 82b170b6d671070d8d92c02afa69fbc51c814f09 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Tue, 14 Nov 2023 12:42:35 +0100 Subject: [PATCH 09/41] Report alive nodes count on nodeup/nodedown --- src/cets_discovery.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cets_discovery.erl b/src/cets_discovery.erl index f7a62c1e..41c12312 100644 --- a/src/cets_discovery.erl +++ b/src/cets_discovery.erl @@ -228,7 +228,7 @@ handle_info({nodeup, Node}, State) -> {NodeDownTime, State2} = handle_nodeup(Node, State), ?LOG_WARNING( set_defined(downtime_millisecond_duration, NodeDownTime, #{ - what => nodeup, remote_node => Node + what => nodeup, remote_node => Node, alive_nodes => length(nodes()) + 1 }) ), State3 = remove_node_from_unavailable_list(Node, State2), @@ -237,7 +237,7 @@ handle_info({nodedown, Node}, State) -> {NodeUpTime, State2} = handle_nodedown(Node, State), ?LOG_WARNING( set_defined(connected_millisecond_duration, NodeUpTime, #{ - what => nodedown, remote_node => Node + what => nodedown, remote_node => Node, alive_nodes => length(nodes()) + 1 }) ), %% Do another check to update unavailable_nodes list From 9686b0d26b73d57417201dd2cf4fe9d377766463 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Tue, 14 Nov 2023 13:03:02 +0100 Subject: [PATCH 10/41] Report time_since_startup_in_milliseconds for nodeup/nodedown events --- src/cets_discovery.erl | 31 +++++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/src/cets_discovery.erl b/src/cets_discovery.erl index 41c12312..c255e66e 100644 --- a/src/cets_discovery.erl +++ b/src/cets_discovery.erl @@ -92,7 +92,8 @@ timer_ref := reference() | undefined, pending_wait_for_ready := [gen_server:from()], nodeup_timestamps := #{node() => milliseconds()}, - nodedown_timestamps := #{node() => milliseconds()} + nodedown_timestamps := #{node() => milliseconds()}, + start_time := milliseconds() }. -type milliseconds() :: integer(). @@ -154,6 +155,7 @@ wait_for_ready(Server, Timeout) -> -spec init(term()) -> {ok, state()}. init(Opts) -> + StartTime = erlang:system_time(millisecond), %% Sends nodeup / nodedown ok = net_kernel:monitor_nodes(true), Mod = maps:get(backend_module, Opts, cets_discovery_file), @@ -179,7 +181,8 @@ init(Opts) -> timer_ref => undefined, pending_wait_for_ready => [], nodeup_timestamps => #{}, - nodedown_timestamps => #{} + nodedown_timestamps => #{}, + start_time => StartTime }, %% Set initial timestamps because we would not receive nodeup events for %% already connected nodes @@ -228,7 +231,12 @@ handle_info({nodeup, Node}, State) -> {NodeDownTime, State2} = handle_nodeup(Node, State), ?LOG_WARNING( set_defined(downtime_millisecond_duration, NodeDownTime, #{ - what => nodeup, remote_node => Node, alive_nodes => length(nodes()) + 1 + what => nodeup, + remote_node => Node, + alive_nodes => length(nodes()) + 1, + %% We report that time so we could work on minimizing that time. + %% It says how long it took to discover nodes after startup. 
+ time_since_startup_in_milliseconds => time_since_startup_in_milliseconds(State) }) ), State3 = remove_node_from_unavailable_list(Node, State2), @@ -237,7 +245,10 @@ handle_info({nodedown, Node}, State) -> {NodeUpTime, State2} = handle_nodedown(Node, State), ?LOG_WARNING( set_defined(connected_millisecond_duration, NodeUpTime, #{ - what => nodedown, remote_node => Node, alive_nodes => length(nodes()) + 1 + what => nodedown, + remote_node => Node, + alive_nodes => length(nodes()) + 1, + time_since_startup_in_milliseconds => time_since_startup_in_milliseconds(State) }) ), %% Do another check to update unavailable_nodes list @@ -503,19 +514,23 @@ remove_nodeup_timestamp(Node, State = #{nodeup_timestamps := Map}) -> calculate_uptime(undefined) -> undefined; calculate_uptime(StartTime) -> - Time = erlang:system_time(millisecond), - Time - StartTime. + time_since(StartTime). get_downtime(Node, #{nodedown_timestamps := Map}) -> case maps:get(Node, Map, undefined) of undefined -> undefined; WentDown -> - Time = erlang:system_time(millisecond), - Time - WentDown + time_since(WentDown) end. set_defined(_Key, undefined, Map) -> Map; set_defined(Key, Value, Map) -> Map#{Key => Value}. + +time_since_startup_in_milliseconds(#{start_time := StartTime}) -> + time_since(StartTime). + +time_since(StartTime) -> + erlang:system_time(millisecond) - StartTime. From a02e3284ef752104bb1b480208bb842e9447ae4a Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Thu, 16 Nov 2023 11:17:55 +0100 Subject: [PATCH 11/41] Use epmd instead of resolver to test if node is pingable --- src/cets_ping.erl | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/cets_ping.erl b/src/cets_ping.erl index 6fb86608..300e4416 100644 --- a/src/cets_ping.erl +++ b/src/cets_ping.erl @@ -15,8 +15,11 @@ ping(Node) when is_atom(Node) -> pong; false -> case dist_util:split_node(Node) of - {node, _Name, Host} -> - case {inet:getaddr(Host, inet6), inet:getaddr(Host, inet)} of + {node, Name, Host} -> + Epmd = net_kernel:epmd_module(), + V4 = Epmd:address_please(Name, Host, inet), + V6 = Epmd:address_please(Name, Host, inet6), + case {V4, V6} of {{error, _}, {error, _}} -> pang; _ -> From 966bf81b339538d86127f82b7dc68266cd92aad2 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Thu, 16 Nov 2023 13:11:32 +0100 Subject: [PATCH 12/41] Do not report long task failing with reason stop --- src/cets_long.erl | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/cets_long.erl b/src/cets_long.erl index dce536fb..bf2ccc63 100644 --- a/src/cets_long.erl +++ b/src/cets_long.erl @@ -91,6 +91,9 @@ run_monitor(Info, Ref, Parent, Start) -> monitor_loop(Mon, Info, Parent, Start, Interval) -> receive + {'DOWN', _MonRef, process, _Pid, stop} -> + %% Special case, the long task is stopped using exit(Pid, stop) + ok; {'DOWN', MonRef, process, _Pid, Reason} when Mon =:= MonRef -> ?LOG_ERROR(Info#{ what => task_failed, From 22afd47ff5c4a4539bddec87a4b2c28f65727db0 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Thu, 16 Nov 2023 15:55:29 +0100 Subject: [PATCH 13/41] Log warning if same node reconnects --- src/cets_discovery.erl | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/src/cets_discovery.erl b/src/cets_discovery.erl index c255e66e..8014b178 100644 --- a/src/cets_discovery.erl +++ b/src/cets_discovery.erl @@ -93,6 +93,7 @@ pending_wait_for_ready := [gen_server:from()], nodeup_timestamps := #{node() => milliseconds()}, nodedown_timestamps := #{node() => milliseconds()}, + node_start_timestamps := 
#{node() => milliseconds()}, start_time := milliseconds() }. -type milliseconds() :: integer(). @@ -181,6 +182,7 @@ init(Opts) -> timer_ref => undefined, pending_wait_for_ready => [], nodeup_timestamps => #{}, + node_start_timestamps => #{}, nodedown_timestamps => #{}, start_time => StartTime }, @@ -251,9 +253,12 @@ handle_info({nodedown, Node}, State) -> time_since_startup_in_milliseconds => time_since_startup_in_milliseconds(State) }) ), + send_start_time_to(Node, State), %% Do another check to update unavailable_nodes list self() ! check, {noreply, State2}; +handle_info({start_time, Node, StartTime}, State) -> + {noreply, handle_receive_start_time(Node, StartTime, State)}; handle_info({joining_finished, Results}, State) -> {noreply, handle_joining_finished(Results, State)}; handle_info({ping_result, Node, Result}, State) -> @@ -534,3 +539,28 @@ time_since_startup_in_milliseconds(#{start_time := StartTime}) -> time_since(StartTime) -> erlang:system_time(millisecond) - StartTime. + +send_start_time_to(Node, #{start_time := StartTime}) -> + case erlang:process_info(self(), registered_name) of + {registered_name, Name} -> + erlang:send({Name, Node}, {start_time, node(), StartTime}); + _ -> + ok + end. + +handle_receive_start_time(Node, StartTime, State = #{node_start_timestamps := Map}) -> + case maps:get(Node, Map, undefined) of + undefined -> + ok; + StartTime -> + ?LOG_WARNING(#{ + what => node_reconnects, + remote_node => Node, + start_time => StartTime, + text => <<"Netsplit recovery. The remote node has been connected to us before.">> + }); + _ -> + %% Restarted node reconnected, this is fine during the rolling updates + ok + end, + State#{node_start_timestamps := maps:put(Node, StartTime, Map)}. From ea5d8489217828a189e24e3feebf4d44c252be0f Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Fri, 17 Nov 2023 16:56:39 +0100 Subject: [PATCH 14/41] Add node_down_history --- src/cets.erl | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/cets.erl b/src/cets.erl index 54340953..cd6260f2 100644 --- a/src/cets.erl +++ b/src/cets.erl @@ -130,7 +130,8 @@ is_leader := boolean(), opts := start_opts(), backlog := [backlog_entry()], - pause_monitors := [pause_monitor()] + pause_monitors := [pause_monitor()], + node_down_history := [node()] }. -type long_msg() :: @@ -154,7 +155,8 @@ memory := non_neg_integer(), ack_pid := ack_pid(), join_ref := join_ref(), - opts := start_opts() + opts := start_opts(), + node_down_history := [node()] }. -type handle_down_fun() :: fun((#{remote_pid := server_pid(), table := table_name()}) -> ok). @@ -417,7 +419,8 @@ init({Tab, Opts}) -> is_leader => true, opts => Opts, backlog => [], - pause_monitors => [] + pause_monitors => [], + node_down_history => [] }}. -spec handle_call(long_msg() | {op, op()}, from(), state()) -> @@ -524,7 +527,7 @@ handle_down2(RemotePid, State = #{other_servers := Servers, ack_pid := AckPid}) cets_ack:send_remote_down(AckPid, RemotePid), call_user_handle_down(RemotePid, State), Servers2 = lists:delete(RemotePid, Servers), - set_other_servers(Servers2, State); + update_node_down_history(RemotePid, set_other_servers(Servers2, State)); false -> %% This should not happen ?LOG_ERROR(#{ @@ -535,6 +538,9 @@ handle_down2(RemotePid, State = #{other_servers := Servers, ack_pid := AckPid}) State end. +update_node_down_history(RemotePid, State = #{node_down_history := History}) -> + State#{node_down_history := [node(RemotePid) | History]}. + %% Merge two lists of pids, create the missing monitors. 
-spec add_servers(Servers, Servers) -> Servers when Servers :: servers(). add_servers(Pids, Servers) -> @@ -742,6 +748,7 @@ handle_get_info( other_servers := Servers, ack_pid := AckPid, join_ref := JoinRef, + node_down_history := DownHistory, opts := Opts } ) -> @@ -752,6 +759,7 @@ handle_get_info( memory => ets:info(Tab, memory), ack_pid => AckPid, join_ref => JoinRef, + node_down_history => DownHistory, opts => Opts }. From 8308ef6c50869595803fc138b0af29dce291d353 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Fri, 17 Nov 2023 21:10:07 +0100 Subject: [PATCH 15/41] Sort available nodes in the status API --- src/cets_status.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cets_status.erl b/src/cets_status.erl index 36b29e14..6b8a7865 100644 --- a/src/cets_status.erl +++ b/src/cets_status.erl @@ -64,7 +64,7 @@ gather_data(Disco) -> ThisNode = node(), Info = cets_discovery:system_info(Disco), #{tables := Tables} = Info, - OnlineNodes = [ThisNode | nodes()], + OnlineNodes = lists:sort([ThisNode | nodes()]), AvailNodes = available_nodes(Disco, OnlineNodes), Expected = get_local_table_to_other_nodes_map(Tables), OtherTabNodes = get_node_to_tab_nodes_map(AvailNodes, Disco), From bd2560b97b529d8c01dc78583ae91b9ef74c7a9a Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Mon, 20 Nov 2023 09:42:44 +0100 Subject: [PATCH 16/41] Add +node_down_history_is_updated_when_netsplit_happens testcase --- test/cets_SUITE.erl | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/test/cets_SUITE.erl b/test/cets_SUITE.erl index c527354f..4086d3e1 100644 --- a/test/cets_SUITE.erl +++ b/test/cets_SUITE.erl @@ -179,7 +179,8 @@ seq_cases() -> cets_seq_no_log_cases() -> [ - join_interrupted_when_ping_crashes + join_interrupted_when_ping_crashes, + node_down_history_is_updated_when_netsplit_happens ]. init_per_suite(Config) -> @@ -2207,6 +2208,26 @@ join_interrupted_when_ping_crashes(Config) -> ?assertMatch({error, {task_failed, ping_all_failed, #{}}}, Res), meck:unload(). +node_down_history_is_updated_when_netsplit_happens(Config) -> + %% node_down_history is available in cets:info/1 API. + %% It could be used for manual debugging in situations + %% we get netsplits or during rolling upgrades. + #{ct5 := Peer5} = proplists:get_value(peers, Config), + #{ct5 := Node5} = proplists:get_value(nodes, Config), + Node1 = node(), + Tab = make_name(Config), + {ok, Pid1} = start(Node1, Tab), + {ok, Pid5} = start(Peer5, Tab), + ok = cets_join:join(lock_name(Config), #{}, Pid1, Pid5), + block_node(Node5, Peer5), + try + F = fun() -> maps:get(node_down_history, cets:info(Pid1)) end, + cets_test_wait:wait_until(F, [Node5]) + after + reconnect_node(Node5, Peer5), + cets:stop(Pid5) + end. + format_data_does_not_return_table_duplicates(Config) -> Res = cets_status:format_data(test_data_for_duplicate_missing_table_in_status(Config)), ?assertMatch(#{remote_unknown_tables := [], remote_nodes_with_missing_tables := []}, Res). From 3bbb15465f2d09198109b5d9920063da62c67510 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Mon, 20 Nov 2023 10:16:03 +0100 Subject: [PATCH 17/41] Add disco_logs_nodeup --- test/cets_SUITE.erl | 30 +++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/test/cets_SUITE.erl b/test/cets_SUITE.erl index 4086d3e1..2c66b86a 100644 --- a/test/cets_SUITE.erl +++ b/test/cets_SUITE.erl @@ -174,7 +174,8 @@ seq_cases() -> %% Cannot be run in parallel with other tests because checks all logging messages. 
logging_when_failing_join_with_disco, cets_ping_all_returns_when_ping_crashes, - join_interrupted_when_ping_crashes + join_interrupted_when_ping_crashes, + disco_logs_nodeup ]. cets_seq_no_log_cases() -> @@ -2228,6 +2229,33 @@ node_down_history_is_updated_when_netsplit_happens(Config) -> cets:stop(Pid5) end. +disco_logs_nodeup(Config) -> + logger_debug_h:start(#{id => ?FUNCTION_NAME}), + Node1 = node(), + #{ct2 := Peer2} = proplists:get_value(peers, Config), + #{ct2 := Node2} = proplists:get_value(nodes, Config), + rpc(Peer2, erlang, disconnect_node, [Node1]), + Tab = make_name(Config), + {ok, _Pid1} = start(Node1, Tab), + {ok, _Pid2} = start(Peer2, Tab), + F = fun(State) -> + {{ok, [Node1, Node2]}, State} + end, + DiscoName = disco_name(Config), + Disco = start_disco(Node1, #{ + name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F + }), + cets_discovery:add_table(Disco, Tab), + receive + {log, ?FUNCTION_NAME, #{ + level := warning, + msg := {report, #{what := nodeup}} + }} -> + ok + after 5000 -> + ct:fail(timeout) + end. + format_data_does_not_return_table_duplicates(Config) -> Res = cets_status:format_data(test_data_for_duplicate_missing_table_in_status(Config)), ?assertMatch(#{remote_unknown_tables := [], remote_nodes_with_missing_tables := []}, Res). From f7d74a0d7287332d05e55cf209295d6882f49111 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Mon, 20 Nov 2023 10:19:55 +0100 Subject: [PATCH 18/41] Add disco_logs_nodedown --- test/cets_SUITE.erl | 32 +++++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/test/cets_SUITE.erl b/test/cets_SUITE.erl index 2c66b86a..461491cb 100644 --- a/test/cets_SUITE.erl +++ b/test/cets_SUITE.erl @@ -175,7 +175,8 @@ seq_cases() -> logging_when_failing_join_with_disco, cets_ping_all_returns_when_ping_crashes, join_interrupted_when_ping_crashes, - disco_logs_nodeup + disco_logs_nodeup, + disco_logs_nodedown ]. cets_seq_no_log_cases() -> @@ -2256,6 +2257,35 @@ disco_logs_nodeup(Config) -> ct:fail(timeout) end. +disco_logs_nodedown(Config) -> + logger_debug_h:start(#{id => ?FUNCTION_NAME}), + Node1 = node(), + #{ct2 := Peer2} = proplists:get_value(peers, Config), + #{ct2 := Node2} = proplists:get_value(nodes, Config), + rpc(Peer2, erlang, disconnect_node, [Node1]), + Tab = make_name(Config), + {ok, _Pid1} = start(Node1, Tab), + {ok, _Pid2} = start(Peer2, Tab), + F = fun(State) -> + {{ok, [Node1, Node2]}, State} + end, + DiscoName = disco_name(Config), + Disco = start_disco(Node1, #{ + name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F + }), + cets_discovery:add_table(Disco, Tab), + ok = cets_discovery:wait_for_ready(Disco, 5000), + rpc(Peer2, erlang, disconnect_node, [Node1]), + receive + {log, ?FUNCTION_NAME, #{ + level := warning, + msg := {report, #{what := nodedown}} + }} -> + ok + after 5000 -> + ct:fail(timeout) + end. + format_data_does_not_return_table_duplicates(Config) -> Res = cets_status:format_data(test_data_for_duplicate_missing_table_in_status(Config)), ?assertMatch(#{remote_unknown_tables := [], remote_nodes_with_missing_tables := []}, Res). 
From bf0f174e7b4d7deaa234ad1863154b0113bc242a Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Mon, 20 Nov 2023 10:36:23 +0100 Subject: [PATCH 19/41] Add disco_logs_nodeup_after_downtime testcase --- test/cets_SUITE.erl | 54 +++++++++++++++++++++++++++++++++++++++------ 1 file changed, 47 insertions(+), 7 deletions(-) diff --git a/test/cets_SUITE.erl b/test/cets_SUITE.erl index 461491cb..1dc0a93e 100644 --- a/test/cets_SUITE.erl +++ b/test/cets_SUITE.erl @@ -176,7 +176,8 @@ seq_cases() -> cets_ping_all_returns_when_ping_crashes, join_interrupted_when_ping_crashes, disco_logs_nodeup, - disco_logs_nodedown + disco_logs_nodedown, + disco_logs_nodeup_after_downtime ]. cets_seq_no_log_cases() -> @@ -2250,9 +2251,10 @@ disco_logs_nodeup(Config) -> receive {log, ?FUNCTION_NAME, #{ level := warning, - msg := {report, #{what := nodeup}} - }} -> - ok + msg := {report, #{what := nodeup, remote_node := Node2} = R} + }} = M -> + ?assert(is_integer(maps:get(alive_nodes, R)), M), + ?assert(is_integer(maps:get(time_since_startup_in_milliseconds, R)), M) after 5000 -> ct:fail(timeout) end. @@ -2279,9 +2281,47 @@ disco_logs_nodedown(Config) -> receive {log, ?FUNCTION_NAME, #{ level := warning, - msg := {report, #{what := nodedown}} - }} -> - ok + msg := {report, #{what := nodedown, remote_node := Node2} = R} + }} = M -> + ?assert(is_integer(maps:get(alive_nodes, R)), M), + ?assert(is_integer(maps:get(time_since_startup_in_milliseconds, R)), M), + ?assert(is_integer(maps:get(connected_millisecond_duration, R)), M) + after 5000 -> + ct:fail(timeout) + end. + +disco_logs_nodeup_after_downtime(Config) -> + logger_debug_h:start(#{id => ?FUNCTION_NAME}), + Node1 = node(), + #{ct2 := Peer2} = proplists:get_value(peers, Config), + #{ct2 := Node2} = proplists:get_value(nodes, Config), + rpc(Peer2, erlang, disconnect_node, [Node1]), + Tab = make_name(Config), + {ok, _Pid1} = start(Node1, Tab), + {ok, _Pid2} = start(Peer2, Tab), + F = fun(State) -> + {{ok, [Node1, Node2]}, State} + end, + DiscoName = disco_name(Config), + Disco = start_disco(Node1, #{ + name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F + }), + cets_discovery:add_table(Disco, Tab), + ok = cets_discovery:wait_for_ready(Disco, 5000), + rpc(Peer2, erlang, disconnect_node, [Node1]), + receive + {log, ?FUNCTION_NAME, #{ + level := warning, + msg := + {report, + #{ + what := nodeup, + remote_node := Node2, + downtime_millisecond_duration := Downtime + } = R} + }} = M -> + ?assert(is_integer(maps:get(alive_nodes, R)), M), + ?assert(is_integer(Downtime), M) after 5000 -> ct:fail(timeout) end. From 9508a8d565590822602cb96c4da9e08f4e45251c Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Mon, 20 Nov 2023 11:14:16 +0100 Subject: [PATCH 20/41] Move logging into handle_nodedown/handle_nodeup --- src/cets_discovery.erl | 50 ++++++++++++++++++++++-------------------- 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/src/cets_discovery.erl b/src/cets_discovery.erl index 8014b178..f04894c1 100644 --- a/src/cets_discovery.erl +++ b/src/cets_discovery.erl @@ -188,7 +188,7 @@ init(Opts) -> }, %% Set initial timestamps because we would not receive nodeup events for %% already connected nodes - State2 = lists:foldl(fun remember_nodeup_timestamp/2, State, nodes()), + State2 = lists:foldl(fun handle_nodeup/2, State, nodes()), {ok, State2}. -spec handle_call(term(), from(), state()) -> {reply, term(), state()} | {noreply, state()}. 
@@ -230,30 +230,11 @@ handle_info(check, State) -> handle_info({handle_check_result, Result, BackendState}, State) -> {noreply, handle_get_nodes_result(Result, BackendState, State)}; handle_info({nodeup, Node}, State) -> - {NodeDownTime, State2} = handle_nodeup(Node, State), - ?LOG_WARNING( - set_defined(downtime_millisecond_duration, NodeDownTime, #{ - what => nodeup, - remote_node => Node, - alive_nodes => length(nodes()) + 1, - %% We report that time so we could work on minimizing that time. - %% It says how long it took to discover nodes after startup. - time_since_startup_in_milliseconds => time_since_startup_in_milliseconds(State) - }) - ), + State2 = handle_nodeup(Node, State), State3 = remove_node_from_unavailable_list(Node, State2), {noreply, try_joining(State3)}; handle_info({nodedown, Node}, State) -> - {NodeUpTime, State2} = handle_nodedown(Node, State), - ?LOG_WARNING( - set_defined(connected_millisecond_duration, NodeUpTime, #{ - what => nodedown, - remote_node => Node, - alive_nodes => length(nodes()) + 1, - time_since_startup_in_milliseconds => time_since_startup_in_milliseconds(State) - }) - ), - send_start_time_to(Node, State), + State2 = handle_nodedown(Node, State), %% Do another check to update unavailable_nodes list self() ! check, {noreply, State2}; @@ -494,11 +475,32 @@ handle_system_info(State) -> handle_nodedown(Node, State) -> State2 = remember_nodedown_timestamp(Node, State), - remove_nodeup_timestamp(Node, State2). + {NodeUpTime, State3} = remove_nodeup_timestamp(Node, State2), + ?LOG_WARNING( + set_defined(connected_millisecond_duration, NodeUpTime, #{ + what => nodedown, + remote_node => Node, + alive_nodes => length(nodes()) + 1, + time_since_startup_in_milliseconds => time_since_startup_in_milliseconds(State) + }) + ), + State3. handle_nodeup(Node, State) -> + send_start_time_to(Node, State), State2 = remember_nodeup_timestamp(Node, State), - {get_downtime(Node, State2), State2}. + NodeDownTime = get_downtime(Node, State2), + ?LOG_WARNING( + set_defined(downtime_millisecond_duration, NodeDownTime, #{ + what => nodeup, + remote_node => Node, + alive_nodes => length(nodes()) + 1, + %% We report that time so we could work on minimizing that time. + %% It says how long it took to discover nodes after startup. + time_since_startup_in_milliseconds => time_since_startup_in_milliseconds(State) + }) + ), + State2. remember_nodeup_timestamp(Node, State = #{nodeup_timestamps := Map}) -> Time = erlang:system_time(millisecond), From 03cdbb30d4924d889a7eed6057f10b1c89bba924 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Mon, 20 Nov 2023 11:14:29 +0100 Subject: [PATCH 21/41] Add disco_logs_node_reconnects_after_downtime testcase --- test/cets_SUITE.erl | 42 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/test/cets_SUITE.erl b/test/cets_SUITE.erl index 1dc0a93e..9c995952 100644 --- a/test/cets_SUITE.erl +++ b/test/cets_SUITE.erl @@ -177,7 +177,8 @@ seq_cases() -> join_interrupted_when_ping_crashes, disco_logs_nodeup, disco_logs_nodedown, - disco_logs_nodeup_after_downtime + disco_logs_nodeup_after_downtime, + disco_logs_node_reconnects_after_downtime ]. cets_seq_no_log_cases() -> @@ -2326,6 +2327,45 @@ disco_logs_nodeup_after_downtime(Config) -> ct:fail(timeout) end. 
+disco_logs_node_reconnects_after_downtime(Config) -> + logger_debug_h:start(#{id => ?FUNCTION_NAME}), + Node1 = node(), + #{ct2 := Peer2} = proplists:get_value(peers, Config), + #{ct2 := Node2} = proplists:get_value(nodes, Config), + Tab = make_name(Config), + {ok, _Pid1} = start(Node1, Tab), + {ok, _Pid2} = start(Peer2, Tab), + F = fun(State) -> + {{ok, [Node1, Node2]}, State} + end, + DiscoName = disco_name(Config), + Disco = start_disco(Node1, #{ + name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F + }), + %% We have to start discovery server on both nodes for that feature to work + _Disco2 = start_disco(Node2, #{ + name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F + }), + cets_discovery:add_table(Disco, Tab), + ok = cets_discovery:wait_for_ready(Disco, 5000), + %% Check that a start timestamp from a remote node is stored + Info = cets_discovery:system_info(Disco), + ?assertMatch(#{node_start_timestamps := #{Node2 := _}}, Info), + rpc(Peer2, erlang, disconnect_node, [Node1]), + receive + {log, ?FUNCTION_NAME, #{ + level := warning, + msg := + {report, #{ + what := node_reconnects, + remote_node := Node2 + }} + }} -> + ok + after 5000 -> + ct:fail(timeout) + end. + format_data_does_not_return_table_duplicates(Config) -> Res = cets_status:format_data(test_data_for_duplicate_missing_table_in_status(Config)), ?assertMatch(#{remote_unknown_tables := [], remote_nodes_with_missing_tables := []}, Res). From 75d9ee88926c03dd00e1a9a4f7dae8e4b20af3ba Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Mon, 20 Nov 2023 11:37:53 +0100 Subject: [PATCH 22/41] Add disco_logs_nodeup_no_log/disco_logs_nodedown_no_log testcases --- test/cets_SUITE.erl | 59 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 58 insertions(+), 1 deletion(-) diff --git a/test/cets_SUITE.erl b/test/cets_SUITE.erl index 9c995952..9e30468e 100644 --- a/test/cets_SUITE.erl +++ b/test/cets_SUITE.erl @@ -184,7 +184,9 @@ seq_cases() -> cets_seq_no_log_cases() -> [ join_interrupted_when_ping_crashes, - node_down_history_is_updated_when_netsplit_happens + node_down_history_is_updated_when_netsplit_happens, + disco_logs_nodeup_no_log, + disco_logs_nodedown_no_log ]. init_per_suite(Config) -> @@ -2260,6 +2262,33 @@ disco_logs_nodeup(Config) -> ct:fail(timeout) end. +%% disco_logs_nodeup, but logger is disabled (for code coverage) +disco_logs_nodeup_no_log(Config) -> + logger_debug_h:start(#{id => ?FUNCTION_NAME}), + Node1 = node(), + #{ct2 := Peer2} = proplists:get_value(peers, Config), + #{ct2 := Node2} = proplists:get_value(nodes, Config), + rpc(Peer2, erlang, disconnect_node, [Node1]), + Tab = make_name(Config), + {ok, _Pid1} = start(Node1, Tab), + {ok, _Pid2} = start(Peer2, Tab), + F = fun(State) -> + {{ok, [Node1, Node2]}, State} + end, + DiscoName = disco_name(Config), + Disco = start_disco(Node1, #{ + name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F + }), + cets_discovery:add_table(Disco, Tab), + %% Check that nodeup is remembered + cets_test_wait:wait_until( + fun() -> + #{nodeup_timestamps := Map} = cets_discovery:system_info(Disco), + maps:is_key(Node2, Map) + end, + true + ). + disco_logs_nodedown(Config) -> logger_debug_h:start(#{id => ?FUNCTION_NAME}), Node1 = node(), @@ -2291,6 +2320,34 @@ disco_logs_nodedown(Config) -> ct:fail(timeout) end. 
+disco_logs_nodedown_no_log(Config) -> + logger_debug_h:start(#{id => ?FUNCTION_NAME}), + Node1 = node(), + #{ct2 := Peer2} = proplists:get_value(peers, Config), + #{ct2 := Node2} = proplists:get_value(nodes, Config), + rpc(Peer2, erlang, disconnect_node, [Node1]), + Tab = make_name(Config), + {ok, _Pid1} = start(Node1, Tab), + {ok, _Pid2} = start(Peer2, Tab), + F = fun(State) -> + {{ok, [Node1, Node2]}, State} + end, + DiscoName = disco_name(Config), + Disco = start_disco(Node1, #{ + name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F + }), + cets_discovery:add_table(Disco, Tab), + ok = cets_discovery:wait_for_ready(Disco, 5000), + rpc(Peer2, erlang, disconnect_node, [Node1]), + %% Check that nodedown is remembered + cets_test_wait:wait_until( + fun() -> + #{nodedown_timestamps := Map} = cets_discovery:system_info(Disco), + maps:is_key(Node2, Map) + end, + true + ). + disco_logs_nodeup_after_downtime(Config) -> logger_debug_h:start(#{id => ?FUNCTION_NAME}), Node1 = node(), From 21b428058f577073f0a289e1ed6fd9b98abecfd4 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Mon, 20 Nov 2023 11:45:25 +0100 Subject: [PATCH 23/41] Tweak for code coverage and logging --- src/cets_discovery.erl | 35 +++++++++++++++++------------------ test/cets_SUITE.erl | 13 ++++++------- 2 files changed, 23 insertions(+), 25 deletions(-) diff --git a/src/cets_discovery.erl b/src/cets_discovery.erl index f04894c1..8574a4d9 100644 --- a/src/cets_discovery.erl +++ b/src/cets_discovery.erl @@ -476,30 +476,29 @@ handle_system_info(State) -> handle_nodedown(Node, State) -> State2 = remember_nodedown_timestamp(Node, State), {NodeUpTime, State3} = remove_nodeup_timestamp(Node, State2), - ?LOG_WARNING( - set_defined(connected_millisecond_duration, NodeUpTime, #{ - what => nodedown, - remote_node => Node, - alive_nodes => length(nodes()) + 1, - time_since_startup_in_milliseconds => time_since_startup_in_milliseconds(State) - }) - ), + %% Not inside of the macro to make code coverage happy + Log = set_defined(connected_millisecond_duration, NodeUpTime, #{ + what => nodedown, + remote_node => Node, + alive_nodes => length(nodes()) + 1, + time_since_startup_in_milliseconds => time_since_startup_in_milliseconds(State) + }), + ?LOG_WARNING(Log), State3. handle_nodeup(Node, State) -> send_start_time_to(Node, State), State2 = remember_nodeup_timestamp(Node, State), NodeDownTime = get_downtime(Node, State2), - ?LOG_WARNING( - set_defined(downtime_millisecond_duration, NodeDownTime, #{ - what => nodeup, - remote_node => Node, - alive_nodes => length(nodes()) + 1, - %% We report that time so we could work on minimizing that time. - %% It says how long it took to discover nodes after startup. - time_since_startup_in_milliseconds => time_since_startup_in_milliseconds(State) - }) - ), + Log = set_defined(downtime_millisecond_duration, NodeDownTime, #{ + what => nodeup, + remote_node => Node, + alive_nodes => length(nodes()) + 1, + %% We report that time so we could work on minimizing that time. + %% It says how long it took to discover nodes after startup. + time_since_startup_in_milliseconds => time_since_startup_in_milliseconds(State) + }), + ?LOG_WARNING(Log), State2. 
remember_nodeup_timestamp(Node, State = #{nodeup_timestamps := Map}) -> diff --git a/test/cets_SUITE.erl b/test/cets_SUITE.erl index 9e30468e..9f709551 100644 --- a/test/cets_SUITE.erl +++ b/test/cets_SUITE.erl @@ -178,15 +178,15 @@ seq_cases() -> disco_logs_nodeup, disco_logs_nodedown, disco_logs_nodeup_after_downtime, - disco_logs_node_reconnects_after_downtime + disco_logs_node_reconnects_after_downtime, + disco_node_up_timestamp_is_remembered, + disco_logs_nodedown_timestamp_is_remembered ]. cets_seq_no_log_cases() -> [ join_interrupted_when_ping_crashes, - node_down_history_is_updated_when_netsplit_happens, - disco_logs_nodeup_no_log, - disco_logs_nodedown_no_log + node_down_history_is_updated_when_netsplit_happens ]. init_per_suite(Config) -> @@ -2262,8 +2262,7 @@ disco_logs_nodeup(Config) -> ct:fail(timeout) end. -%% disco_logs_nodeup, but logger is disabled (for code coverage) -disco_logs_nodeup_no_log(Config) -> +disco_node_up_timestamp_is_remembered(Config) -> logger_debug_h:start(#{id => ?FUNCTION_NAME}), Node1 = node(), #{ct2 := Peer2} = proplists:get_value(peers, Config), @@ -2320,7 +2319,7 @@ disco_logs_nodedown(Config) -> ct:fail(timeout) end. -disco_logs_nodedown_no_log(Config) -> +disco_logs_nodedown_timestamp_is_remembered(Config) -> logger_debug_h:start(#{id => ?FUNCTION_NAME}), Node1 = node(), #{ct2 := Peer2} = proplists:get_value(peers, Config), From 52c04ea0db44a958db75e61b4cb6570bd14ec5ec Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Mon, 20 Nov 2023 11:51:03 +0100 Subject: [PATCH 24/41] Test that start_time is set in logs --- test/cets_SUITE.erl | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/cets_SUITE.erl b/test/cets_SUITE.erl index 9f709551..b189e2a3 100644 --- a/test/cets_SUITE.erl +++ b/test/cets_SUITE.erl @@ -2414,10 +2414,11 @@ disco_logs_node_reconnects_after_downtime(Config) -> msg := {report, #{ what := node_reconnects, + start_time := StartTime, remote_node := Node2 }} - }} -> - ok + }} = M -> + ?assert(is_integer(StartTime), M) after 5000 -> ct:fail(timeout) end. From 5b403b713c7a478893643f29f47207c7afb61b1a Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Mon, 20 Nov 2023 12:00:37 +0100 Subject: [PATCH 25/41] Add disco_nodeup_timestamp_is_updated_after_node_reconnects testcase --- test/cets_SUITE.erl | 41 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 38 insertions(+), 3 deletions(-) diff --git a/test/cets_SUITE.erl b/test/cets_SUITE.erl index b189e2a3..1008e406 100644 --- a/test/cets_SUITE.erl +++ b/test/cets_SUITE.erl @@ -180,13 +180,15 @@ seq_cases() -> disco_logs_nodeup_after_downtime, disco_logs_node_reconnects_after_downtime, disco_node_up_timestamp_is_remembered, - disco_logs_nodedown_timestamp_is_remembered + disco_logs_nodedown_timestamp_is_remembered, + disco_nodeup_timestamp_is_updated_after_node_reconnects ]. cets_seq_no_log_cases() -> [ join_interrupted_when_ping_crashes, - node_down_history_is_updated_when_netsplit_happens + node_down_history_is_updated_when_netsplit_happens, + disco_nodeup_timestamp_is_updated_after_node_reconnects ]. init_per_suite(Config) -> @@ -2263,7 +2265,6 @@ disco_logs_nodeup(Config) -> end. disco_node_up_timestamp_is_remembered(Config) -> - logger_debug_h:start(#{id => ?FUNCTION_NAME}), Node1 = node(), #{ct2 := Peer2} = proplists:get_value(peers, Config), #{ct2 := Node2} = proplists:get_value(nodes, Config), @@ -2423,6 +2424,40 @@ disco_logs_node_reconnects_after_downtime(Config) -> ct:fail(timeout) end. 
+disco_nodeup_timestamp_is_updated_after_node_reconnects(Config) -> + logger_debug_h:start(#{id => ?FUNCTION_NAME}), + Node1 = node(), + #{ct2 := Peer2} = proplists:get_value(peers, Config), + #{ct2 := Node2} = proplists:get_value(nodes, Config), + Tab = make_name(Config), + {ok, _Pid1} = start(Node1, Tab), + {ok, _Pid2} = start(Peer2, Tab), + F = fun(State) -> + {{ok, [Node1, Node2]}, State} + end, + DiscoName = disco_name(Config), + Disco = start_disco(Node1, #{ + name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F + }), + %% We have to start discovery server on both nodes for that feature to work + _Disco2 = start_disco(Node2, #{ + name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F + }), + cets_discovery:add_table(Disco, Tab), + ok = cets_discovery:wait_for_ready(Disco, 5000), + %% Get an old nodeup timestamp + Info1 = cets_discovery:system_info(Disco), + #{nodeup_timestamps := #{Node2 := OldTimestamp}} = Info1, + rpc(Peer2, erlang, disconnect_node, [Node1]), + cets_test_wait:wait_until( + fun() -> + Info2 = cets_discovery:system_info(Disco), + #{nodeup_timestamps := #{Node2 := NewTimestamp}} = Info2, + NewTimestamp =/= OldTimestamp + end, + true + ). + format_data_does_not_return_table_duplicates(Config) -> Res = cets_status:format_data(test_data_for_duplicate_missing_table_in_status(Config)), ?assertMatch(#{remote_unknown_tables := [], remote_nodes_with_missing_tables := []}, Res). From 1ead59afec2cabd39420326d50866216ce112b10 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Mon, 20 Nov 2023 12:26:12 +0100 Subject: [PATCH 26/41] Add disco_node_start_timestamp_is_updated_after_node_restarts testcase --- test/cets_SUITE.erl | 48 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 46 insertions(+), 2 deletions(-) diff --git a/test/cets_SUITE.erl b/test/cets_SUITE.erl index 1008e406..16828c9b 100644 --- a/test/cets_SUITE.erl +++ b/test/cets_SUITE.erl @@ -181,14 +181,16 @@ seq_cases() -> disco_logs_node_reconnects_after_downtime, disco_node_up_timestamp_is_remembered, disco_logs_nodedown_timestamp_is_remembered, - disco_nodeup_timestamp_is_updated_after_node_reconnects + disco_nodeup_timestamp_is_updated_after_node_reconnects, + disco_node_start_timestamp_is_updated_after_node_restarts ]. cets_seq_no_log_cases() -> [ join_interrupted_when_ping_crashes, node_down_history_is_updated_when_netsplit_happens, - disco_nodeup_timestamp_is_updated_after_node_reconnects + disco_nodeup_timestamp_is_updated_after_node_reconnects, + disco_node_start_timestamp_is_updated_after_node_restarts ]. init_per_suite(Config) -> @@ -2458,6 +2460,48 @@ disco_nodeup_timestamp_is_updated_after_node_reconnects(Config) -> true ). 
+disco_node_start_timestamp_is_updated_after_node_restarts(Config) -> + logger_debug_h:start(#{id => ?FUNCTION_NAME}), + Node1 = node(), + #{ct2 := Peer2} = proplists:get_value(peers, Config), + #{ct2 := Node2} = proplists:get_value(nodes, Config), + Tab = make_name(Config), + {ok, _Pid1} = start(Node1, Tab), + {ok, _Pid2} = start(Peer2, Tab), + F = fun(State) -> + {{ok, [Node1, Node2]}, State} + end, + DiscoName = disco_name(Config), + Disco = start_disco(Node1, #{ + name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F + }), + %% We have to start discovery server on both nodes for that feature to work + Disco2 = start_disco(Node2, #{ + name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F + }), + cets_discovery:add_table(Disco, Tab), + ok = cets_discovery:wait_for_ready(Disco, 5000), + %% Get an old nodeup timestamp + Info1 = cets_discovery:system_info(Disco), + #{node_start_timestamps := #{Node2 := OldTimestamp}} = Info1, + %% Instead of restart the node, restart the process. It is enough to get + %% a new start_time. + rpc(Peer2, erlang, disconnect_node, [Node1]), + cets:stop(Disco2), + %% We actually would not detect the case of us just stopping the remote disco + %% server. Because we use nodeup/nodedown to detect downs, not monitors. + _RestartedDisco2 = start_disco(Node2, #{ + name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F + }), + cets_test_wait:wait_until( + fun() -> + Info2 = cets_discovery:system_info(Disco), + #{node_start_timestamps := #{Node2 := NewTimestamp}} = Info2, + NewTimestamp =/= OldTimestamp + end, + true + ). + format_data_does_not_return_table_duplicates(Config) -> Res = cets_status:format_data(test_data_for_duplicate_missing_table_in_status(Config)), ?assertMatch(#{remote_unknown_tables := [], remote_nodes_with_missing_tables := []}, Res). From ef05e4fb0b582bf588674b8305426e0c2854d3f2 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Mon, 20 Nov 2023 12:40:21 +0100 Subject: [PATCH 27/41] Add cets_discovery into a list of modules to disable logging during testing There is a special test group for that. So, we test with logging enabled and disabled. --- src/cets_discovery.erl | 34 ++++++++++++++++++---------------- test/cets_SUITE.erl | 8 +++++--- 2 files changed, 23 insertions(+), 19 deletions(-) diff --git a/src/cets_discovery.erl b/src/cets_discovery.erl index 8574a4d9..bac3a7c6 100644 --- a/src/cets_discovery.erl +++ b/src/cets_discovery.erl @@ -477,28 +477,30 @@ handle_nodedown(Node, State) -> State2 = remember_nodedown_timestamp(Node, State), {NodeUpTime, State3} = remove_nodeup_timestamp(Node, State2), %% Not inside of the macro to make code coverage happy - Log = set_defined(connected_millisecond_duration, NodeUpTime, #{ - what => nodedown, - remote_node => Node, - alive_nodes => length(nodes()) + 1, - time_since_startup_in_milliseconds => time_since_startup_in_milliseconds(State) - }), - ?LOG_WARNING(Log), + ?LOG_WARNING( + set_defined(connected_millisecond_duration, NodeUpTime, #{ + what => nodedown, + remote_node => Node, + alive_nodes => length(nodes()) + 1, + time_since_startup_in_milliseconds => time_since_startup_in_milliseconds(State) + }) + ), State3. 
 handle_nodeup(Node, State) ->
     send_start_time_to(Node, State),
     State2 = remember_nodeup_timestamp(Node, State),
     NodeDownTime = get_downtime(Node, State2),
-    Log = set_defined(downtime_millisecond_duration, NodeDownTime, #{
-        what => nodeup,
-        remote_node => Node,
-        alive_nodes => length(nodes()) + 1,
-        %% We report that time so we could work on minimizing that time.
-        %% It says how long it took to discover nodes after startup.
-        time_since_startup_in_milliseconds => time_since_startup_in_milliseconds(State)
-    }),
-    ?LOG_WARNING(Log),
+    ?LOG_WARNING(
+        set_defined(downtime_millisecond_duration, NodeDownTime, #{
+            what => nodeup,
+            remote_node => Node,
+            alive_nodes => length(nodes()) + 1,
+            %% We report that time so we could work on minimizing that time.
+            %% It says how long it took to discover nodes after startup.
+            time_since_startup_in_milliseconds => time_since_startup_in_milliseconds(State)
+        })
+    ),
     State2.
 
 remember_nodeup_timestamp(Node, State = #{nodeup_timestamps := Map}) ->
diff --git a/test/cets_SUITE.erl b/test/cets_SUITE.erl
index 16828c9b..37ea4bd2 100644
--- a/test/cets_SUITE.erl
+++ b/test/cets_SUITE.erl
@@ -180,7 +180,7 @@ seq_cases() ->
         disco_logs_nodeup_after_downtime,
         disco_logs_node_reconnects_after_downtime,
         disco_node_up_timestamp_is_remembered,
-        disco_logs_nodedown_timestamp_is_remembered,
+        disco_node_down_timestamp_is_remembered,
         disco_nodeup_timestamp_is_updated_after_node_reconnects,
         disco_node_start_timestamp_is_updated_after_node_restarts
     ].
@@ -189,6 +189,8 @@ cets_seq_no_log_cases() ->
     [
         join_interrupted_when_ping_crashes,
         node_down_history_is_updated_when_netsplit_happens,
+        disco_node_up_timestamp_is_remembered,
+        disco_node_down_timestamp_is_remembered,
         disco_nodeup_timestamp_is_updated_after_node_reconnects,
         disco_node_start_timestamp_is_updated_after_node_restarts
     ].
@@ -231,7 +233,7 @@ end_per_testcase(_, _Config) ->
 
 %% Modules that use a multiline LOG_ macro
 log_modules() ->
-    [cets, cets_call, cets_long, cets_join].
+    [cets, cets_call, cets_long, cets_join, cets_discovery].
 
 inserted_records_could_be_read_back(Config) ->
     Tab = make_name(Config),
@@ -2322,7 +2324,7 @@ disco_logs_nodedown(Config) ->
             ct:fail(timeout)
     end.
 
-disco_logs_nodedown_timestamp_is_remembered(Config) ->
+disco_node_down_timestamp_is_remembered(Config) ->
     logger_debug_h:start(#{id => ?FUNCTION_NAME}),
     Node1 = node(),
     #{ct2 := Peer2} = proplists:get_value(peers, Config),

From bb1e849a408bc5408c7fb49e68d7e1b225e59432 Mon Sep 17 00:00:00 2001
From: Mikhail Uvarov
Date: Mon, 20 Nov 2023 12:56:28 +0100
Subject: [PATCH 28/41] Use proto_dist to do just one address_please call per ping

---
 src/cets_ping.erl | 22 ++++++++++++++++++----
 1 file changed, 18 insertions(+), 4 deletions(-)

diff --git a/src/cets_ping.erl b/src/cets_ping.erl
index 300e4416..3d2c3c9c 100644
--- a/src/cets_ping.erl
+++ b/src/cets_ping.erl
@@ -17,10 +17,8 @@ ping(Node) when is_atom(Node) ->
             case dist_util:split_node(Node) of
                 {node, Name, Host} ->
                     Epmd = net_kernel:epmd_module(),
-                    V4 = Epmd:address_please(Name, Host, inet),
-                    V6 = Epmd:address_please(Name, Host, inet6),
-                    case {V4, V6} of
-                        {{error, _}, {error, _}} ->
+                    case Epmd:address_please(Name, Host, net_family()) of
+                        {error, _} ->
                             pang;
                         _ ->
                             connect_ping(Node)
@@ -30,6 +28,22 @@ ping(Node) when is_atom(Node) ->
             end
     end.
 
+%% The user should use the proto_dist flag to enable inet6.
+-spec net_family() -> inet | inet6.
+net_family() ->
+    net_family(init:get_argument(proto_dist)).
+
+net_family({ok, [[ProtoDist]]}) ->
+    %% Check that the string contains "6", e.g. inet6, inet6_tls.
+    case lists:member($6, ProtoDist) of
+        true ->
+            inet6;
+        false ->
+            inet
+    end;
+net_family(_) ->
+    inet.
+
 connect_ping(Node) ->
     %% We could use net_adm:ping/1 but it does:
     %% - disconnect node on pang - we don't want that
     %% (because it could disconnect already connected node because of race conditions)
     %% - it calls net_kernel's gen_server of the remote server,
     %% but it could be busy doing something,
     %% which means slower response time.

From b1dd16f0b81e5707806e20fed298ebeaa9f39192 Mon Sep 17 00:00:00 2001
From: Mikhail Uvarov
Date: Mon, 20 Nov 2023 13:04:32 +0100
Subject: [PATCH 29/41] Add cets_ping_non_existing_node testcase

---
 test/cets_SUITE.erl | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/test/cets_SUITE.erl b/test/cets_SUITE.erl
index 37ea4bd2..1a3d9303 100644
--- a/test/cets_SUITE.erl
+++ b/test/cets_SUITE.erl
@@ -150,7 +150,8 @@ cases() ->
         send_leader_op_throws_noproc,
         pinfo_returns_value,
         pinfo_returns_undefined,
-        format_data_does_not_return_table_duplicates
+        format_data_does_not_return_table_duplicates,
+        cets_ping_non_existing_node
     ].
 
 only_for_logger_cases() ->
@@ -2508,6 +2509,9 @@ format_data_does_not_return_table_duplicates(Config) ->
     Res = cets_status:format_data(test_data_for_duplicate_missing_table_in_status(Config)),
     ?assertMatch(#{remote_unknown_tables := [], remote_nodes_with_missing_tables := []}, Res).
 
+cets_ping_non_existing_node(_Config) ->
+    pang = cets_ping:ping('mongooseim@non_existing_host').
+
 %% Helper functions
 
 receive_all_logs(Id) ->

From 39dfdb9a34bd0751551f4d885c51b2ea1793db28 Mon Sep 17 00:00:00 2001
From: Mikhail Uvarov
Date: Mon, 20 Nov 2023 13:13:45 +0100
Subject: [PATCH 30/41] Add tests for net_family code in cets_ping

---
 src/cets_ping.erl   | 5 +++++
 test/cets_SUITE.erl | 9 ++++++++-
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/src/cets_ping.erl b/src/cets_ping.erl
index 3d2c3c9c..4c23df9d 100644
--- a/src/cets_ping.erl
+++ b/src/cets_ping.erl
@@ -1,5 +1,10 @@
 -module(cets_ping).
 -export([ping/1, ping_pairs/1]).
+
+-ifdef(TEST).
+-export([net_family/1]).
+-endif.
+
 ping(Node) when is_atom(Node) ->
     %% It is important to understand, that initial setup for dist connections
     %% is done by the single net_kernel process.
diff --git a/test/cets_SUITE.erl b/test/cets_SUITE.erl
index 1a3d9303..0050aaa4 100644
--- a/test/cets_SUITE.erl
+++ b/test/cets_SUITE.erl
@@ -151,7 +151,8 @@ cases() ->
         pinfo_returns_value,
         pinfo_returns_undefined,
         format_data_does_not_return_table_duplicates,
-        cets_ping_non_existing_node
+        cets_ping_non_existing_node,
+        cets_ping_net_family
     ].
 
 only_for_logger_cases() ->
@@ -2512,6 +2513,12 @@ format_data_does_not_return_table_duplicates(Config) ->
 cets_ping_non_existing_node(_Config) ->
     pang = cets_ping:ping('mongooseim@non_existing_host').
 
+cets_ping_net_family(_Config) ->
+    inet = cets_ping:net_family(error),
+    inet = cets_ping:net_family({ok, [["inet"]]}),
+    inet6 = cets_ping:net_family({ok, [["inet6"]]}),
+    inet6 = cets_ping:net_family({ok, [["inet6_tls"]]}).
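The net_family/1 values exercised above come straight from the VM's command line. A minimal shell sketch, assuming a build compiled with TEST defined (net_family/1 is only exported then) and a node started with the standard IPv6 distribution module via erl -proto_dist inet6_tcp:

    %% init:get_argument/1 wraps each occurrence of the flag in its own list:
    1> init:get_argument(proto_dist).
    {ok,[["inet6_tcp"]]}
    2> cets_ping:net_family(init:get_argument(proto_dist)).
    inet6
    3> cets_ping:net_family(error).
    inet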
+ %% Helper functions receive_all_logs(Id) -> From e637138213f63e97c13340b646397b136a06c61d Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Mon, 20 Nov 2023 13:28:14 +0100 Subject: [PATCH 31/41] Add stop_reason_is_not_logged_in_tracked testcase --- test/cets_SUITE.erl | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/test/cets_SUITE.erl b/test/cets_SUITE.erl index 0050aaa4..062747b1 100644 --- a/test/cets_SUITE.erl +++ b/test/cets_SUITE.erl @@ -162,6 +162,8 @@ only_for_logger_cases() -> logs_are_printed_when_join_fails_because_servers_overlap, join_done_already_while_waiting_for_lock_so_do_nothing, atom_error_is_logged_in_tracked, + stop_reason_is_not_logged_in_tracked, + other_reason_is_logged_in_tracked, nested_calls_errors_are_logged_once_with_tuple_reason, nested_calls_errors_are_logged_once_with_map_reason ]. @@ -1024,6 +1026,45 @@ atom_error_is_logged_in_tracked(_Config) -> ] = receive_all_logs_with_log_ref(?FUNCTION_NAME, LogRef). +stop_reason_is_not_logged_in_tracked(_Config) -> + logger_debug_h:start(#{id => ?FUNCTION_NAME}), + Me = self(), + LogRef = make_ref(), + F = fun() -> + Me ! ready, + timer:sleep(infinity) + end, + Pid = spawn(fun() -> cets_long:run_tracked(#{log_ref => LogRef}, F) end), + receive_message(ready), + exit(Pid, stop), + wait_for_down(Pid), + [] = receive_all_logs_with_log_ref(?FUNCTION_NAME, LogRef). + +%% Counter example for stop_reason_is_not_logged_in_tracked +other_reason_is_logged_in_tracked(_Config) -> + logger_debug_h:start(#{id => ?FUNCTION_NAME}), + Me = self(), + LogRef = make_ref(), + F = fun() -> + Me ! ready, + timer:sleep(infinity) + end, + Pid = spawn(fun() -> cets_long:run_tracked(#{log_ref => LogRef}, F) end), + receive_message(ready), + exit(Pid, bad_stuff_happened), + wait_for_down(Pid), + [ + #{ + level := error, + msg := + {report, #{ + what := task_failed, + log_ref := LogRef, + reason := bad_stuff_happened + }} + } + ] = receive_all_logs_with_log_ref(?FUNCTION_NAME, LogRef). + nested_calls_errors_are_logged_once_with_tuple_reason(_Config) -> logger_debug_h:start(#{id => ?FUNCTION_NAME}), LogRef = make_ref(), From 87adc9fd2739a397785ae4b2ccacc8ab58046bb0 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Mon, 20 Nov 2023 13:36:06 +0100 Subject: [PATCH 32/41] Fail if cannot split nodename in cets_ping --- src/cets_ping.erl | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/src/cets_ping.erl b/src/cets_ping.erl index 4c23df9d..7588aa61 100644 --- a/src/cets_ping.erl +++ b/src/cets_ping.erl @@ -19,17 +19,13 @@ ping(Node) when is_atom(Node) -> true -> pong; false -> - case dist_util:split_node(Node) of - {node, Name, Host} -> - Epmd = net_kernel:epmd_module(), - case Epmd:address_please(Name, Host, net_family()) of - {error, _} -> - pang; - _ -> - connect_ping(Node) - end; + {node, Name, Host} = dist_util:split_node(Node), + Epmd = net_kernel:epmd_module(), + case Epmd:address_please(Name, Host, net_family()) of + {error, _} -> + pang; _ -> - pang + connect_ping(Node) end end. 
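Taken together, patches 28 through 32 make cets_ping:ping/1 resolve the address first and call net_kernel only when resolution succeeds. A hedged behavioral sketch (the host names are hypothetical):

    %% Unresolvable host: address_please fails in the calling process,
    %% net_kernel is never asked to connect, and existing connections
    %% are left untouched.
    pang = cets_ping:ping('cets@nohost.example'),
    %% After [PATCH 32/41], a name without a host part is a programming
    %% error and crashes with badmatch instead of returning pang:
    {'EXIT', {{badmatch, _}, _}} = (catch cets_ping:ping(badname)).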
From 7732bb84c89ecd04e1301b19478b3ec73f3b39b7 Mon Sep 17 00:00:00 2001
From: Mikhail Uvarov
Date: Tue, 21 Nov 2023 00:37:04 +0100
Subject: [PATCH 33/41] Fix flaky tests

---
 src/cets_discovery.erl |  7 ++++++-
 test/cets_SUITE.erl    | 36 ++++++++++++++++++++++++++++++------
 2 files changed, 36 insertions(+), 7 deletions(-)

diff --git a/src/cets_discovery.erl b/src/cets_discovery.erl
index bac3a7c6..44ab3e9e 100644
--- a/src/cets_discovery.erl
+++ b/src/cets_discovery.erl
@@ -473,6 +473,7 @@ has_join_result_for(Node, Table, #{results := Results}) ->
 handle_system_info(State) ->
     State#{verify_ready => verify_ready(State)}.
 
+-spec handle_nodedown(node(), state()) -> state().
 handle_nodedown(Node, State) ->
     State2 = remember_nodedown_timestamp(Node, State),
     {NodeUpTime, State3} = remove_nodeup_timestamp(Node, State2),
     %% Not inside of the macro to make code coverage happy
@@ -487,6 +488,7 @@ handle_nodedown(Node, State) ->
     ),
     State3.
 
+-spec handle_nodeup(node(), state()) -> state().
 handle_nodeup(Node, State) ->
     send_start_time_to(Node, State),
     State2 = remember_nodeup_timestamp(Node, State),
@@ -503,20 +505,23 @@ handle_nodeup(Node, State) ->
     ),
     State2.
 
+-spec remember_nodeup_timestamp(node(), state()) -> state().
 remember_nodeup_timestamp(Node, State = #{nodeup_timestamps := Map}) ->
     Time = erlang:system_time(millisecond),
     Map2 = Map#{Node => Time},
     State#{nodeup_timestamps := Map2}.
 
+-spec remember_nodedown_timestamp(node(), state()) -> state().
 remember_nodedown_timestamp(Node, State = #{nodedown_timestamps := Map}) ->
     Time = erlang:system_time(millisecond),
     Map2 = Map#{Node => Time},
     State#{nodedown_timestamps := Map2}.
 
+-spec remove_nodeup_timestamp(node(), state()) -> {integer(), state()}.
 remove_nodeup_timestamp(Node, State = #{nodeup_timestamps := Map}) ->
     StartTime = maps:get(Node, Map, undefined),
     NodeUpTime = calculate_uptime(StartTime),
-    Map2 = maps:remove(Node, State),
+    Map2 = maps:remove(Node, Map),
     {NodeUpTime, State#{nodeup_timestamps := Map2}}.
 
 calculate_uptime(undefined) ->
diff --git a/test/cets_SUITE.erl b/test/cets_SUITE.erl
index 062747b1..f7c0914a 100644
--- a/test/cets_SUITE.erl
+++ b/test/cets_SUITE.erl
@@ -2111,17 +2111,15 @@ disco_connects_to_unconnected_node(Config) ->
     Node1 = node(),
     #{ct5 := Peer5} = proplists:get_value(peers, Config),
     #{ct5 := Node5} = proplists:get_value(nodes, Config),
+    ok = net_kernel:monitor_nodes(true),
     rpc(Peer5, erlang, disconnect_node, [Node1]),
+    receive_message({nodedown, Node5}),
     Tab = make_name(Config),
     {ok, _} = start(Node1, Tab),
     {ok, _} = start(Peer5, Tab),
     F = fun(State) ->
         {{ok, [Node1, Node5]}, State}
     end,
-    cets_test_wait:wait_until(
-        fun() -> lists:member(Node5, nodes()) end,
-        false
-    ),
     {ok, Disco} = cets_discovery:start(#{backend_module => cets_discovery_fun, get_nodes_fn => F}),
     cets_discovery:add_table(Disco, Tab),
     ok = cets_discovery:wait_for_ready(Disco, 5000).
@@ -2232,7 +2230,7 @@ logging_when_failing_join_with_disco(Config) ->
                 Reason =:= Reason2
         ],
         %% Only one message is logged
-        [_] = MatchedLogs
+        ?assertMatch([_], MatchedLogs, Logs)
     after
         meck:unload(),
         reconnect_node(Node2, Peer2),
@@ -2300,9 +2298,12 @@ disco_logs_nodeup(Config) ->
         name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F
     }),
     cets_discovery:add_table(Disco, Tab),
+    %% There could be several disco processes still running from the previous tests,
+    %% filter out logs by pid.
receive {log, ?FUNCTION_NAME, #{ level := warning, + meta := #{pid := Disco}, msg := {report, #{what := nodeup, remote_node := Node2} = R} }} = M -> ?assert(is_integer(maps:get(alive_nodes, R)), M), @@ -2338,10 +2339,12 @@ disco_node_up_timestamp_is_remembered(Config) -> disco_logs_nodedown(Config) -> logger_debug_h:start(#{id => ?FUNCTION_NAME}), + ok = net_kernel:monitor_nodes(true), Node1 = node(), #{ct2 := Peer2} = proplists:get_value(peers, Config), #{ct2 := Node2} = proplists:get_value(nodes, Config), rpc(Peer2, erlang, disconnect_node, [Node1]), + receive_message({nodedown, Node2}), Tab = make_name(Config), {ok, _Pid1} = start(Node1, Tab), {ok, _Pid2} = start(Peer2, Tab), @@ -2354,10 +2357,13 @@ disco_logs_nodedown(Config) -> }), cets_discovery:add_table(Disco, Tab), ok = cets_discovery:wait_for_ready(Disco, 5000), + receive_message({nodeup, Node2}), rpc(Peer2, erlang, disconnect_node, [Node1]), + receive_message({nodedown, Node2}), receive {log, ?FUNCTION_NAME, #{ level := warning, + meta := #{pid := Disco}, msg := {report, #{what := nodedown, remote_node := Node2} = R} }} = M -> ?assert(is_integer(maps:get(alive_nodes, R)), M), @@ -2417,6 +2423,7 @@ disco_logs_nodeup_after_downtime(Config) -> receive {log, ?FUNCTION_NAME, #{ level := warning, + meta := #{pid := Disco}, msg := {report, #{ @@ -2459,6 +2466,7 @@ disco_logs_node_reconnects_after_downtime(Config) -> receive {log, ?FUNCTION_NAME, #{ level := warning, + meta := #{pid := Disco}, msg := {report, #{ what := node_reconnects, @@ -2532,7 +2540,7 @@ disco_node_start_timestamp_is_updated_after_node_restarts(Config) -> %% Instead of restart the node, restart the process. It is enough to get %% a new start_time. rpc(Peer2, erlang, disconnect_node, [Node1]), - cets:stop(Disco2), + rpc(Peer2, cets, stop, [Disco2]), %% We actually would not detect the case of us just stopping the remote disco %% server. Because we use nodeup/nodedown to detect downs, not monitors. _RestartedDisco2 = start_disco(Node2, #{ @@ -2580,6 +2588,8 @@ start_local(Name) -> start_local(Name, #{}). start_local(Name, Opts) -> + catch cets:stop(Name), + wait_for_name_to_be_free(node(), Name), {ok, Pid} = cets:start(Name, Opts), schedule_cleanup(Pid), {ok, Pid}. @@ -2595,15 +2605,29 @@ schedule_cleanup(Pid) -> end). start(Node, Tab) -> + catch rpc(Node, cets, stop, [Tab]), + wait_for_name_to_be_free(Node, Tab), {ok, Pid} = rpc(Node, cets, start, [Tab, #{}]), schedule_cleanup(Pid), {ok, Pid}. start_disco(Node, Opts) -> + case Opts of + #{name := Name} -> + catch rpc(Node, cets, stop, [Name]), + wait_for_name_to_be_free(Node, Name); + _ -> + ok + end, {ok, Pid} = rpc(Node, cets_discovery, start, [Opts]), schedule_cleanup(Pid), Pid. +wait_for_name_to_be_free(Node, Name) -> + %% Wait for the old process to be killed by the cleaner in schedule_cleanup. + %% Cleaner is fast, but not instant. + cets_test_wait:wait_until(fun() -> rpc(Node, erlang, whereis, [Name]) end, undefined). + insert(Node, Tab, Rec) -> rpc(Node, cets, insert, [Tab, Rec]). 
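The start/2, start_local/2, and start_disco/2 changes above share one deflaking pattern: stop any leftover instance, wait for the registered name to be released, then start anew. A minimal distillation of that pattern (restart_registered is a hypothetical name):

    %% Without the wait, cets:start/2 can race against the asynchronous
    %% cleanup of the previous process and fail with {already_started, Pid}.
    restart_registered(Name) ->
        catch cets:stop(Name),
        cets_test_wait:wait_until(fun() -> whereis(Name) end, undefined),
        {ok, _Pid} = cets:start(Name, #{}).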
From 1f2817d4f68a05d3bae607635eecdf8b027a18eb Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Tue, 21 Nov 2023 01:45:04 +0100 Subject: [PATCH 34/41] Rename special reason from stop to shutdown --- src/cets_long.erl | 4 ++-- test/cets_SUITE.erl | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/cets_long.erl b/src/cets_long.erl index bf2ccc63..df49648a 100644 --- a/src/cets_long.erl +++ b/src/cets_long.erl @@ -91,8 +91,8 @@ run_monitor(Info, Ref, Parent, Start) -> monitor_loop(Mon, Info, Parent, Start, Interval) -> receive - {'DOWN', _MonRef, process, _Pid, stop} -> - %% Special case, the long task is stopped using exit(Pid, stop) + {'DOWN', _MonRef, process, _Pid, shutdown} -> + %% Special case, the long task is stopped using exit(Pid, shutdown) ok; {'DOWN', MonRef, process, _Pid, Reason} when Mon =:= MonRef -> ?LOG_ERROR(Info#{ diff --git a/test/cets_SUITE.erl b/test/cets_SUITE.erl index f7c0914a..cab29b45 100644 --- a/test/cets_SUITE.erl +++ b/test/cets_SUITE.erl @@ -162,7 +162,7 @@ only_for_logger_cases() -> logs_are_printed_when_join_fails_because_servers_overlap, join_done_already_while_waiting_for_lock_so_do_nothing, atom_error_is_logged_in_tracked, - stop_reason_is_not_logged_in_tracked, + shutdown_reason_is_not_logged_in_tracked, other_reason_is_logged_in_tracked, nested_calls_errors_are_logged_once_with_tuple_reason, nested_calls_errors_are_logged_once_with_map_reason @@ -1026,7 +1026,7 @@ atom_error_is_logged_in_tracked(_Config) -> ] = receive_all_logs_with_log_ref(?FUNCTION_NAME, LogRef). -stop_reason_is_not_logged_in_tracked(_Config) -> +shutdown_reason_is_not_logged_in_tracked(_Config) -> logger_debug_h:start(#{id => ?FUNCTION_NAME}), Me = self(), LogRef = make_ref(), @@ -1036,11 +1036,11 @@ stop_reason_is_not_logged_in_tracked(_Config) -> end, Pid = spawn(fun() -> cets_long:run_tracked(#{log_ref => LogRef}, F) end), receive_message(ready), - exit(Pid, stop), + exit(Pid, shutdown), wait_for_down(Pid), [] = receive_all_logs_with_log_ref(?FUNCTION_NAME, LogRef). -%% Counter example for stop_reason_is_not_logged_in_tracked +%% Counter example for shutdown_reason_is_not_logged_in_tracked other_reason_is_logged_in_tracked(_Config) -> logger_debug_h:start(#{id => ?FUNCTION_NAME}), Me = self(), From 1e65ec1ffc0b47c66c6df72db275c474364b05d5 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Wed, 22 Nov 2023 13:56:11 +0100 Subject: [PATCH 35/41] Add unexpected_nodedown_is_ignored_by_disco testcase --- test/cets_SUITE.erl | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/test/cets_SUITE.erl b/test/cets_SUITE.erl index cab29b45..9237945d 100644 --- a/test/cets_SUITE.erl +++ b/test/cets_SUITE.erl @@ -152,7 +152,8 @@ cases() -> pinfo_returns_undefined, format_data_does_not_return_table_duplicates, cets_ping_non_existing_node, - cets_ping_net_family + cets_ping_net_family, + unexpected_nodedown_is_ignored_by_disco ]. only_for_logger_cases() -> @@ -2568,6 +2569,20 @@ cets_ping_net_family(_Config) -> inet6 = cets_ping:net_family({ok, [["inet6"]]}), inet6 = cets_ping:net_family({ok, [["inet6_tls"]]}). +unexpected_nodedown_is_ignored_by_disco(Config) -> + %% Theoretically, should not happen + %% Still, check that we do not crash in this case + DiscoName = disco_name(Config), + F = fun(State) -> {{ok, []}, State} end, + Disco = start_disco(node(), #{ + name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F + }), + #{start_time := StartTime} = cets_discovery:system_info(Disco), + Disco ! 
{nodedown, 'cets@badnode'}, + %% Check that we are still running + #{start_time := StartTime} = cets_discovery:system_info(Disco), + ok. + %% Helper functions receive_all_logs(Id) -> From d04673f3b39f40dcbd4dceef05b102ffef3465af Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Wed, 22 Nov 2023 14:04:16 +0100 Subject: [PATCH 36/41] Add ping_pairs tests --- test/cets_SUITE.erl | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/test/cets_SUITE.erl b/test/cets_SUITE.erl index 9237945d..7d8da081 100644 --- a/test/cets_SUITE.erl +++ b/test/cets_SUITE.erl @@ -187,7 +187,9 @@ seq_cases() -> disco_node_up_timestamp_is_remembered, disco_node_down_timestamp_is_remembered, disco_nodeup_timestamp_is_updated_after_node_reconnects, - disco_node_start_timestamp_is_updated_after_node_restarts + disco_node_start_timestamp_is_updated_after_node_restarts, + ping_pairs_returns_pongs, + ping_pairs_returns_earlier ]. cets_seq_no_log_cases() -> @@ -2583,6 +2585,19 @@ unexpected_nodedown_is_ignored_by_disco(Config) -> #{start_time := StartTime} = cets_discovery:system_info(Disco), ok. +ping_pairs_returns_pongs(Config) -> + #{ct2 := Node2, ct3 := Node3} = proplists:get_value(nodes, Config), + Me = node(), + [{Me, Node2, pong}, {Node2, Node3, pong}] = + cets_ping:ping_pairs([{Me, Node2}, {Node2, Node3}]). + +ping_pairs_returns_earlier(Config) -> + #{ct2 := Node2, ct3 := Node3} = proplists:get_value(nodes, Config), + Me = node(), + Bad = 'badnode@localhost', + [{Me, Me, pong}, {Me, Node2, pong}, {Me, Bad, pang}, {Me, Node3, skipped}] = + cets_ping:ping_pairs([{Me, Me}, {Me, Node2}, {Me, Bad}, {Me, Node3}]). + %% Helper functions receive_all_logs(Id) -> From 293ba6b6bab78f9419756472b00eb2023bdc844a Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Fri, 24 Nov 2023 11:19:30 +0100 Subject: [PATCH 37/41] Simplify ipv6 matching in net_family --- src/cets_ping.erl | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/src/cets_ping.erl b/src/cets_ping.erl index 7588aa61..0b793e14 100644 --- a/src/cets_ping.erl +++ b/src/cets_ping.erl @@ -34,14 +34,8 @@ ping(Node) when is_atom(Node) -> net_family() -> net_family(init:get_argument(proto_dist)). -net_family({ok, [[ProtoDist]]}) -> - %% Check that the string contains "6". i.e. inet6, inet6_tls. - case lists:member($6, ProtoDist) of - true -> - inet6; - false -> - inet - end; +net_family({ok, [["inet6" ++ _]]}) -> + inet6; net_family(_) -> inet. From 9e3e78900bc9f98ad7f91b8bcfc2e045928dc695 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Fri, 24 Nov 2023 11:21:06 +0100 Subject: [PATCH 38/41] Fix typos in cets_ping --- src/cets_ping.erl | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/cets_ping.erl b/src/cets_ping.erl index 0b793e14..ca70d055 100644 --- a/src/cets_ping.erl +++ b/src/cets_ping.erl @@ -40,12 +40,11 @@ net_family(_) -> inet. connect_ping(Node) -> - %% We could use net_adm:ping/1 but it does: - %% - disconnect node on pang - we don't want that - %% (because it could disconnect already connected node because of race conditions) - %% - it calls net_kernel's gen_server of the remote server, - %% but it could be busy doing something, - %% which means slower response time. 
+ %% We could use net_adm:ping/1 but it: + %% - disconnects node on pang - we don't want that + %% (because it could disconnect an already connected node because of race conditions) + %% - calls net_kernel's gen_server of the remote server, + %% but it could be busy doing something, which means slower response time. case net_kernel:connect_node(Node) of true -> pong; @@ -69,7 +68,7 @@ ping_pairs_stop_on_pang([{Node1, Node2} | Pairs]) -> Other -> %% We do not need to ping the rest of nodes - %% one node returning pang is enough to cancel join. - %% We could exit earlier and safe some time + %% We could exit earlier and save some time %% (connect_node to the dead node could be time consuming) [{Node1, Node2, Other} | fail_pairs(Pairs, skipped)] end; From 2a11c1990b63e98448299224e2bef24311ec11bb Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Fri, 24 Nov 2023 13:14:45 +0100 Subject: [PATCH 39/41] Deduplicate code in tests Add setup_two_nodes_and_discovery helper function --- test/cets_SUITE.erl | 299 ++++++++++++++++---------------------------- 1 file changed, 108 insertions(+), 191 deletions(-) diff --git a/test/cets_SUITE.erl b/test/cets_SUITE.erl index 7d8da081..0a0382ab 100644 --- a/test/cets_SUITE.erl +++ b/test/cets_SUITE.erl @@ -1043,7 +1043,7 @@ shutdown_reason_is_not_logged_in_tracked(_Config) -> wait_for_down(Pid), [] = receive_all_logs_with_log_ref(?FUNCTION_NAME, LogRef). -%% Counter example for shutdown_reason_is_not_logged_in_tracked +%% Complementary to shutdown_reason_is_not_logged_in_tracked other_reason_is_logged_in_tracked(_Config) -> logger_debug_h:start(#{id => ?FUNCTION_NAME}), Me = self(), @@ -2115,7 +2115,7 @@ disco_connects_to_unconnected_node(Config) -> #{ct5 := Peer5} = proplists:get_value(peers, Config), #{ct5 := Node5} = proplists:get_value(nodes, Config), ok = net_kernel:monitor_nodes(true), - rpc(Peer5, erlang, disconnect_node, [Node1]), + disconnect_node(Peer5, Node1), receive_message({nodedown, Node5}), Tab = make_name(Config), {ok, _} = start(Node1, Tab), @@ -2286,21 +2286,7 @@ node_down_history_is_updated_when_netsplit_happens(Config) -> disco_logs_nodeup(Config) -> logger_debug_h:start(#{id => ?FUNCTION_NAME}), - Node1 = node(), - #{ct2 := Peer2} = proplists:get_value(peers, Config), - #{ct2 := Node2} = proplists:get_value(nodes, Config), - rpc(Peer2, erlang, disconnect_node, [Node1]), - Tab = make_name(Config), - {ok, _Pid1} = start(Node1, Tab), - {ok, _Pid2} = start(Peer2, Tab), - F = fun(State) -> - {{ok, [Node1, Node2]}, State} - end, - DiscoName = disco_name(Config), - Disco = start_disco(Node1, #{ - name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F - }), - cets_discovery:add_table(Disco, Tab), + #{disco := Disco, node2 := Node2} = setup_two_nodes_and_discovery(Config), %% There could be several disco processes still running from the previous tests, %% filter out logs by pid. receive @@ -2316,52 +2302,14 @@ disco_logs_nodeup(Config) -> end. 
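The meta := #{pid := Disco} match above is what makes the assertion immune to log events from disco servers leaked by earlier testcases. The same filtering idea as a standalone helper, sketched here with a hypothetical name:

    %% Collect only events logged by Pid under handler Id; the selective
    %% receive leaves all other messages in the mailbox.
    receive_logs_from(Id, Pid) ->
        receive
            {log, Id, #{meta := #{pid := Pid}} = Event} ->
                [Event | receive_logs_from(Id, Pid)]
        after 100 ->
            []
        end.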
disco_node_up_timestamp_is_remembered(Config) -> - Node1 = node(), - #{ct2 := Peer2} = proplists:get_value(peers, Config), - #{ct2 := Node2} = proplists:get_value(nodes, Config), - rpc(Peer2, erlang, disconnect_node, [Node1]), - Tab = make_name(Config), - {ok, _Pid1} = start(Node1, Tab), - {ok, _Pid2} = start(Peer2, Tab), - F = fun(State) -> - {{ok, [Node1, Node2]}, State} - end, - DiscoName = disco_name(Config), - Disco = start_disco(Node1, #{ - name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F - }), - cets_discovery:add_table(Disco, Tab), + #{disco := Disco, node2 := Node2} = setup_two_nodes_and_discovery(Config), %% Check that nodeup is remembered - cets_test_wait:wait_until( - fun() -> - #{nodeup_timestamps := Map} = cets_discovery:system_info(Disco), - maps:is_key(Node2, Map) - end, - true - ). + wait_for_disco_timestamp_to_appear(Disco, nodeup_timestamps, Node2). disco_logs_nodedown(Config) -> logger_debug_h:start(#{id => ?FUNCTION_NAME}), ok = net_kernel:monitor_nodes(true), - Node1 = node(), - #{ct2 := Peer2} = proplists:get_value(peers, Config), - #{ct2 := Node2} = proplists:get_value(nodes, Config), - rpc(Peer2, erlang, disconnect_node, [Node1]), - receive_message({nodedown, Node2}), - Tab = make_name(Config), - {ok, _Pid1} = start(Node1, Tab), - {ok, _Pid2} = start(Peer2, Tab), - F = fun(State) -> - {{ok, [Node1, Node2]}, State} - end, - DiscoName = disco_name(Config), - Disco = start_disco(Node1, #{ - name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F - }), - cets_discovery:add_table(Disco, Tab), - ok = cets_discovery:wait_for_ready(Disco, 5000), - receive_message({nodeup, Node2}), - rpc(Peer2, erlang, disconnect_node, [Node1]), + #{disco := Disco, node2 := Node2} = setup_two_nodes_and_discovery(Config, [wait, netsplit]), receive_message({nodedown, Node2}), receive {log, ?FUNCTION_NAME, #{ @@ -2377,52 +2325,17 @@ disco_logs_nodedown(Config) -> end. disco_node_down_timestamp_is_remembered(Config) -> - logger_debug_h:start(#{id => ?FUNCTION_NAME}), - Node1 = node(), - #{ct2 := Peer2} = proplists:get_value(peers, Config), - #{ct2 := Node2} = proplists:get_value(nodes, Config), - rpc(Peer2, erlang, disconnect_node, [Node1]), - Tab = make_name(Config), - {ok, _Pid1} = start(Node1, Tab), - {ok, _Pid2} = start(Peer2, Tab), - F = fun(State) -> - {{ok, [Node1, Node2]}, State} - end, - DiscoName = disco_name(Config), - Disco = start_disco(Node1, #{ - name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F - }), - cets_discovery:add_table(Disco, Tab), - ok = cets_discovery:wait_for_ready(Disco, 5000), - rpc(Peer2, erlang, disconnect_node, [Node1]), + #{disco := Disco, node2 := Node2} = setup_two_nodes_and_discovery(Config, [wait, netsplit]), %% Check that nodedown is remembered - cets_test_wait:wait_until( - fun() -> - #{nodedown_timestamps := Map} = cets_discovery:system_info(Disco), - maps:is_key(Node2, Map) - end, - true - ). + wait_for_disco_timestamp_to_appear(Disco, nodedown_timestamps, Node2). 
disco_logs_nodeup_after_downtime(Config) -> logger_debug_h:start(#{id => ?FUNCTION_NAME}), - Node1 = node(), - #{ct2 := Peer2} = proplists:get_value(peers, Config), - #{ct2 := Node2} = proplists:get_value(nodes, Config), - rpc(Peer2, erlang, disconnect_node, [Node1]), - Tab = make_name(Config), - {ok, _Pid1} = start(Node1, Tab), - {ok, _Pid2} = start(Peer2, Tab), - F = fun(State) -> - {{ok, [Node1, Node2]}, State} - end, - DiscoName = disco_name(Config), - Disco = start_disco(Node1, #{ - name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F - }), - cets_discovery:add_table(Disco, Tab), - ok = cets_discovery:wait_for_ready(Disco, 5000), - rpc(Peer2, erlang, disconnect_node, [Node1]), + #{disco := Disco, node2 := Node2} = setup_two_nodes_and_discovery(Config, [wait, netsplit]), + %% At this point cets_disco should reconnect nodes back automatically. + %% Receive a nodeup after the disconnect. + %% This nodeup should contain the downtime_millisecond_duration field + %% (initial nodeup should not contain this field). receive {log, ?FUNCTION_NAME, #{ level := warning, @@ -2443,29 +2356,12 @@ disco_logs_nodeup_after_downtime(Config) -> disco_logs_node_reconnects_after_downtime(Config) -> logger_debug_h:start(#{id => ?FUNCTION_NAME}), - Node1 = node(), - #{ct2 := Peer2} = proplists:get_value(peers, Config), - #{ct2 := Node2} = proplists:get_value(nodes, Config), - Tab = make_name(Config), - {ok, _Pid1} = start(Node1, Tab), - {ok, _Pid2} = start(Peer2, Tab), - F = fun(State) -> - {{ok, [Node1, Node2]}, State} - end, - DiscoName = disco_name(Config), - Disco = start_disco(Node1, #{ - name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F - }), - %% We have to start discovery server on both nodes for that feature to work - _Disco2 = start_disco(Node2, #{ - name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F - }), - cets_discovery:add_table(Disco, Tab), - ok = cets_discovery:wait_for_ready(Disco, 5000), + Setup = setup_two_nodes_and_discovery(Config, [wait, disco2]), + #{disco := Disco, node1 := Node1, node2 := Node2, peer2 := Peer2} = Setup, %% Check that a start timestamp from a remote node is stored Info = cets_discovery:system_info(Disco), ?assertMatch(#{node_start_timestamps := #{Node2 := _}}, Info), - rpc(Peer2, erlang, disconnect_node, [Node1]), + disconnect_node(Peer2, Node1), receive {log, ?FUNCTION_NAME, #{ level := warning, @@ -2484,79 +2380,19 @@ disco_logs_node_reconnects_after_downtime(Config) -> disco_nodeup_timestamp_is_updated_after_node_reconnects(Config) -> logger_debug_h:start(#{id => ?FUNCTION_NAME}), - Node1 = node(), - #{ct2 := Peer2} = proplists:get_value(peers, Config), - #{ct2 := Node2} = proplists:get_value(nodes, Config), - Tab = make_name(Config), - {ok, _Pid1} = start(Node1, Tab), - {ok, _Pid2} = start(Peer2, Tab), - F = fun(State) -> - {{ok, [Node1, Node2]}, State} - end, - DiscoName = disco_name(Config), - Disco = start_disco(Node1, #{ - name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F - }), - %% We have to start discovery server on both nodes for that feature to work - _Disco2 = start_disco(Node2, #{ - name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F - }), - cets_discovery:add_table(Disco, Tab), - ok = cets_discovery:wait_for_ready(Disco, 5000), - %% Get an old nodeup timestamp - Info1 = cets_discovery:system_info(Disco), - #{nodeup_timestamps := #{Node2 := OldTimestamp}} = Info1, - rpc(Peer2, erlang, disconnect_node, [Node1]), - 
cets_test_wait:wait_until( - fun() -> - Info2 = cets_discovery:system_info(Disco), - #{nodeup_timestamps := #{Node2 := NewTimestamp}} = Info2, - NewTimestamp =/= OldTimestamp - end, - true - ). + Setup = setup_two_nodes_and_discovery(Config, [wait, disco2]), + #{disco := Disco, node1 := Node1, node2 := Node2, peer2 := Peer2} = Setup, + OldTimestamp = get_disco_timestamp(Disco, nodeup_timestamps, Node2), + disconnect_node(Peer2, Node1), + wait_for_disco_timestamp_to_be_updated(Disco, nodeup_timestamps, Node2, OldTimestamp). disco_node_start_timestamp_is_updated_after_node_restarts(Config) -> logger_debug_h:start(#{id => ?FUNCTION_NAME}), - Node1 = node(), - #{ct2 := Peer2} = proplists:get_value(peers, Config), - #{ct2 := Node2} = proplists:get_value(nodes, Config), - Tab = make_name(Config), - {ok, _Pid1} = start(Node1, Tab), - {ok, _Pid2} = start(Peer2, Tab), - F = fun(State) -> - {{ok, [Node1, Node2]}, State} - end, - DiscoName = disco_name(Config), - Disco = start_disco(Node1, #{ - name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F - }), - %% We have to start discovery server on both nodes for that feature to work - Disco2 = start_disco(Node2, #{ - name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F - }), - cets_discovery:add_table(Disco, Tab), - ok = cets_discovery:wait_for_ready(Disco, 5000), - %% Get an old nodeup timestamp - Info1 = cets_discovery:system_info(Disco), - #{node_start_timestamps := #{Node2 := OldTimestamp}} = Info1, - %% Instead of restart the node, restart the process. It is enough to get - %% a new start_time. - rpc(Peer2, erlang, disconnect_node, [Node1]), - rpc(Peer2, cets, stop, [Disco2]), - %% We actually would not detect the case of us just stopping the remote disco - %% server. Because we use nodeup/nodedown to detect downs, not monitors. - _RestartedDisco2 = start_disco(Node2, #{ - name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F - }), - cets_test_wait:wait_until( - fun() -> - Info2 = cets_discovery:system_info(Disco), - #{node_start_timestamps := #{Node2 := NewTimestamp}} = Info2, - NewTimestamp =/= OldTimestamp - end, - true - ). + Setup = setup_two_nodes_and_discovery(Config, [wait, disco2]), + #{disco := Disco, node2 := Node2} = Setup, + OldTimestamp = get_disco_timestamp(Disco, node_start_timestamps, Node2), + simulate_disco_restart(Setup), + wait_for_disco_timestamp_to_be_updated(Disco, node_start_timestamps, Node2, OldTimestamp). format_data_does_not_return_table_duplicates(Config) -> Res = cets_status:format_data(test_data_for_duplicate_missing_table_in_status(Config)), @@ -2786,6 +2622,65 @@ given_n_servers(Config, N, Opts) -> ], #{pids => Pids, tabs => Tabs}. +setup_two_nodes_and_discovery(Config) -> + setup_two_nodes_and_discovery(Config, []). 
+
+setup_two_nodes_and_discovery(Config, Flags) ->
+    Node1 = node(),
+    #{ct2 := Peer2} = proplists:get_value(peers, Config),
+    #{ct2 := Node2} = proplists:get_value(nodes, Config),
+    disconnect_node(Peer2, Node1),
+    Tab = make_name(Config),
+    {ok, _Pid1} = start(Node1, Tab),
+    {ok, _Pid2} = start(Peer2, Tab),
+    F = fun(State) ->
+        {{ok, [Node1, Node2]}, State}
+    end,
+    DiscoName = disco_name(Config),
+    DiscoOpts = #{
+        name => DiscoName, backend_module => cets_discovery_fun, get_nodes_fn => F
+    },
+    Disco = start_disco(Node1, DiscoOpts),
+    %% Start Disco on the second node (it is not always needed)
+    Res =
+        case lists:member(disco2, Flags) of
+            true ->
+                Disco2 = start_disco(Node2, DiscoOpts),
+                #{disco2 => Disco2};
+            false ->
+                #{}
+        end,
+    cets_discovery:add_table(Disco, Tab),
+    case lists:member(wait, Flags) of
+        true ->
+            ok = cets_discovery:wait_for_ready(Disco, 5000);
+        false ->
+            ok
+    end,
+    case lists:member(netsplit, Flags) of
+        true ->
+            %% Simulate a loss of connection between nodes
+            disconnect_node(Peer2, Node1);
+        false ->
+            ok
+    end,
+    Res#{disco_opts => DiscoOpts, disco => Disco, node1 => Node1, node2 => Node2, peer2 => Peer2}.
+
+simulate_disco_restart(#{
+    disco_opts := DiscoOpts,
+    disco2 := Disco2,
+    node1 := Node1,
+    node2 := Node2,
+    peer2 := Peer2
+}) ->
+    %% Instead of restarting the node, restart the process. It is enough to get
+    %% a new start_time.
+    disconnect_node(Peer2, Node1),
+    rpc(Peer2, cets, stop, [Disco2]),
+    %% We would not actually detect the remote disco server simply being stopped,
+    %% because we use nodeup/nodedown to detect downs, not monitors.
+    _RestartedDisco2 = start_disco(Node2, DiscoOpts).
+
 stopped_pid() ->
     %% Get a pid for a stopped process
     {Pid, Mon} = spawn_monitor(fun() -> ok end),
@@ -2818,7 +2713,7 @@ wait_for_down(Pid) ->
 %% Disconnect node until manually connected
 block_node(Node, Peer) when is_atom(Node), is_pid(Peer) ->
     rpc(Peer, erlang, set_cookie, [node(), invalid_cookie]),
-    rpc(Peer, erlang, disconnect_node, [node()]),
+    disconnect_node(Peer, node()),
     %% Wait till node() is notified about the disconnect
     cets_test_wait:wait_until(fun() -> rpc(Peer, net_adm, ping, [node()]) end, pang),
     cets_test_wait:wait_until(fun() -> rpc(node(), net_adm, ping, [Node]) end, pang).
@@ -2829,6 +2724,9 @@ reconnect_node(Node, Peer) when is_atom(Node), is_pid(Peer) ->
     cets_test_wait:wait_until(fun() -> rpc(Peer, net_adm, ping, [node()]) end, pong),
     cets_test_wait:wait_until(fun() -> rpc(node(), net_adm, ping, [Node]) end, pong).
 
+disconnect_node(RPCNode, DisconnectNode) ->
+    rpc(RPCNode, erlang, disconnect_node, [DisconnectNode]).
+
 not_leader(Leader, Other, Leader) ->
     Other;
 not_leader(Other, Leader, Leader) ->
@@ -2907,3 +2805,22 @@ test_data_for_duplicate_missing_table_in_status(Config) ->
 
 return_same(X) ->
     X.
+
+wait_for_disco_timestamp_to_appear(Disco, MapName, NodeKey) ->
+    F = fun() ->
+        #{MapName := Map} = cets_discovery:system_info(Disco),
+        maps:is_key(NodeKey, Map)
+    end,
+    cets_test_wait:wait_until(F, true).
+
+wait_for_disco_timestamp_to_be_updated(Disco, MapName, NodeKey, OldTimestamp) ->
+    Cond = fun() ->
+        NewTimestamp = get_disco_timestamp(Disco, MapName, NodeKey),
+        NewTimestamp =/= OldTimestamp
+    end,
+    cets_test_wait:wait_until(Cond, true).
+
+get_disco_timestamp(Disco, MapName, NodeKey) ->
+    Info = cets_discovery:system_info(Disco),
+    #{MapName := #{NodeKey := Timestamp}} = Info,
+    Timestamp.
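A short usage sketch of the new helper and its flags, mirroring the rewritten testcases above (the testcase name is hypothetical):

    %% [wait] blocks until the discovery server reports ready;
    %% [netsplit] then drops the connection from the peer's side.
    disco_netsplit_example(Config) ->
        #{disco := Disco, node2 := Node2} =
            setup_two_nodes_and_discovery(Config, [wait, netsplit]),
        wait_for_disco_timestamp_to_appear(Disco, nodedown_timestamps, Node2).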
From 49cf3f9652bee62e7b7410c55f0f91300e1d4830 Mon Sep 17 00:00:00 2001
From: Mikhail Uvarov
Date: Fri, 24 Nov 2023 14:04:51 +0100
Subject: [PATCH 40/41] Trigger get_nodes on nodeup

Add disco_nodeup_triggers_check_and_get_nodes testcase

There is a very high chance that there is something new in the DB on
nodeup, so trigger a DB read. This is useful when we want to know the
IPs of other nodes right after nodeup and the IPs are stored in the DB.
---
 src/cets_discovery.erl |  5 +++++
 test/cets_SUITE.erl    | 23 +++++++++++++++++++++++
 2 files changed, 28 insertions(+)

diff --git a/src/cets_discovery.erl b/src/cets_discovery.erl
index 44ab3e9e..d30c76d8 100644
--- a/src/cets_discovery.erl
+++ b/src/cets_discovery.erl
@@ -230,6 +230,11 @@ handle_info(check, State) ->
 handle_info({handle_check_result, Result, BackendState}, State) ->
     {noreply, handle_get_nodes_result(Result, BackendState, State)};
 handle_info({nodeup, Node}, State) ->
+    %% nodeup triggers a get_nodes call.
+    %% We are interested in up-to-date data
+    %% (in MongooseIM we want to know IPs of other nodes as soon as possible
+    %% after some node connects to us)
+    self() ! check,
     State2 = handle_nodeup(Node, State),
     State3 = remove_node_from_unavailable_list(Node, State2),
     {noreply, try_joining(State3)};
diff --git a/test/cets_SUITE.erl b/test/cets_SUITE.erl
index 0a0382ab..88e35a07 100644
--- a/test/cets_SUITE.erl
+++ b/test/cets_SUITE.erl
@@ -188,6 +188,7 @@ seq_cases() ->
         disco_node_down_timestamp_is_remembered,
         disco_nodeup_timestamp_is_updated_after_node_reconnects,
         disco_node_start_timestamp_is_updated_after_node_restarts,
+        disco_nodeup_triggers_check_and_get_nodes,
         ping_pairs_returns_pongs,
         ping_pairs_returns_earlier
     ].
@@ -2394,6 +2395,13 @@ disco_node_start_timestamp_is_updated_after_node_restarts(Config) ->
     simulate_disco_restart(Setup),
     wait_for_disco_timestamp_to_be_updated(Disco, node_start_timestamps, Node2, OldTimestamp).
 
+disco_nodeup_triggers_check_and_get_nodes(Config) ->
+    Setup = setup_two_nodes_and_discovery(Config, [wait, notify_get_nodes]),
+    #{disco := Disco, node2 := Node2} = Setup,
+    flush_message(get_nodes),
+    Disco ! {nodeup, Node2},
+    receive_message(get_nodes).
+
 format_data_does_not_return_table_duplicates(Config) ->
     Res = cets_status:format_data(test_data_for_duplicate_missing_table_in_status(Config)),
     ?assertMatch(#{remote_unknown_tables := [], remote_nodes_with_missing_tables := []}, Res).
@@ -2556,6 +2564,14 @@ receive_message_with_arg(Tag) ->
     after 5000 -> error({receive_message_with_arg_timeout, Tag})
     end.
 
+flush_message(M) ->
+    receive
+        M ->
+            flush_message(M)
+    after 0 ->
+        ok
+    end.
+
 make_name(Config) ->
     make_name(Config, 1).
 
@@ -2626,6 +2642,7 @@ setup_two_nodes_and_discovery(Config) ->
     setup_two_nodes_and_discovery(Config, []).
 
 setup_two_nodes_and_discovery(Config, Flags) ->
+    Me = self(),
     Node1 = node(),
     #{ct2 := Peer2} = proplists:get_value(peers, Config),
     #{ct2 := Node2} = proplists:get_value(nodes, Config),
@@ -2634,6 +2651,12 @@ setup_two_nodes_and_discovery(Config, Flags) ->
     {ok, _Pid1} = start(Node1, Tab),
     {ok, _Pid2} = start(Peer2, Tab),
     F = fun(State) ->
+        case lists:member(notify_get_nodes, Flags) of
+            true ->
+                Me ! get_nodes;
+            false ->
+                ok
+        end,
         {{ok, [Node1, Node2]}, State}
     end,
     DiscoName = disco_name(Config),

From 4b1d86d418da8d692525f0e99e3633f623ed5335 Mon Sep 17 00:00:00 2001
From: Mikhail Uvarov
Date: Fri, 24 Nov 2023 14:30:16 +0100
Subject: [PATCH 41/41] Remove outdated comment

---
 src/cets_discovery.erl | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/cets_discovery.erl b/src/cets_discovery.erl
index d30c76d8..3358b31e 100644
--- a/src/cets_discovery.erl
+++ b/src/cets_discovery.erl
@@ -482,7 +482,6 @@ handle_system_info(State) ->
 handle_nodedown(Node, State) ->
     State2 = remember_nodedown_timestamp(Node, State),
     {NodeUpTime, State3} = remove_nodeup_timestamp(Node, State2),
-    %% Not inside of the macro to make code coverage happy
     ?LOG_WARNING(
         set_defined(connected_millisecond_duration, NodeUpTime, #{
             what => nodedown,
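With the whole series applied, the resulting cets_ping API can be exercised as below (node names are hypothetical; the short-circuiting matches the ping_pairs_returns_earlier testcase):

    cluster_ping_example() ->
        pong = cets_ping:ping('cets@host1.example'),
        %% ping_pairs/1 stops after the first pang and marks the
        %% remaining pairs as skipped:
        [{_, _, pong}, {_, _, pang}, {_, _, skipped}] =
            cets_ping:ping_pairs([
                {'cets@host1.example', 'cets@host2.example'},
                {'cets@host1.example', 'cets@dead.example'},
                {'cets@host1.example', 'cets@host3.example'}
            ]).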