Skip to content

Commit

Permalink
Extend the API with opaque runs
Browse files Browse the repository at this point in the history
Sometimes a library exposes a very complex API but uses a gen_server
under the hood, and we want to pool this gen_server, but we're not
supposed to explicitly do `gen_server:call/3` nor `gen_server:cast/2`,
but use the API instead. To enable this, we expose the possibility to
run a function callback with a worker once a worker has been found.
  • Loading branch information
NelsonVides committed Sep 22, 2024
1 parent f3ee060 commit eab9100
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 19 deletions.
60 changes: 57 additions & 3 deletions src/wpool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,27 @@
%% Callbacks can be added and removed later by `wpool_pool:add_callback_module/2' and
%% `wpool_pool:remove_callback_module/2'.

-type run(Result) :: fun((wpool:name() | pid()) -> Result).
%% A function to run with a given worker.
%%
%% It can be used to enable APIs that hide the gen_server behind a complex logic
%% that might for example curate parameters or run side-effects, for example, `supervisor'.
%%
%% For example:
%% ```
%% Opts =
%% #{workers => 3,
%% worker_shutdown => infinity,
%% worker => {supervisor, {Name, ModuleCallback, Args}}},
%% %% Note that the supervisor's `init/1' callback takes such 3-tuple.
%% {ok, Pid} = wpool:start_sup_pool(pool_of_supervisors, Opts),
%%
%% ...
%%
%% Run = fun(Sup) -> supervisor:start_child(Sup, Params) end,
%% {ok, Pid} = wpool:run(pool_of_supervisors, Run, next_worker),
%% '''

-type name() :: atom().
%% Name of the pool

Expand Down Expand Up @@ -274,13 +295,14 @@
%% Statistics about a given live pool.

-export_type([name/0, option/0, options/0, custom_strategy/0, strategy/0,
queue_type/0, worker_stats/0, stats/0]).
queue_type/0, run/1, worker_stats/0, stats/0]).

-export([start/0, start/2, stop/0, stop/1]).
-export([child_spec/2, start_pool/1, start_pool/2, start_sup_pool/1, start_sup_pool/2]).
-export([stop_pool/1, stop_sup_pool/1]).
-export([call/2, cast/2, call/3, cast/3, call/4, broadcall/3, broadcast/2]).
-export([send_request/2, send_request/3, send_request/4]).
-export([call/2, call/3, call/4, cast/2, cast/3,
run/2, run/3, run/4, broadcall/3, broadcast/2,
send_request/2, send_request/3, send_request/4]).
-export([stats/0, stats/1, get_workers/1]).
-export([default_strategy/0]).

Expand Down Expand Up @@ -370,6 +392,38 @@ default_strategy() ->
Strategy
end.

%% @equiv run(Sup, Run, default_strategy())
-spec run(name(), run(Result)) -> Result.
run(Sup, Run) ->
run(Sup, Run, default_strategy()).

%% @equiv run(Sup, Run, Strategy, 5000)
-spec run(name(), run(Result), strategy()) -> Result.
run(Sup, Run, Strategy) ->
run(Sup, Run, Strategy, 5000).

%% @doc Picks a server and issues the run to it.
%%
%% For all strategies except available_worker, Timeout applies only to the
%% time spent on the actual run to the worker, because time spent finding
%% the worker in other strategies is negligible.
%% For available_worker the time used choosing a worker is also considered
-spec run(name(), run(Result), strategy(), timeout()) -> Result.
run(Sup, Run, available_worker, Timeout) ->
wpool_pool:run_with_available_worker(Sup, Run, Timeout);
run(Sup, Run, next_available_worker, _Timeout) ->
wpool_process:run(wpool_pool:next_available_worker(Sup), Run);
run(Sup, Run, next_worker, _Timeout) ->
wpool_process:run(wpool_pool:next_worker(Sup), Run);
run(Sup, Run, random_worker, _Timeout) ->
wpool_process:run(wpool_pool:random_worker(Sup), Run);
run(Sup, Run, best_worker, _Timeout) ->
wpool_process:run(wpool_pool:best_worker(Sup), Run);
run(Sup, Run, {hash_worker, HashKey}, _Timeout) ->
wpool_process:run(wpool_pool:hash_worker(Sup, HashKey), Run);
run(Sup, Run, Fun, _Timeout) when is_function(Fun, 1) ->
wpool_process:run(Fun(Sup), Run).

%% @equiv call(Sup, Call, default_strategy())
-spec call(name(), term()) -> term().
call(Sup, Call) ->
Expand Down
40 changes: 33 additions & 7 deletions src/wpool_pool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
%% API
-export([start_link/2]).
-export([best_worker/1, random_worker/1, next_worker/1, hash_worker/2,
next_available_worker/1, send_request_available_worker/3, call_available_worker/3]).
next_available_worker/1, send_request_available_worker/3, call_available_worker/3,
run_with_available_worker/3]).
-export([cast_to_available_worker/2, broadcast/2, broadcall/3]).
-export([stats/0, stats/1, get_workers/1]).
-export([worker_name/2, find_wpool/1]).
Expand Down Expand Up @@ -112,19 +113,44 @@ next_available_worker(Name) ->
end
end.

%% @doc Picks the first available worker and sends the call to it.
%% The timeout provided includes the time it takes to get a worker
%% and for it to process the call.
%% @throws no_workers | timeout
-spec run_with_available_worker(wpool:name(), wpool:run(Result), timeout()) -> Result.
run_with_available_worker(Name, Run, Timeout) ->
case find_wpool(Name) of
undefined ->
exit(no_workers);
#wpool{qmanager = QManager} ->
case wpool_queue_manager:run_with_available_worker(QManager, Run, Timeout) of
noproc ->
exit(no_workers);
timeout ->
exit(timeout);
Result ->
Result
end
end.

%% @doc Picks the first available worker and sends the call to it.
%% The timeout provided includes the time it takes to get a worker
%% and for it to process the call.
%% @throws no_workers | timeout
-spec call_available_worker(wpool:name(), any(), timeout()) -> any().
call_available_worker(Name, Call, Timeout) ->
case wpool_queue_manager:call_available_worker(queue_manager_name(Name), Call, Timeout) of
noproc ->
case find_wpool(Name) of
undefined ->
exit(no_workers);
timeout ->
exit(timeout);
Result ->
Result
#wpool{qmanager = QManager} ->
case wpool_queue_manager:call_available_worker(QManager, Call, Timeout) of
noproc ->
exit(no_workers);
timeout ->
exit(timeout);
Result ->
Result
end
end.

%% @doc Picks the first available worker and sends the request to it.
Expand Down
7 changes: 6 additions & 1 deletion src/wpool_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
-export_type([next_step/0]).

%% api
-export([start_link/4, call/3, cast/2, send_request/2]).
-export([start_link/4, run/2, call/3, cast/2, send_request/2]).

-ifdef(TEST).

Expand All @@ -91,6 +91,11 @@ start_link(Name, Module, InitArgs, Options) ->
{Name, Module, InitArgs, FullOpts},
WorkerOpt).

%% @doc Runs a function that takes as a parameter the given process
-spec run(wpool:name() | pid(), wpool:run(Result)) -> Result.
run(Process, Run) ->
Run(Process).

%% @equiv gen_server:call(Process, Call, Timeout)
-spec call(wpool:name() | pid(), term(), timeout()) -> term().
call(Process, Call, Timeout) ->
Expand Down
19 changes: 17 additions & 2 deletions src/wpool_queue_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@

%% api
-export([start_link/2, start_link/3]).
-export([call_available_worker/3, cast_to_available_worker/2, new_worker/2, worker_dead/2,
send_request_available_worker/3, worker_ready/2, worker_busy/2, pending_task_count/1]).
-export([run_with_available_worker/3, call_available_worker/3, cast_to_available_worker/2,
new_worker/2, worker_dead/2, send_request_available_worker/3, worker_ready/2,
worker_busy/2, pending_task_count/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

Expand Down Expand Up @@ -70,6 +71,20 @@ start_link(WPool, Name) ->
start_link(WPool, Name, Options) ->
gen_server:start_link({local, Name}, ?MODULE, [{pool, WPool} | Options], []).

%% @doc returns the first available worker in the pool
-spec run_with_available_worker(queue_mgr(), wpool:run(Result), timeout()) ->
noproc | timeout | Result.
run_with_available_worker(QueueManager, Call, Timeout) ->
case get_available_worker(QueueManager, Call, Timeout) of
{ok, TimeLeft, Worker} when TimeLeft > 0 ->
wpool_process:run(Worker, Call);
{ok, _, Worker} ->
worker_ready(QueueManager, Worker),
timeout;
Other ->
Other
end.

%% @doc returns the first available worker in the pool
-spec call_available_worker(queue_mgr(), any(), timeout()) -> noproc | timeout | any().
call_available_worker(QueueManager, Call, Timeout) ->
Expand Down
49 changes: 43 additions & 6 deletions test/wpool_pool_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,25 @@ stop_worker(_Config) ->
-spec available_worker(config()) -> {comment, []}.
available_worker(_Config) ->
Pool = available_worker,
Run = fun(Worker) -> gen_server:call(Worker, {erlang, self, []}) end,
try wpool:call(not_a_pool, x) of
Result ->
no_result = Result
no_result ->
no_result
catch
_:no_workers ->
ok
end,

try wpool:run(not_a_pool, Run) of
no_result ->
no_result
catch
_:no_workers ->
ok
end,

{ok, _} = wpool:run(Pool, Run, available_worker),

ct:log("Put them all to work, each request should go to a different worker"),
[wpool:cast(Pool, {timer, sleep, [5000]}) || _ <- lists:seq(1, ?WORKERS)],

Expand All @@ -107,11 +118,19 @@ available_worker(_Config) ->

ct:log("If we can't wait we get no workers"),
try wpool:call(Pool, {erlang, self, []}, available_worker, 100) of
R ->
should_fail = R
should_fail ->
should_fail
catch
_:timeout ->
timeout
end,

try wpool:run(Pool, Run, available_worker, 100) of
should_fail ->
should_fail
catch
_:Error ->
timeout = Error
_:timeout ->
timeout
end,

ct:log("Let's wait until all workers are free"),
Expand Down Expand Up @@ -155,6 +174,9 @@ best_worker(_Config) ->
ok
end,

Run = fun(Worker) -> gen_server:call(Worker, {erlang, self, []}) end,
{ok, _} = wpool:run(Pool, Run, best_worker),

Req = wpool:send_request(Pool, {erlang, self, []}, best_worker),
{reply, {ok, _}} = gen_server:wait_response(Req, 5000),

Expand Down Expand Up @@ -185,6 +207,9 @@ next_available_worker(_Config) ->
ok
end,

Run = fun(Worker) -> gen_server:call(Worker, {erlang, self, []}) end,
{ok, _} = wpool:run(Pool, Run, next_available_worker),

ct:log("Put them all to work..."),
[wpool:cast(Pool, {timer, sleep, [1500 + I]}, next_available_worker)
|| I <- lists:seq(0, (?WORKERS - 1) * 60000, 60000)],
Expand Down Expand Up @@ -261,6 +286,9 @@ next_worker(_Config) ->
Req = wpool:send_request(Pool, {erlang, self, []}, next_worker),
{reply, {ok, _}} = gen_server:wait_response(Req, 5000),

Run = fun(Worker) -> gen_server:call(Worker, {erlang, self, []}) end,
{ok, _} = wpool:run(Pool, Run, next_worker),

{comment, []}.

-spec random_worker(config()) -> {comment, []}.
Expand All @@ -275,6 +303,9 @@ random_worker(_Config) ->
ok
end,

Run = fun(Worker) -> gen_server:call(Worker, {erlang, self, []}) end,
{ok, _} = wpool:run(Pool, Run, random_worker),

%% Ask for a random worker's identity 20x more than the number of workers
%% and expect to get an answer from every worker at least once.
Serial =
Expand Down Expand Up @@ -342,6 +373,9 @@ hash_worker(_Config) ->
sets:size(
sets:from_list(Spread)),

Run = fun(Worker) -> gen_server:call(Worker, {erlang, self, []}) end,
[{ok, _} = wpool:run(Pool, Run, {hash_worker, I}) || I <- lists:seq(1, 20 * ?WORKERS)],

%% Fill up their message queues...
[wpool:cast(Pool, {timer, sleep, [60000]}, {hash_worker, I})
|| I <- lists:seq(1, 20 * ?WORKERS)],
Expand Down Expand Up @@ -393,6 +427,9 @@ custom_worker(_Config) ->
Req = wpool:send_request(Pool, {erlang, self, []}, Strategy),
{reply, {ok, _}} = gen_server:wait_response(Req, 5000),

Run = fun(Worker) -> gen_server:call(Worker, {erlang, self, []}) end,
{ok, _} = wpool:run(Pool, Run, Strategy),

{comment, []}.

-spec manager_crash(config()) -> {comment, []}.
Expand Down

0 comments on commit eab9100

Please sign in to comment.