
Commit

Merge pull request #41 from esl/rewrite-ping

Rewrite ping
chrzaszcz committed Nov 24, 2023
2 parents 2ca3105 + 4b1d86d commit 0202120
Showing 7 changed files with 587 additions and 40 deletions.
16 changes: 12 additions & 4 deletions src/cets.erl
@@ -130,7 +130,8 @@
is_leader := boolean(),
opts := start_opts(),
backlog := [backlog_entry()],
pause_monitors := [pause_monitor()]
pause_monitors := [pause_monitor()],
node_down_history := [node()]
}.

-type long_msg() ::
@@ -154,7 +155,8 @@
memory := non_neg_integer(),
ack_pid := ack_pid(),
join_ref := join_ref(),
opts := start_opts()
opts := start_opts(),
node_down_history := [node()]
}.

-type handle_down_fun() :: fun((#{remote_pid := server_pid(), table := table_name()}) -> ok).
@@ -417,7 +419,8 @@ init({Tab, Opts}) ->
is_leader => true,
opts => Opts,
backlog => [],
pause_monitors => []
pause_monitors => [],
node_down_history => []
}}.

-spec handle_call(long_msg() | {op, op()}, from(), state()) ->
@@ -524,7 +527,7 @@ handle_down2(RemotePid, State = #{other_servers := Servers, ack_pid := AckPid})
cets_ack:send_remote_down(AckPid, RemotePid),
call_user_handle_down(RemotePid, State),
Servers2 = lists:delete(RemotePid, Servers),
set_other_servers(Servers2, State);
update_node_down_history(RemotePid, set_other_servers(Servers2, State));
false ->
%% This should not happen
?LOG_ERROR(#{
@@ -535,6 +538,9 @@ handle_down2(RemotePid, State = #{other_servers := Servers, ack_pid := AckPid})
State
end.

update_node_down_history(RemotePid, State = #{node_down_history := History}) ->
State#{node_down_history := [node(RemotePid) | History]}.

%% Merge two lists of pids, create the missing monitors.
-spec add_servers(Servers, Servers) -> Servers when Servers :: servers().
add_servers(Pids, Servers) ->
@@ -742,6 +748,7 @@ handle_get_info(
other_servers := Servers,
ack_pid := AckPid,
join_ref := JoinRef,
node_down_history := DownHistory,
opts := Opts
}
) ->
@@ -752,6 +759,7 @@
memory => ets:info(Tab, memory),
ack_pid => AckPid,
join_ref => JoinRef,
node_down_history => DownHistory,
opts => Opts
}.
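The node_down_history is also exposed through the info map above, so a caller can see which remote nodes went down since the server started (most recent first, because new entries are prepended in handle_down2/2). A minimal sketch, not part of the diff, assuming the map is obtained with cets:info/1 and using a hypothetical table server my_table:

%% Sketch (assumptions: cets:info/1 returns the map built by handle_get_info/2,
%% and the server is registered under the hypothetical name my_table).
print_down_history() ->
    #{node_down_history := History} = cets:info(my_table),
    %% Most recent entry is first, because handle_down2/2 prepends nodes
    [io:format("node ~p went down~n", [Node]) || Node <- History],
    ok.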

140 changes: 131 additions & 9 deletions src/cets_discovery.erl
@@ -90,8 +90,13 @@
join_status := not_running | running,
should_retry_join := boolean(),
timer_ref := reference() | undefined,
pending_wait_for_ready := [gen_server:from()]
pending_wait_for_ready := [gen_server:from()],
nodeup_timestamps := #{node() => milliseconds()},
nodedown_timestamps := #{node() => milliseconds()},
node_start_timestamps := #{node() => milliseconds()},
start_time := milliseconds()
}.
-type milliseconds() :: integer().

%% Backend could define its own options
-type opts() :: #{name := atom(), _ := _}.
@@ -151,6 +156,7 @@ wait_for_ready(Server, Timeout) ->

-spec init(term()) -> {ok, state()}.
init(Opts) ->
StartTime = erlang:system_time(millisecond),
%% Sends nodeup / nodedown
ok = net_kernel:monitor_nodes(true),
Mod = maps:get(backend_module, Opts, cets_discovery_file),
@@ -159,7 +165,7 @@ init(Opts) ->
BackendState = Mod:init(Opts),
%% Changes phase from initial to regular (affects the check interval)
erlang:send_after(timer:minutes(5), self(), enter_regular_phase),
{ok, #{
State = #{
phase => initial,
results => [],
nodes => [],
@@ -174,8 +180,16 @@
join_status => not_running,
should_retry_join => false,
timer_ref => undefined,
pending_wait_for_ready => []
}}.
pending_wait_for_ready => [],
nodeup_timestamps => #{},
node_start_timestamps => #{},
nodedown_timestamps => #{},
start_time => StartTime
},
%% Set initial timestamps because we will not receive nodeup events for
%% already connected nodes
State2 = lists:foldl(fun handle_nodeup/2, State, nodes()),
{ok, State2}.

-spec handle_call(term(), from(), state()) -> {reply, term(), state()} | {noreply, state()}.
handle_call(get_tables, _From, State = #{tables := Tables}) ->
@@ -216,12 +230,21 @@ handle_info(check, State) ->
handle_info({handle_check_result, Result, BackendState}, State) ->
{noreply, handle_get_nodes_result(Result, BackendState, State)};
handle_info({nodeup, Node}, State) ->
State2 = remove_node_from_unavailable_list(Node, State),
{noreply, try_joining(State2)};
handle_info({nodedown, _Node}, State) ->
%% nodeup triggers a get_nodes call.
%% We are interested in up-to-date data
%% (in MongooseIM we want to know the IPs of other nodes as soon as possible
%% after a node connects to us)
self() ! check,
State2 = handle_nodeup(Node, State),
State3 = remove_node_from_unavailable_list(Node, State2),
{noreply, try_joining(State3)};
handle_info({nodedown, Node}, State) ->
State2 = handle_nodedown(Node, State),
%% Do another check to update unavailable_nodes list
self() ! check,
{noreply, State};
{noreply, State2};
handle_info({start_time, Node, StartTime}, State) ->
{noreply, handle_receive_start_time(Node, StartTime, State)};
handle_info({joining_finished, Results}, State) ->
{noreply, handle_joining_finished(Results, State)};
handle_info({ping_result, Node, Result}, State) ->
@@ -315,7 +338,7 @@ ping_not_connected_nodes(Nodes) ->
Self = self(),
NotConNodes = Nodes -- [node() | nodes()],
[
spawn(fun() -> Self ! {ping_result, Node, net_adm:ping(Node)} end)
spawn(fun() -> Self ! {ping_result, Node, cets_ping:ping(Node)} end)
|| Node <- lists:sort(NotConNodes)
],
ok.
@@ -454,3 +477,102 @@ has_join_result_for(Node, Table, #{results := Results}) ->
-spec handle_system_info(state()) -> system_info().
handle_system_info(State) ->
State#{verify_ready => verify_ready(State)}.

-spec handle_nodedown(node(), state()) -> state().
handle_nodedown(Node, State) ->
State2 = remember_nodedown_timestamp(Node, State),
{NodeUpTime, State3} = remove_nodeup_timestamp(Node, State2),
?LOG_WARNING(
set_defined(connected_millisecond_duration, NodeUpTime, #{
what => nodedown,
remote_node => Node,
alive_nodes => length(nodes()) + 1,
time_since_startup_in_milliseconds => time_since_startup_in_milliseconds(State)
})
),
State3.

-spec handle_nodeup(node(), state()) -> state().
handle_nodeup(Node, State) ->
send_start_time_to(Node, State),
State2 = remember_nodeup_timestamp(Node, State),
NodeDownTime = get_downtime(Node, State2),
?LOG_WARNING(
set_defined(downtime_millisecond_duration, NodeDownTime, #{
what => nodeup,
remote_node => Node,
alive_nodes => length(nodes()) + 1,
%% We report this time so we can work on minimizing it.
%% It shows how long it took to discover nodes after startup.
time_since_startup_in_milliseconds => time_since_startup_in_milliseconds(State)
})
),
State2.

-spec remember_nodeup_timestamp(node(), state()) -> state().
remember_nodeup_timestamp(Node, State = #{nodeup_timestamps := Map}) ->
Time = erlang:system_time(millisecond),
Map2 = Map#{Node => Time},
State#{nodeup_timestamps := Map2}.

-spec remember_nodedown_timestamp(node(), state()) -> state().
remember_nodedown_timestamp(Node, State = #{nodedown_timestamps := Map}) ->
Time = erlang:system_time(millisecond),
Map2 = Map#{Node => Time},
State#{nodedown_timestamps := Map2}.

-spec remove_nodeup_timestamp(node(), state()) -> {integer(), state()}.
remove_nodeup_timestamp(Node, State = #{nodeup_timestamps := Map}) ->
StartTime = maps:get(Node, Map, undefined),
NodeUpTime = calculate_uptime(StartTime),
Map2 = maps:remove(Node, Map),
{NodeUpTime, State#{nodeup_timestamps := Map2}}.

calculate_uptime(undefined) ->
undefined;
calculate_uptime(StartTime) ->
time_since(StartTime).

get_downtime(Node, #{nodedown_timestamps := Map}) ->
case maps:get(Node, Map, undefined) of
undefined ->
undefined;
WentDown ->
time_since(WentDown)
end.

set_defined(_Key, undefined, Map) ->
Map;
set_defined(Key, Value, Map) ->
Map#{Key => Value}.

time_since_startup_in_milliseconds(#{start_time := StartTime}) ->
time_since(StartTime).

time_since(StartTime) ->
erlang:system_time(millisecond) - StartTime.

send_start_time_to(Node, #{start_time := StartTime}) ->
case erlang:process_info(self(), registered_name) of
{registered_name, Name} ->
erlang:send({Name, Node}, {start_time, node(), StartTime});
_ ->
ok
end.

handle_receive_start_time(Node, StartTime, State = #{node_start_timestamps := Map}) ->
case maps:get(Node, Map, undefined) of
undefined ->
ok;
StartTime ->
?LOG_WARNING(#{
what => node_reconnects,
remote_node => Node,
start_time => StartTime,
text => <<"Netsplit recovery. The remote node has been connected to us before.">>
});
_ ->
%% Restarted node reconnected, this is fine during the rolling updates
ok
end,
State#{node_start_timestamps := maps:put(Node, StartTime, Map)}.
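The start-time exchange above lets a node tell a netsplit recovery (the same incarnation of the remote node reconnects, so its start time is unchanged) apart from a restart (a new incarnation with a new start time, which is expected during rolling updates). A standalone sketch of that classification, not part of the diff, with hypothetical node names and timestamps:

%% Sketch of the decision made in handle_receive_start_time/3.
classify_reconnect(Node, ReceivedStartTime, KnownStartTimes) ->
    case maps:get(Node, KnownStartTimes, undefined) of
        undefined ->
            first_contact;
        ReceivedStartTime ->
            %% Same incarnation seen before - netsplit recovery
            netsplit_recovery;
        _OtherStartTime ->
            %% New incarnation - the node has restarted (fine during rolling updates)
            restarted
    end.

%% classify_reconnect('b@host2', 1700000000000, #{}) -> first_contact
%% classify_reconnect('b@host2', 1700000000000, #{'b@host2' => 1700000000000}) -> netsplit_recovery
%% classify_reconnect('b@host2', 1700000005000, #{'b@host2' => 1700000000000}) -> restarted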
19 changes: 6 additions & 13 deletions src/cets_join.erl
@@ -84,10 +84,12 @@ join_loop(LockKey, Info, LocalPid, RemotePid, Start, JoinOpts)
%% Just lock all nodes, no magic here :)
Nodes = [node() | nodes()],
Retries = 1,
%% global could abort the transaction when one of the nodes goes down.
%% This usually happens during startup or an update.
case global:trans(LockRequest, F, Nodes, Retries) of
aborted ->
checkpoint(before_retry, JoinOpts),
?LOG_ERROR(Info#{what => join_retry, reason => lock_aborted}),
?LOG_INFO(Info#{what => join_retry, reason => lock_aborted}),
join_loop(LockKey, Info, LocalPid, RemotePid, Start, JoinOpts);
Result ->
Result
@@ -195,9 +197,9 @@ get_pids(Pid) ->
check_pids(Info, LocPids, RemPids, JoinOpts) ->
check_do_not_overlap(Info, LocPids, RemPids),
checkpoint(before_check_fully_connected, JoinOpts),
check_could_reach_each_other(Info, LocPids, RemPids),
check_fully_connected(Info, LocPids),
check_fully_connected(Info, RemPids),
check_could_reach_each_other(Info, LocPids, RemPids).
check_fully_connected(Info, RemPids).

-spec check_could_reach_each_other(cets_long:log_info(), cets:servers(), cets:servers()) -> ok.
check_could_reach_each_other(Info, LocPids, RemPids) ->
@@ -207,16 +209,7 @@ check_could_reach_each_other(Info, LocPids, RemPids) ->
{min(LocNode, RemNode), max(LocNode, RemNode)}
|| LocNode <- LocNodes, RemNode <- RemNodes, LocNode =/= RemNode
]),
Results =
[
{Node1, Node2,
cets_long:run_tracked(
#{task => ping_node, node1 => Node1, node2 => Node2}, fun() ->
rpc:call(Node1, net_adm, ping, [Node2], 10000)
end
)}
|| {Node1, Node2} <- Pairs
],
Results = cets_ping:ping_pairs(Pairs),
NotConnected = [X || {_Node1, _Node2, Res} = X <- Results, Res =/= pong],
case NotConnected of
[] ->
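For reference, the Pairs list handed to cets_ping:ping_pairs/1 above is a deduplicated cross product of the local and remote node lists; each pair is normalised with min/max so that A-B and B-A are checked only once. A small sketch, not part of the diff, with hypothetical node names:

%% Sketch of the pair generation in check_could_reach_each_other/3.
node_pairs(LocNodes, RemNodes) ->
    lists:usort([
        {min(LocNode, RemNode), max(LocNode, RemNode)}
     || LocNode <- LocNodes, RemNode <- RemNodes, LocNode =/= RemNode
    ]).

%% node_pairs(['a@h1', 'b@h2'], ['b@h2', 'c@h3']) ->
%%     [{'a@h1','b@h2'},{'a@h1','c@h3'},{'b@h2','c@h3'}]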
3 changes: 3 additions & 0 deletions src/cets_long.erl
@@ -91,6 +91,9 @@ run_monitor(Info, Ref, Parent, Start) ->

monitor_loop(Mon, Info, Parent, Start, Interval) ->
receive
{'DOWN', _MonRef, process, _Pid, shutdown} ->
%% Special case, the long task is stopped using exit(Pid, shutdown)
ok;
{'DOWN', MonRef, process, _Pid, Reason} when Mon =:= MonRef ->
?LOG_ERROR(Info#{
what => task_failed,
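The new clause matters when the tracked process is stopped deliberately, for example by its supervisor. A shell sketch, not part of the diff, of the message that monitor_loop/5 now treats as a normal stop instead of logging task_failed:

%% Sketch: a deliberate shutdown delivers a 'DOWN' message with reason shutdown.
{Pid, MonRef} = spawn_monitor(fun() -> timer:sleep(infinity) end),
exit(Pid, shutdown),
receive
    {'DOWN', MonRef, process, Pid, shutdown} -> ok
end.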
79 changes: 79 additions & 0 deletions src/cets_ping.erl
@@ -0,0 +1,79 @@
-module(cets_ping).
-export([ping/1, ping_pairs/1]).

-ifdef(TEST).
-export([net_family/1]).
-endif.

ping(Node) when is_atom(Node) ->
%% It is important to understand that the initial setup for dist connections
%% is done by the single net_kernel process.
%% It calls net_kernel:setup, which calls inet_tcp_dist, which calls
%% erl_epmd:address_please/3, which does a DNS request.
%% If DNS is slow, the net_kernel process becomes busy.
%% And if we have a lot of offline nodes in the CETS discovery table,
%% we would call net_kernel for each of them (even though we would probably get
%% {error, nxdomain} from erl_epmd:address_please/3).
%% So we first do the DNS lookup here and only then try to connect.
case lists:member(Node, nodes()) of
true ->
pong;
false ->
{node, Name, Host} = dist_util:split_node(Node),
Epmd = net_kernel:epmd_module(),
case Epmd:address_please(Name, Host, net_family()) of
{error, _} ->
pang;
_ ->
connect_ping(Node)
end
end.

%% The user should use proto_dist flag to enable inet6.
-spec net_family() -> inet | inet6.
net_family() ->
net_family(init:get_argument(proto_dist)).

net_family({ok, [["inet6" ++ _]]}) ->
inet6;
net_family(_) ->
inet.
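For example, a node started with "erl -proto_dist inet6_tcp" gets {ok, [["inet6_tcp"]]} from init:get_argument(proto_dist), so the clause above returns inet6; without the flag, init:get_argument/1 returns error and the fallback clause returns inet. A test-style sketch, not part of the diff (net_family/1 is only exported in TEST builds, see the -ifdef(TEST) block above):

%% Sketch: expected mapping of the proto_dist argument to an address family.
net_family_examples_test() ->
    inet6 = cets_ping:net_family({ok, [["inet6_tcp"]]}),
    inet = cets_ping:net_family({ok, [["inet_tcp"]]}),
    inet = cets_ping:net_family(error).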

connect_ping(Node) ->
%% We could use net_adm:ping/1, but it:
%% - disconnects the node on pang - we don't want that
%% (because it could disconnect an already connected node due to race conditions)
%% - calls the net_kernel gen_server on the remote node,
%% which could be busy doing something else, which means a slower response time.
case net_kernel:connect_node(Node) of
true ->
pong;
_ ->
pang
end.

-spec ping_pairs([{node(), node()}]) -> [{node(), node(), pong | Reason :: term()}].
ping_pairs(Pairs) ->
%% We could use rpc:multicall(Nodes, cets_ping, ping, Args),
%% but that means a higher chance of nodes trying to contact each other.
ping_pairs_stop_on_pang(Pairs).

ping_pairs_stop_on_pang([{Node1, Node2} | Pairs]) ->
F = fun() -> rpc:call(Node1, cets_ping, ping, [Node2], 10000) end,
Info = #{task => ping_node, node1 => Node1, node2 => Node2},
Res = cets_long:run_tracked(Info, F),
case Res of
pong ->
[{Node1, Node2, pong} | ping_pairs_stop_on_pang(Pairs)];
Other ->
%% We do not need to ping the rest of the nodes -
%% one node returning pang is enough to cancel the join.
%% Exiting early saves some time
%% (connect_node to a dead node could be time-consuming).
[{Node1, Node2, Other} | fail_pairs(Pairs, skipped)]
end;
ping_pairs_stop_on_pang([]) ->
[].

fail_pairs(Pairs, Reason) ->
[{Node1, Node2, Reason} || {Node1, Node2} <- Pairs].
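A usage sketch for the new module, not part of the diff; the node names and the shown results are hypothetical:

%% From a shell on 'cets1@host1':
pong = cets_ping:ping('cets2@host2'),
pang = cets_ping:ping('nosuchnode@nowhere'),
Results = cets_ping:ping_pairs([
    {'cets1@host1', 'cets2@host2'},
    {'cets2@host2', 'cets3@host3'},
    {'cets3@host3', 'cets1@host1'}
]),
%% If the second pair fails, ping_pairs/1 stops early and the result looks like:
%% [{'cets1@host1','cets2@host2',pong},
%%  {'cets2@host2','cets3@host3',pang},
%%  {'cets3@host3','cets1@host1',skipped}]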
2 changes: 1 addition & 1 deletion src/cets_status.erl
@@ -64,7 +64,7 @@ gather_data(Disco) ->
ThisNode = node(),
Info = cets_discovery:system_info(Disco),
#{tables := Tables} = Info,
OnlineNodes = [ThisNode | nodes()],
OnlineNodes = lists:sort([ThisNode | nodes()]),
AvailNodes = available_nodes(Disco, OnlineNodes),
Expected = get_local_table_to_other_nodes_map(Tables),
OtherTabNodes = get_node_to_tab_nodes_map(AvailNodes, Disco),