Skip to content

Commit

Permalink
Merge pull request #42 from esl/wait-for-get-nodes
Browse files Browse the repository at this point in the history
Add cets_discovery:wait_for_get_nodes/2
  • Loading branch information
chrzaszcz committed Nov 28, 2023
2 parents 0202120 + ac1afc3 commit 4ef4a34
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 10 deletions.
42 changes: 35 additions & 7 deletions src/cets_discovery.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
get_tables/1,
info/1,
system_info/1,
wait_for_ready/2
wait_for_ready/2,
wait_for_get_nodes/2
]).
-export([
init/1,
Expand All @@ -55,6 +56,7 @@
info/1,
system_info/1,
wait_for_ready/2,
wait_for_get_nodes/2,
behaviour_info/1
]).

Expand Down Expand Up @@ -91,6 +93,7 @@
should_retry_join := boolean(),
timer_ref := reference() | undefined,
pending_wait_for_ready := [gen_server:from()],
pending_wait_for_get_nodes := [gen_server:from()],
nodeup_timestamps := #{node() => milliseconds()},
nodedown_timestamps := #{node() => milliseconds()},
node_start_timestamps := #{node() => milliseconds()},
Expand Down Expand Up @@ -146,14 +149,25 @@ info(Server) ->
system_info(Server) ->
gen_server:call(Server, system_info).

%% This calls blocks until the initial discovery is done
%% It also waits till the data is loaded from the remote nodes
%% This calls blocks until the initial discovery is done.
%% It also waits till the data is loaded from the remote nodes.
-spec wait_for_ready(server(), timeout()) -> ok.
wait_for_ready(Server, Timeout) ->
F = fun() -> gen_server:call(Server, wait_for_ready, Timeout) end,
Info = #{task => cets_wait_for_ready},
cets_long:run_tracked(Info, F).

%% Waits for the current get_nodes call to return.
%% Just returns if there is no gen_nodes call running.
%% Waits for another get_nodes, if should_retry_get_nodes flag is set.
%% It is different from wait_for_ready, because it does not wait for
%% unavailable nodes to return pang.
-spec wait_for_get_nodes(server(), timeout()) -> ok.
wait_for_get_nodes(Server, Timeout) ->
F = fun() -> gen_server:call(Server, wait_for_get_nodes, Timeout) end,
Info = #{task => cets_wait_for_get_nodes},
cets_long:run_tracked(Info, F).

-spec init(term()) -> {ok, state()}.
init(Opts) ->
StartTime = erlang:system_time(millisecond),
Expand Down Expand Up @@ -181,6 +195,7 @@ init(Opts) ->
should_retry_join => false,
timer_ref => undefined,
pending_wait_for_ready => [],
pending_wait_for_get_nodes => [],
nodeup_timestamps => #{},
node_start_timestamps => #{},
nodedown_timestamps => #{},
Expand All @@ -198,6 +213,10 @@ handle_call(system_info, _From, State) ->
{reply, handle_system_info(State), State};
handle_call(wait_for_ready, From, State = #{pending_wait_for_ready := Pending}) ->
{noreply, trigger_verify_ready(State#{pending_wait_for_ready := [From | Pending]})};
handle_call(wait_for_get_nodes, _From, State = #{get_nodes_status := not_running}) ->
{reply, ok, State};
handle_call(wait_for_get_nodes, From, State = #{pending_wait_for_get_nodes := Pending}) ->
{noreply, State#{pending_wait_for_get_nodes := [From | Pending]}};
handle_call(Msg, From, State) ->
?LOG_ERROR(#{what => unexpected_call, msg => Msg, from => From}),
{reply, {error, unexpected_call}, State}.
Expand Down Expand Up @@ -280,14 +299,23 @@ handle_check(State = #{backend_module := Mod, backend_state := BackendState}) ->
-spec handle_get_nodes_result(Result, BackendState, State) -> State when
Result :: get_nodes_result(), BackendState :: backend_state(), State :: state().
handle_get_nodes_result(Result, BackendState, State) ->
State2 = State#{
State2 = maybe_reply_to_wait_for_get_nodes(State#{
backend_state := BackendState,
get_nodes_status := not_running,
last_get_nodes_result := Result
},
}),
State3 = set_nodes(Result, State2),
schedule_check(trigger_verify_ready(State3)).

-spec maybe_reply_to_wait_for_get_nodes(state()) -> state().
maybe_reply_to_wait_for_get_nodes(
State = #{should_retry_get_nodes := false, pending_wait_for_get_nodes := Pending = [_ | _]}
) ->
[gen_server:reply(From, ok) || From <- Pending],
State#{pending_wait_for_get_nodes := []};
maybe_reply_to_wait_for_get_nodes(State) ->
State.

-spec set_nodes({error, term()} | {ok, [node()]}, state()) -> state().
set_nodes({error, _Reason}, State) ->
State;
Expand Down Expand Up @@ -486,7 +514,7 @@ handle_nodedown(Node, State) ->
set_defined(connected_millisecond_duration, NodeUpTime, #{
what => nodedown,
remote_node => Node,
alive_nodes => length(nodes()) + 1,
connected_nodes => length(nodes()) + 1,
time_since_startup_in_milliseconds => time_since_startup_in_milliseconds(State)
})
),
Expand All @@ -501,7 +529,7 @@ handle_nodeup(Node, State) ->
set_defined(downtime_millisecond_duration, NodeDownTime, #{
what => nodeup,
remote_node => Node,
alive_nodes => length(nodes()) + 1,
connected_nodes => length(nodes()) + 1,
%% We report that time so we could work on minimizing that time.
%% It says how long it took to discover nodes after startup.
time_since_startup_in_milliseconds => time_since_startup_in_milliseconds(State)
Expand Down
87 changes: 84 additions & 3 deletions test/cets_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ cases() ->
status_remote_nodes_with_unknown_tables,
status_remote_nodes_with_missing_nodes,
status_conflict_nodes,
disco_wait_for_get_nodes_works,
disco_wait_for_get_nodes_blocks_and_returns,
disco_wait_for_get_nodes_when_get_nodes_needs_to_be_retried,
get_nodes_request,
test_locally,
handle_down_is_called,
Expand Down Expand Up @@ -1682,6 +1685,77 @@ status_conflict_nodes(Config) ->
fun() -> maps:get(conflict_tables, cets_status:status(DiscoName)) end, [Tab2]
).

disco_wait_for_get_nodes_works(_Config) ->
F = fun(State) -> {{ok, []}, State} end,
{ok, Disco} = cets_discovery:start(#{backend_module => cets_discovery_fun, get_nodes_fn => F}),
ok = cets_discovery:wait_for_get_nodes(Disco, 5000).

disco_wait_for_get_nodes_blocks_and_returns(Config) ->
Tab = make_name(Config, 1),
{ok, _Pid} = start_local(Tab, #{}),
SignallingPid = make_signalling_process(),
F = fun(State) ->
wait_for_down(SignallingPid),
{{ok, []}, State}
end,
{ok, Disco} = cets_discovery:start(#{backend_module => cets_discovery_fun, get_nodes_fn => F}),
cets_discovery:add_table(Disco, Tab),
%% Enter into a blocking get_nodes function
Disco ! check,
%% Do it async, because it would block is
WaitPid = spawn_link(fun() -> ok = cets_discovery:wait_for_get_nodes(Disco, 5000) end),
Cond = fun() ->
length(maps:get(pending_wait_for_get_nodes, cets_discovery:system_info(Disco)))
end,
cets_test_wait:wait_until(Cond, 1),
%% Unblock get_nodes call
SignallingPid ! stop,
%% wait_for_get_nodes returns
wait_for_down(WaitPid),
ok.

%% Check that wait_for_get_nodes waits in case get_nodes should be retried
disco_wait_for_get_nodes_when_get_nodes_needs_to_be_retried(Config) ->
Me = self(),
Tab = make_name(Config, 1),
{ok, _Pid} = start_local(Tab, #{}),
SignallingPid1 = make_signalling_process(),
SignallingPid2 = make_signalling_process(),
F = fun
(State = #{step := 1}) ->
wait_for_down(SignallingPid1),
{{ok, []}, State#{step => 2}};
(State = #{step := 2}) ->
Me ! entered_get_nodes2,
wait_for_down(SignallingPid2),
{{ok, []}, State#{step => 2}}
end,
{ok, Disco} = cets_discovery:start(#{
backend_module => cets_discovery_fun, get_nodes_fn => F, step => 1
}),
cets_discovery:add_table(Disco, Tab),
%% Enter into a blocking get_nodes function
Disco ! check,
%% Do it async, because it would block is
WaitPid = spawn_link(fun() -> ok = cets_discovery:wait_for_get_nodes(Disco, 5000) end),
Cond = fun() ->
length(maps:get(pending_wait_for_get_nodes, cets_discovery:system_info(Disco)))
end,
cets_test_wait:wait_until(Cond, 1),
%% Set should_retry_get_nodes
Disco ! check,
%% Ensure check message is received
cets_discovery:system_info(Disco),
%% Unblock first get_nodes call
SignallingPid1 ! stop,
receive_message(entered_get_nodes2),
%% Still waiting for get_nodes being retried
true = erlang:is_process_alive(WaitPid),
%% It returns finally after second get_nodes call
SignallingPid2 ! stop,
wait_for_down(WaitPid),
ok.

get_nodes_request(Config) ->
#{ct2 := Node2, ct3 := Node3, ct4 := Node4} = proplists:get_value(nodes, Config),
Tab = make_name(Config),
Expand Down Expand Up @@ -2296,7 +2370,7 @@ disco_logs_nodeup(Config) ->
meta := #{pid := Disco},
msg := {report, #{what := nodeup, remote_node := Node2} = R}
}} = M ->
?assert(is_integer(maps:get(alive_nodes, R)), M),
?assert(is_integer(maps:get(connected_nodes, R)), M),
?assert(is_integer(maps:get(time_since_startup_in_milliseconds, R)), M)
after 5000 ->
ct:fail(timeout)
Expand All @@ -2318,7 +2392,7 @@ disco_logs_nodedown(Config) ->
meta := #{pid := Disco},
msg := {report, #{what := nodedown, remote_node := Node2} = R}
}} = M ->
?assert(is_integer(maps:get(alive_nodes, R)), M),
?assert(is_integer(maps:get(connected_nodes, R)), M),
?assert(is_integer(maps:get(time_since_startup_in_milliseconds, R)), M),
?assert(is_integer(maps:get(connected_millisecond_duration, R)), M)
after 5000 ->
Expand Down Expand Up @@ -2349,7 +2423,7 @@ disco_logs_nodeup_after_downtime(Config) ->
downtime_millisecond_duration := Downtime
} = R}
}} = M ->
?assert(is_integer(maps:get(alive_nodes, R)), M),
?assert(is_integer(maps:get(connected_nodes, R)), M),
?assert(is_integer(Downtime), M)
after 5000 ->
ct:fail(timeout)
Expand Down Expand Up @@ -2847,3 +2921,10 @@ get_disco_timestamp(Disco, MapName, NodeKey) ->
Info = cets_discovery:system_info(Disco),
#{MapName := #{NodeKey := Timestamp}} = Info,
Timestamp.

make_signalling_process() ->
spawn_link(fun() ->
receive
stop -> ok
end
end).

0 comments on commit 4ef4a34

Please sign in to comment.