Skip to content

Commit

Permalink
Wait for get_nodes to be retried in wait_for_get_nodes
Browse files Browse the repository at this point in the history
Add disco_wait_for_get_nodes_when_get_nodes_needs_to_be_retried testcase
  • Loading branch information
arcusfelis committed Nov 28, 2023
1 parent 264366f commit ac1afc3
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 11 deletions.
20 changes: 14 additions & 6 deletions src/cets_discovery.erl
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,9 @@ wait_for_ready(Server, Timeout) ->
Info = #{task => cets_wait_for_ready},
cets_long:run_tracked(Info, F).

%% Waits for the currect get_nodes call to return.
%% Waits for the current get_nodes call to return.
%% Just returns if there is no get_nodes call running.
%% Waits for another get_nodes, if should_retry_get_nodes flag is set.
%% It is different from wait_for_ready, because it does not wait for
%% unavailable nodes to return pang.
-spec wait_for_get_nodes(server(), timeout()) -> ok.
Expand Down Expand Up @@ -297,17 +298,24 @@ handle_check(State = #{backend_module := Mod, backend_state := BackendState}) ->

-spec handle_get_nodes_result(Result, BackendState, State) -> State when
Result :: get_nodes_result(), BackendState :: backend_state(), State :: state().
handle_get_nodes_result(Result, BackendState, State = #{pending_wait_for_get_nodes := Pending}) ->
[gen_server:reply(From, ok) || From <- Pending],
State2 = State#{
handle_get_nodes_result(Result, BackendState, State) ->
State2 = maybe_reply_to_wait_for_get_nodes(State#{
backend_state := BackendState,
get_nodes_status := not_running,
pending_wait_for_get_nodes := [],
last_get_nodes_result := Result
},
}),
State3 = set_nodes(Result, State2),
schedule_check(trigger_verify_ready(State3)).

%% Replies `ok` to every caller queued in `pending_wait_for_get_nodes` and
%% empties that queue, but only when `should_retry_get_nodes` is `false` and
%% at least one caller is queued. In every other case the state is returned
%% untouched, so queued callers keep waiting while the retry flag is set.
-spec maybe_reply_to_wait_for_get_nodes(state()) -> state().
maybe_reply_to_wait_for_get_nodes(
    #{should_retry_get_nodes := false, pending_wait_for_get_nodes := [_ | _] = Waiters} = State
) ->
    ReplyOk = fun(Waiter) -> gen_server:reply(Waiter, ok) end,
    lists:foreach(ReplyOk, Waiters),
    State#{pending_wait_for_get_nodes := []};
maybe_reply_to_wait_for_get_nodes(Unchanged) ->
    Unchanged.

-spec set_nodes({error, term()} | {ok, [node()]}, state()) -> state().
set_nodes({error, _Reason}, State) ->
State;
Expand Down
56 changes: 51 additions & 5 deletions test/cets_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ cases() ->
status_conflict_nodes,
disco_wait_for_get_nodes_works,
disco_wait_for_get_nodes_blocks_and_returns,
disco_wait_for_get_nodes_when_get_nodes_needs_to_be_retried,
get_nodes_request,
test_locally,
handle_down_is_called,
Expand Down Expand Up @@ -1692,11 +1693,7 @@ disco_wait_for_get_nodes_works(_Config) ->
disco_wait_for_get_nodes_blocks_and_returns(Config) ->
Tab = make_name(Config, 1),
{ok, _Pid} = start_local(Tab, #{}),
SignallingPid = spawn_link(fun() ->
receive
stop -> ok
end
end),
SignallingPid = make_signalling_process(),
F = fun(State) ->
wait_for_down(SignallingPid),
{{ok, []}, State}
Expand All @@ -1717,6 +1714,48 @@ disco_wait_for_get_nodes_blocks_and_returns(Config) ->
wait_for_down(WaitPid),
ok.

%% Check that wait_for_get_nodes waits in case get_nodes should be retried
%%
%% Scenario: the first get_nodes call is held blocked, a wait_for_get_nodes
%% caller gets queued, a second `check` is sent while the first call is still
%% blocked, and the caller is expected to stay blocked until the second
%% get_nodes call has finished as well.
disco_wait_for_get_nodes_when_get_nodes_needs_to_be_retried(Config) ->
Me = self(),
Tab = make_name(Config, 1),
{ok, _Pid} = start_local(Tab, #{}),
%% One signalling process per get_nodes call; sending `stop` to it is how
%% the test releases the corresponding call.
SignallingPid1 = make_signalling_process(),
SignallingPid2 = make_signalling_process(),
%% get_nodes callback, dispatched on the `step` key kept in the backend state.
F = fun
(State = #{step := 1}) ->
%% First call: wait on SignallingPid1, then move on to step 2
wait_for_down(SignallingPid1),
{{ok, []}, State#{step => 2}};
(State = #{step := 2}) ->
%% Second call: tell the test process we got here, then wait on
%% SignallingPid2. The step stays at 2.
Me ! entered_get_nodes2,
wait_for_down(SignallingPid2),
{{ok, []}, State#{step => 2}}
end,
{ok, Disco} = cets_discovery:start(#{
backend_module => cets_discovery_fun, get_nodes_fn => F, step => 1
}),
cets_discovery:add_table(Disco, Tab),
%% Enter into a blocking get_nodes function
Disco ! check,
%% Do it async, because it would block us
WaitPid = spawn_link(fun() -> ok = cets_discovery:wait_for_get_nodes(Disco, 5000) end),
%% Number of callers currently listed in pending_wait_for_get_nodes
Cond = fun() ->
length(maps:get(pending_wait_for_get_nodes, cets_discovery:system_info(Disco)))
end,
%% Presumably polls until Cond() returns 1, i.e. until WaitPid is queued
%% (wait_until is defined in cets_test_wait - confirm there)
cets_test_wait:wait_until(Cond, 1),
%% Set should_retry_get_nodes
Disco ! check,
%% Ensure check message is received
cets_discovery:system_info(Disco),
%% Unblock first get_nodes call
SignallingPid1 ! stop,
%% The step 2 clause of F sends this message when the second call starts
receive_message(entered_get_nodes2),
%% Still waiting for get_nodes being retried
true = erlang:is_process_alive(WaitPid),
%% It returns finally after second get_nodes call
SignallingPid2 ! stop,
wait_for_down(WaitPid),
ok.

get_nodes_request(Config) ->
#{ct2 := Node2, ct3 := Node3, ct4 := Node4} = proplists:get_value(nodes, Config),
Tab = make_name(Config),
Expand Down Expand Up @@ -2882,3 +2921,10 @@ get_disco_timestamp(Disco, MapName, NodeKey) ->
Info = cets_discovery:system_info(Disco),
#{MapName := #{NodeKey := Timestamp}} = Info,
Timestamp.

%% Spawns a process linked to the caller that stays alive until it receives
%% the atom `stop`, and then finishes normally. Tests use it as a gate:
%% something waits for the process to go down, and the test opens the gate
%% by sending `stop`.
make_signalling_process() ->
    WaitForStop = fun() ->
        receive
            stop -> ok
        end
    end,
    spawn_link(WaitForStop).

0 comments on commit ac1afc3

Please sign in to comment.