Skip to content

Commit 1ceee49

Browse files
committed
grpcbox-stream: Fix to support multiple simultaneous unary/strem calls
This commit addresses stress-test failure mentioned in tsloughter#29. Added two new APIs: add_channel(Name, Endpoints, Options) delete_channel(Pid) This would give ability to user to add and delete channels on the fly. Also modified stress_test test case to use this logic. With out this change, stress test fails around 10 simultaneous connections. With this change I can see around 90 simultaneous connections. Signed-off-by: Vasu Dasari <[email protected]>
1 parent 3ad2a98 commit 1ceee49

File tree

2 files changed

+44
-5
lines changed

2 files changed

+44
-5
lines changed

src/grpcbox_channel.erl

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,16 @@
55
-export([start_link/3,
66
is_ready/1,
77
pick/2,
8-
stop/1]).
8+
stop/1,
9+
add_channel/3,
10+
delete_channel/1]).
911
-export([init/1,
1012
callback_mode/0,
1113
terminate/3,
1214
connected/3,
1315
idle/3]).
1416

17+
-include_lib("stdlib/include/ms_transform.hrl").
1518
-include("grpcbox.hrl").
1619

1720
-define(CHANNEL(Name), {via, gproc, {n, l, {?MODULE, Name}}}).
@@ -45,6 +48,16 @@
4548
stats_handler :: module() | undefined,
4649
refresh_interval :: timer:time()}).
4750

51+
%% @doc add a new channel
52+
-spec add_channel(name(), [endpoint()], options()) -> {ok, pid()}.
53+
add_channel(Name, Endpoints, Options) ->
54+
grpcbox_channel_sup:start_child(Name, Endpoints, Options).
55+
56+
%% @doc Delete a channel
57+
-spec delete_channel(pid()) -> any().
58+
delete_channel(Pid) when is_pid(Pid) ->
59+
ok = supervisor:terminate_child(grpcbox_channel_sup, Pid).
60+
4861
-spec start_link(name(), [endpoint()], options()) -> {ok, pid()}.
4962
start_link(Name, Endpoints, Options) ->
5063
gen_statem:start_link(?CHANNEL(Name), ?MODULE, [Name, Endpoints, Options], []).
@@ -129,6 +142,12 @@ handle_event(_, _, Data) ->
129142
{keep_state, Data}.
130143

131144
terminate(_Reason, _State, #data{pool=Name}) ->
145+
[ets:delete(?CHANNELS_TAB, Key) || Key <-
146+
ets:select(?CHANNELS_TAB, ets:fun2ms(fun
147+
({{N, V}, _}) when N == Name ->
148+
{N, V}
149+
end))
150+
],
132151
gproc_pool:force_delete(Name),
133152
ok.
134153

test/grpcbox_SUITE.erl

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -477,8 +477,8 @@ multiple_servers(_Config) ->
477477
unary(_Config),
478478
unary(_Config).
479479

480-
bidirectional(_Config) ->
481-
{ok, S} = routeguide_route_guide_client:route_chat(ctx:new()),
480+
bidirectional(Config) ->
481+
{ok, S} = routeguide_route_guide_client:route_chat(ctx:new(), proplists:get_value(options, Config, #{})),
482482
%% send 2 before receiving since the server only sends what it already had in its list of messages for the
483483
%% location of your last send.
484484
ok = grpcbox_client:send(S, #{location => #{latitude => 1, longitude => 1}, message => <<"hello there">>}),
@@ -561,10 +561,30 @@ stress_test(Config) ->
561561

562562
stress_test(Config, Count) ->
563563
lists:foreach(fun
564-
(Ref) ->
564+
(ProcId) ->
565565
Parent = self(),
566566
spawn(fun() ->
567-
stress_test_function(fun bidirectional/1, Config, Ref, Parent) end)
567+
Channel = erlang:list_to_atom("proc_" ++ erlang:integer_to_list(ProcId)),
568+
erlang:register(Channel, self()),
569+
{ok, _Pid} = grpcbox_channel:add_channel(
570+
Channel,
571+
[{http, "localhost", 8080, []}],
572+
#{}
573+
),
574+
lists:foldl(fun
575+
(_, not_ready) ->
576+
timer:sleep(10),
577+
grpcbox_channel:is_ready(Channel);
578+
(_,Acc) ->
579+
Acc
580+
end, not_ready, lists:seq(1, 100)),
581+
582+
stress_test_function(fun bidirectional/1,
583+
[{options,#{channel => Channel}} | Config],
584+
ProcId, Parent),
585+
ok
586+
%% grpcbox_channel:delete_channel(Pid)
587+
end)
568588
end, lists:seq(1, Count)),
569589

570590
Loop = fun Loop(LoopCount) ->

0 commit comments

Comments
 (0)