From 8d8e4fa7b97d751b9b9ba994355f33b65f6e8c0c Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Fri, 24 Nov 2023 15:42:37 +0100 Subject: [PATCH 1/3] Add cets_discovery:wait_for_get_nodes/2 This function blocks until get_nodes returns --- src/cets_discovery.erl | 28 ++++++++++++++++++++++++---- test/cets_SUITE.erl | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 4 deletions(-) diff --git a/src/cets_discovery.erl b/src/cets_discovery.erl index 3358b31e..bd12f660 100644 --- a/src/cets_discovery.erl +++ b/src/cets_discovery.erl @@ -35,7 +35,8 @@ get_tables/1, info/1, system_info/1, - wait_for_ready/2 + wait_for_ready/2, + wait_for_get_nodes/2 ]). -export([ init/1, @@ -55,6 +56,7 @@ info/1, system_info/1, wait_for_ready/2, + wait_for_get_nodes/2, behaviour_info/1 ]). @@ -91,6 +93,7 @@ should_retry_join := boolean(), timer_ref := reference() | undefined, pending_wait_for_ready := [gen_server:from()], + pending_wait_for_get_nodes := [gen_server:from()], nodeup_timestamps := #{node() => milliseconds()}, nodedown_timestamps := #{node() => milliseconds()}, node_start_timestamps := #{node() => milliseconds()}, @@ -146,14 +149,24 @@ info(Server) -> system_info(Server) -> gen_server:call(Server, system_info). -%% This calls blocks until the initial discovery is done -%% It also waits till the data is loaded from the remote nodes +%% This calls blocks until the initial discovery is done. +%% It also waits till the data is loaded from the remote nodes. -spec wait_for_ready(server(), timeout()) -> ok. wait_for_ready(Server, Timeout) -> F = fun() -> gen_server:call(Server, wait_for_ready, Timeout) end, Info = #{task => cets_wait_for_ready}, cets_long:run_tracked(Info, F). +%% Waits for the currect get_nodes call to return. +%% Just returns if there is no gen_nodes call running. +%% It is different from wait_for_ready, because it does not wait for +%% unavailable nodes to return pang. +-spec wait_for_get_nodes(server(), timeout()) -> ok. +wait_for_get_nodes(Server, Timeout) -> + F = fun() -> gen_server:call(Server, wait_for_get_nodes, Timeout) end, + Info = #{task => cets_wait_for_get_nodes}, + cets_long:run_tracked(Info, F). + -spec init(term()) -> {ok, state()}. init(Opts) -> StartTime = erlang:system_time(millisecond), @@ -181,6 +194,7 @@ init(Opts) -> should_retry_join => false, timer_ref => undefined, pending_wait_for_ready => [], + pending_wait_for_get_nodes => [], nodeup_timestamps => #{}, node_start_timestamps => #{}, nodedown_timestamps => #{}, @@ -198,6 +212,10 @@ handle_call(system_info, _From, State) -> {reply, handle_system_info(State), State}; handle_call(wait_for_ready, From, State = #{pending_wait_for_ready := Pending}) -> {noreply, trigger_verify_ready(State#{pending_wait_for_ready := [From | Pending]})}; +handle_call(wait_for_get_nodes, _From, State = #{get_nodes_status := not_running}) -> + {reply, ok, State}; +handle_call(wait_for_get_nodes, From, State = #{pending_wait_for_get_nodes := Pending}) -> + {noreply, State#{pending_wait_for_get_nodes := [From | Pending]}}; handle_call(Msg, From, State) -> ?LOG_ERROR(#{what => unexpected_call, msg => Msg, from => From}), {reply, {error, unexpected_call}, State}. @@ -279,10 +297,12 @@ handle_check(State = #{backend_module := Mod, backend_state := BackendState}) -> -spec handle_get_nodes_result(Result, BackendState, State) -> State when Result :: get_nodes_result(), BackendState :: backend_state(), State :: state(). -handle_get_nodes_result(Result, BackendState, State) -> +handle_get_nodes_result(Result, BackendState, State = #{pending_wait_for_get_nodes := Pending}) -> + [gen_server:reply(From, ok) || From <- Pending], State2 = State#{ backend_state := BackendState, get_nodes_status := not_running, + pending_wait_for_get_nodes := [], last_get_nodes_result := Result }, State3 = set_nodes(Result, State2), diff --git a/test/cets_SUITE.erl b/test/cets_SUITE.erl index 88e35a07..7dc21ac6 100644 --- a/test/cets_SUITE.erl +++ b/test/cets_SUITE.erl @@ -109,6 +109,8 @@ cases() -> status_remote_nodes_with_unknown_tables, status_remote_nodes_with_missing_nodes, status_conflict_nodes, + disco_wait_for_get_nodes_works, + disco_wait_for_get_nodes_blocks_and_returns, get_nodes_request, test_locally, handle_down_is_called, @@ -1682,6 +1684,39 @@ status_conflict_nodes(Config) -> fun() -> maps:get(conflict_tables, cets_status:status(DiscoName)) end, [Tab2] ). +disco_wait_for_get_nodes_works(_Config) -> + F = fun(State) -> {{ok, []}, State} end, + {ok, Disco} = cets_discovery:start(#{backend_module => cets_discovery_fun, get_nodes_fn => F}), + ok = cets_discovery:wait_for_get_nodes(Disco, 5000). + +disco_wait_for_get_nodes_blocks_and_returns(Config) -> + Tab = make_name(Config, 1), + {ok, _Pid} = start_local(Tab, #{}), + SignallingPid = spawn_link(fun() -> + receive + stop -> ok + end + end), + F = fun(State) -> + wait_for_down(SignallingPid), + {{ok, []}, State} + end, + {ok, Disco} = cets_discovery:start(#{backend_module => cets_discovery_fun, get_nodes_fn => F}), + cets_discovery:add_table(Disco, Tab), + %% Enter into a blocking get_nodes function + Disco ! check, + %% Do it async, because it would block is + WaitPid = spawn_link(fun() -> ok = cets_discovery:wait_for_get_nodes(Disco, 5000) end), + Cond = fun() -> + length(maps:get(pending_wait_for_get_nodes, cets_discovery:system_info(Disco))) + end, + cets_test_wait:wait_until(Cond, 1), + %% Unblock get_nodes call + SignallingPid ! stop, + %% wait_for_get_nodes returns + wait_for_down(WaitPid), + ok. + get_nodes_request(Config) -> #{ct2 := Node2, ct3 := Node3, ct4 := Node4} = proplists:get_value(nodes, Config), Tab = make_name(Config), From 264366fc8d4d3b903c1dde5283e1a69ee8d588a6 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Fri, 24 Nov 2023 21:39:38 +0100 Subject: [PATCH 2/3] Rename alive_nodes to connected_nodes to avoid confusion --- src/cets_discovery.erl | 4 ++-- test/cets_SUITE.erl | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/cets_discovery.erl b/src/cets_discovery.erl index bd12f660..da562cb9 100644 --- a/src/cets_discovery.erl +++ b/src/cets_discovery.erl @@ -506,7 +506,7 @@ handle_nodedown(Node, State) -> set_defined(connected_millisecond_duration, NodeUpTime, #{ what => nodedown, remote_node => Node, - alive_nodes => length(nodes()) + 1, + connected_nodes => length(nodes()) + 1, time_since_startup_in_milliseconds => time_since_startup_in_milliseconds(State) }) ), @@ -521,7 +521,7 @@ handle_nodeup(Node, State) -> set_defined(downtime_millisecond_duration, NodeDownTime, #{ what => nodeup, remote_node => Node, - alive_nodes => length(nodes()) + 1, + connected_nodes => length(nodes()) + 1, %% We report that time so we could work on minimizing that time. %% It says how long it took to discover nodes after startup. time_since_startup_in_milliseconds => time_since_startup_in_milliseconds(State) diff --git a/test/cets_SUITE.erl b/test/cets_SUITE.erl index 7dc21ac6..5019be62 100644 --- a/test/cets_SUITE.erl +++ b/test/cets_SUITE.erl @@ -2331,7 +2331,7 @@ disco_logs_nodeup(Config) -> meta := #{pid := Disco}, msg := {report, #{what := nodeup, remote_node := Node2} = R} }} = M -> - ?assert(is_integer(maps:get(alive_nodes, R)), M), + ?assert(is_integer(maps:get(connected_nodes, R)), M), ?assert(is_integer(maps:get(time_since_startup_in_milliseconds, R)), M) after 5000 -> ct:fail(timeout) @@ -2353,7 +2353,7 @@ disco_logs_nodedown(Config) -> meta := #{pid := Disco}, msg := {report, #{what := nodedown, remote_node := Node2} = R} }} = M -> - ?assert(is_integer(maps:get(alive_nodes, R)), M), + ?assert(is_integer(maps:get(connected_nodes, R)), M), ?assert(is_integer(maps:get(time_since_startup_in_milliseconds, R)), M), ?assert(is_integer(maps:get(connected_millisecond_duration, R)), M) after 5000 -> @@ -2384,7 +2384,7 @@ disco_logs_nodeup_after_downtime(Config) -> downtime_millisecond_duration := Downtime } = R} }} = M -> - ?assert(is_integer(maps:get(alive_nodes, R)), M), + ?assert(is_integer(maps:get(connected_nodes, R)), M), ?assert(is_integer(Downtime), M) after 5000 -> ct:fail(timeout) From ac1afc3b478b98efdb9aa5adc9f7ab10b5f50901 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Tue, 28 Nov 2023 16:44:17 +0100 Subject: [PATCH 3/3] Wait for get_nodes to be retried in wait_for_get_nodes Add disco_wait_for_get_nodes_when_get_nodes_needs_to_be_retried testcase --- src/cets_discovery.erl | 20 ++++++++++----- test/cets_SUITE.erl | 56 ++++++++++++++++++++++++++++++++++++++---- 2 files changed, 65 insertions(+), 11 deletions(-) diff --git a/src/cets_discovery.erl b/src/cets_discovery.erl index da562cb9..f8a74ced 100644 --- a/src/cets_discovery.erl +++ b/src/cets_discovery.erl @@ -157,8 +157,9 @@ wait_for_ready(Server, Timeout) -> Info = #{task => cets_wait_for_ready}, cets_long:run_tracked(Info, F). -%% Waits for the currect get_nodes call to return. +%% Waits for the current get_nodes call to return. %% Just returns if there is no gen_nodes call running. +%% Waits for another get_nodes, if should_retry_get_nodes flag is set. %% It is different from wait_for_ready, because it does not wait for %% unavailable nodes to return pang. -spec wait_for_get_nodes(server(), timeout()) -> ok. @@ -297,17 +298,24 @@ handle_check(State = #{backend_module := Mod, backend_state := BackendState}) -> -spec handle_get_nodes_result(Result, BackendState, State) -> State when Result :: get_nodes_result(), BackendState :: backend_state(), State :: state(). -handle_get_nodes_result(Result, BackendState, State = #{pending_wait_for_get_nodes := Pending}) -> - [gen_server:reply(From, ok) || From <- Pending], - State2 = State#{ +handle_get_nodes_result(Result, BackendState, State) -> + State2 = maybe_reply_to_wait_for_get_nodes(State#{ backend_state := BackendState, get_nodes_status := not_running, - pending_wait_for_get_nodes := [], last_get_nodes_result := Result - }, + }), State3 = set_nodes(Result, State2), schedule_check(trigger_verify_ready(State3)). +-spec maybe_reply_to_wait_for_get_nodes(state()) -> state(). +maybe_reply_to_wait_for_get_nodes( + State = #{should_retry_get_nodes := false, pending_wait_for_get_nodes := Pending = [_ | _]} +) -> + [gen_server:reply(From, ok) || From <- Pending], + State#{pending_wait_for_get_nodes := []}; +maybe_reply_to_wait_for_get_nodes(State) -> + State. + -spec set_nodes({error, term()} | {ok, [node()]}, state()) -> state(). set_nodes({error, _Reason}, State) -> State; diff --git a/test/cets_SUITE.erl b/test/cets_SUITE.erl index 5019be62..a5d13cbb 100644 --- a/test/cets_SUITE.erl +++ b/test/cets_SUITE.erl @@ -111,6 +111,7 @@ cases() -> status_conflict_nodes, disco_wait_for_get_nodes_works, disco_wait_for_get_nodes_blocks_and_returns, + disco_wait_for_get_nodes_when_get_nodes_needs_to_be_retried, get_nodes_request, test_locally, handle_down_is_called, @@ -1692,11 +1693,7 @@ disco_wait_for_get_nodes_works(_Config) -> disco_wait_for_get_nodes_blocks_and_returns(Config) -> Tab = make_name(Config, 1), {ok, _Pid} = start_local(Tab, #{}), - SignallingPid = spawn_link(fun() -> - receive - stop -> ok - end - end), + SignallingPid = make_signalling_process(), F = fun(State) -> wait_for_down(SignallingPid), {{ok, []}, State} @@ -1717,6 +1714,48 @@ disco_wait_for_get_nodes_blocks_and_returns(Config) -> wait_for_down(WaitPid), ok. +%% Check that wait_for_get_nodes waits in case get_nodes should be retried +disco_wait_for_get_nodes_when_get_nodes_needs_to_be_retried(Config) -> + Me = self(), + Tab = make_name(Config, 1), + {ok, _Pid} = start_local(Tab, #{}), + SignallingPid1 = make_signalling_process(), + SignallingPid2 = make_signalling_process(), + F = fun + (State = #{step := 1}) -> + wait_for_down(SignallingPid1), + {{ok, []}, State#{step => 2}}; + (State = #{step := 2}) -> + Me ! entered_get_nodes2, + wait_for_down(SignallingPid2), + {{ok, []}, State#{step => 2}} + end, + {ok, Disco} = cets_discovery:start(#{ + backend_module => cets_discovery_fun, get_nodes_fn => F, step => 1 + }), + cets_discovery:add_table(Disco, Tab), + %% Enter into a blocking get_nodes function + Disco ! check, + %% Do it async, because it would block is + WaitPid = spawn_link(fun() -> ok = cets_discovery:wait_for_get_nodes(Disco, 5000) end), + Cond = fun() -> + length(maps:get(pending_wait_for_get_nodes, cets_discovery:system_info(Disco))) + end, + cets_test_wait:wait_until(Cond, 1), + %% Set should_retry_get_nodes + Disco ! check, + %% Ensure check message is received + cets_discovery:system_info(Disco), + %% Unblock first get_nodes call + SignallingPid1 ! stop, + receive_message(entered_get_nodes2), + %% Still waiting for get_nodes being retried + true = erlang:is_process_alive(WaitPid), + %% It returns finally after second get_nodes call + SignallingPid2 ! stop, + wait_for_down(WaitPid), + ok. + get_nodes_request(Config) -> #{ct2 := Node2, ct3 := Node3, ct4 := Node4} = proplists:get_value(nodes, Config), Tab = make_name(Config), @@ -2882,3 +2921,10 @@ get_disco_timestamp(Disco, MapName, NodeKey) -> Info = cets_discovery:system_info(Disco), #{MapName := #{NodeKey := Timestamp}} = Info, Timestamp. + +make_signalling_process() -> + spawn_link(fun() -> + receive + stop -> ok + end + end).