diff --git a/src/cets_discovery.erl b/src/cets_discovery.erl
index 3358b31e..f8a74ced 100644
--- a/src/cets_discovery.erl
+++ b/src/cets_discovery.erl
@@ -35,7 +35,8 @@
     get_tables/1,
     info/1,
     system_info/1,
-    wait_for_ready/2
+    wait_for_ready/2,
+    wait_for_get_nodes/2
 ]).
 -export([
     init/1,
@@ -55,6 +56,7 @@
     info/1,
     system_info/1,
     wait_for_ready/2,
+    wait_for_get_nodes/2,
     behaviour_info/1
 ]).
 
@@ -91,6 +93,7 @@
     should_retry_join := boolean(),
     timer_ref := reference() | undefined,
     pending_wait_for_ready := [gen_server:from()],
+    pending_wait_for_get_nodes := [gen_server:from()],
    nodeup_timestamps := #{node() => milliseconds()},
    nodedown_timestamps := #{node() => milliseconds()},
    node_start_timestamps := #{node() => milliseconds()},
@@ -146,14 +149,25 @@ info(Server) ->
 system_info(Server) ->
     gen_server:call(Server, system_info).
 
-%% This calls blocks until the initial discovery is done
-%% It also waits till the data is loaded from the remote nodes
+%% This call blocks until the initial discovery is done.
+%% It also waits till the data is loaded from the remote nodes.
 -spec wait_for_ready(server(), timeout()) -> ok.
 wait_for_ready(Server, Timeout) ->
     F = fun() -> gen_server:call(Server, wait_for_ready, Timeout) end,
     Info = #{task => cets_wait_for_ready},
     cets_long:run_tracked(Info, F).
 
+%% Waits for the current get_nodes call to return.
+%% Returns immediately if there is no get_nodes call running.
+%% Waits for another get_nodes call if the should_retry_get_nodes flag is set.
+%% It differs from wait_for_ready, because it does not wait for
+%% unavailable nodes to return pang.
+-spec wait_for_get_nodes(server(), timeout()) -> ok.
+wait_for_get_nodes(Server, Timeout) ->
+    F = fun() -> gen_server:call(Server, wait_for_get_nodes, Timeout) end,
+    Info = #{task => cets_wait_for_get_nodes},
+    cets_long:run_tracked(Info, F).
+
 -spec init(term()) -> {ok, state()}.
 init(Opts) ->
     StartTime = erlang:system_time(millisecond),
@@ -181,6 +195,7 @@ init(Opts) ->
         should_retry_join => false,
         timer_ref => undefined,
         pending_wait_for_ready => [],
+        pending_wait_for_get_nodes => [],
         nodeup_timestamps => #{},
         node_start_timestamps => #{},
         nodedown_timestamps => #{},
@@ -198,6 +213,10 @@ handle_call(system_info, _From, State) ->
     {reply, handle_system_info(State), State};
 handle_call(wait_for_ready, From, State = #{pending_wait_for_ready := Pending}) ->
     {noreply, trigger_verify_ready(State#{pending_wait_for_ready := [From | Pending]})};
+handle_call(wait_for_get_nodes, _From, State = #{get_nodes_status := not_running}) ->
+    {reply, ok, State};
+handle_call(wait_for_get_nodes, From, State = #{pending_wait_for_get_nodes := Pending}) ->
+    {noreply, State#{pending_wait_for_get_nodes := [From | Pending]}};
 handle_call(Msg, From, State) ->
     ?LOG_ERROR(#{what => unexpected_call, msg => Msg, from => From}),
     {reply, {error, unexpected_call}, State}.
@@ -280,14 +299,23 @@ handle_check(State = #{backend_module := Mod, backend_state := BackendState}) ->
 -spec handle_get_nodes_result(Result, BackendState, State) -> State when
     Result :: get_nodes_result(), BackendState :: backend_state(), State :: state().
 handle_get_nodes_result(Result, BackendState, State) ->
-    State2 = State#{
+    State2 = maybe_reply_to_wait_for_get_nodes(State#{
         backend_state := BackendState,
         get_nodes_status := not_running,
         last_get_nodes_result := Result
-    },
+    }),
     State3 = set_nodes(Result, State2),
     schedule_check(trigger_verify_ready(State3)).
 
+-spec maybe_reply_to_wait_for_get_nodes(state()) -> state().
+maybe_reply_to_wait_for_get_nodes(
+    State = #{should_retry_get_nodes := false, pending_wait_for_get_nodes := Pending = [_ | _]}
+) ->
+    [gen_server:reply(From, ok) || From <- Pending],
+    State#{pending_wait_for_get_nodes := []};
+maybe_reply_to_wait_for_get_nodes(State) ->
+    State.
+
 -spec set_nodes({error, term()} | {ok, [node()]}, state()) -> state().
 set_nodes({error, _Reason}, State) ->
     State;
@@ -486,7 +514,7 @@ handle_nodedown(Node, State) ->
         set_defined(connected_millisecond_duration, NodeUpTime, #{
             what => nodedown,
             remote_node => Node,
-            alive_nodes => length(nodes()) + 1,
+            connected_nodes => length(nodes()) + 1,
             time_since_startup_in_milliseconds => time_since_startup_in_milliseconds(State)
         })
     ),
@@ -501,7 +529,7 @@ handle_nodeup(Node, State) ->
         set_defined(downtime_millisecond_duration, NodeDownTime, #{
             what => nodeup,
             remote_node => Node,
-            alive_nodes => length(nodes()) + 1,
+            connected_nodes => length(nodes()) + 1,
             %% We report that time so we could work on minimizing that time.
             %% It says how long it took to discover nodes after startup.
             time_since_startup_in_milliseconds => time_since_startup_in_milliseconds(State)
diff --git a/test/cets_SUITE.erl b/test/cets_SUITE.erl
index 88e35a07..a5d13cbb 100644
--- a/test/cets_SUITE.erl
+++ b/test/cets_SUITE.erl
@@ -109,6 +109,9 @@ cases() ->
     status_remote_nodes_with_unknown_tables,
     status_remote_nodes_with_missing_nodes,
     status_conflict_nodes,
+    disco_wait_for_get_nodes_works,
+    disco_wait_for_get_nodes_blocks_and_returns,
+    disco_wait_for_get_nodes_when_get_nodes_needs_to_be_retried,
     get_nodes_request,
     test_locally,
     handle_down_is_called,
@@ -1682,6 +1685,77 @@ status_conflict_nodes(Config) ->
         fun() -> maps:get(conflict_tables, cets_status:status(DiscoName)) end, [Tab2]
     ).
 
+disco_wait_for_get_nodes_works(_Config) ->
+    F = fun(State) -> {{ok, []}, State} end,
+    {ok, Disco} = cets_discovery:start(#{backend_module => cets_discovery_fun, get_nodes_fn => F}),
+    ok = cets_discovery:wait_for_get_nodes(Disco, 5000).
+
+disco_wait_for_get_nodes_blocks_and_returns(Config) ->
+    Tab = make_name(Config, 1),
+    {ok, _Pid} = start_local(Tab, #{}),
+    SignallingPid = make_signalling_process(),
+    F = fun(State) ->
+        wait_for_down(SignallingPid),
+        {{ok, []}, State}
+    end,
+    {ok, Disco} = cets_discovery:start(#{backend_module => cets_discovery_fun, get_nodes_fn => F}),
+    cets_discovery:add_table(Disco, Tab),
+    %% Enter a blocking get_nodes function
+    Disco ! check,
+    %% Do it async, because it would block otherwise
+    WaitPid = spawn_link(fun() -> ok = cets_discovery:wait_for_get_nodes(Disco, 5000) end),
+    Cond = fun() ->
+        length(maps:get(pending_wait_for_get_nodes, cets_discovery:system_info(Disco)))
+    end,
+    cets_test_wait:wait_until(Cond, 1),
+    %% Unblock the get_nodes call
+    SignallingPid ! stop,
+    %% wait_for_get_nodes returns
+    wait_for_down(WaitPid),
+    ok.
+
+%% Check that wait_for_get_nodes waits in case get_nodes should be retried
+disco_wait_for_get_nodes_when_get_nodes_needs_to_be_retried(Config) ->
+    Me = self(),
+    Tab = make_name(Config, 1),
+    {ok, _Pid} = start_local(Tab, #{}),
+    SignallingPid1 = make_signalling_process(),
+    SignallingPid2 = make_signalling_process(),
+    F = fun
+        (State = #{step := 1}) ->
+            wait_for_down(SignallingPid1),
+            {{ok, []}, State#{step => 2}};
+        (State = #{step := 2}) ->
+            Me ! entered_get_nodes2,
+            wait_for_down(SignallingPid2),
+            {{ok, []}, State#{step => 2}}
+    end,
+    {ok, Disco} = cets_discovery:start(#{
+        backend_module => cets_discovery_fun, get_nodes_fn => F, step => 1
+    }),
+    cets_discovery:add_table(Disco, Tab),
+    %% Enter a blocking get_nodes function
+    Disco ! check,
+    %% Do it async, because it would block otherwise
+    WaitPid = spawn_link(fun() -> ok = cets_discovery:wait_for_get_nodes(Disco, 5000) end),
+    Cond = fun() ->
+        length(maps:get(pending_wait_for_get_nodes, cets_discovery:system_info(Disco)))
+    end,
+    cets_test_wait:wait_until(Cond, 1),
+    %% Set the should_retry_get_nodes flag
+    Disco ! check,
+    %% Ensure the check message is received
+    cets_discovery:system_info(Disco),
+    %% Unblock the first get_nodes call
+    SignallingPid1 ! stop,
+    receive_message(entered_get_nodes2),
+    %% Still waiting for get_nodes to be retried
+    true = erlang:is_process_alive(WaitPid),
+    %% It finally returns after the second get_nodes call
+    SignallingPid2 ! stop,
+    wait_for_down(WaitPid),
+    ok.
+
 get_nodes_request(Config) ->
     #{ct2 := Node2, ct3 := Node3, ct4 := Node4} = proplists:get_value(nodes, Config),
     Tab = make_name(Config),
@@ -2296,7 +2370,7 @@ disco_logs_nodeup(Config) ->
             meta := #{pid := Disco},
             msg := {report, #{what := nodeup, remote_node := Node2} = R}
         }} = M ->
-            ?assert(is_integer(maps:get(alive_nodes, R)), M),
+            ?assert(is_integer(maps:get(connected_nodes, R)), M),
             ?assert(is_integer(maps:get(time_since_startup_in_milliseconds, R)), M)
     after 5000 ->
         ct:fail(timeout)
@@ -2318,7 +2392,7 @@ disco_logs_nodedown(Config) ->
             meta := #{pid := Disco},
             msg := {report, #{what := nodedown, remote_node := Node2} = R}
         }} = M ->
-            ?assert(is_integer(maps:get(alive_nodes, R)), M),
+            ?assert(is_integer(maps:get(connected_nodes, R)), M),
             ?assert(is_integer(maps:get(time_since_startup_in_milliseconds, R)), M),
             ?assert(is_integer(maps:get(connected_millisecond_duration, R)), M)
     after 5000 ->
@@ -2349,7 +2423,7 @@ disco_logs_nodeup_after_downtime(Config) ->
                     downtime_millisecond_duration := Downtime
                 } = R}
         }} = M ->
-            ?assert(is_integer(maps:get(alive_nodes, R)), M),
+            ?assert(is_integer(maps:get(connected_nodes, R)), M),
             ?assert(is_integer(Downtime), M)
     after 5000 ->
         ct:fail(timeout)
@@ -2847,3 +2921,10 @@ get_disco_timestamp(Disco, MapName, NodeKey) ->
     Info = cets_discovery:system_info(Disco),
     #{MapName := #{NodeKey := Timestamp}} = Info,
     Timestamp.
+
+make_signalling_process() ->
+    spawn_link(fun() ->
+        receive
+            stop -> ok
+        end
+    end).
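
For reference, a minimal sketch of how a caller might use the new cets_discovery:wait_for_get_nodes/2 API, assuming a discovery server started with the cets_discovery_fun backend and a get_nodes_fn callback as in the test cases above; the start_and_wait/0 wrapper and its get_nodes function are illustrative only and are not part of this change.

%% A sketch only: wait_for_get_nodes/2 returns as soon as the current
%% (or retried) get_nodes call finishes, unlike wait_for_ready/2,
%% which also waits for unavailable nodes to be contacted.
start_and_wait() ->
    GetNodesF = fun(State) -> {{ok, [node()]}, State} end,
    {ok, Disco} = cets_discovery:start(#{
        backend_module => cets_discovery_fun, get_nodes_fn => GetNodesF
    }),
    ok = cets_discovery:wait_for_get_nodes(Disco, 5000),
    Disco.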