Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add more flexible support for long running callback handlers #52

Draft
wants to merge 21 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
ffdef20
fix compat with latest chatterbox. Easier support for long lived stre…
andymck Dec 14, 2020
88109d3
update to accomodate introspection functions returning props
andymck Dec 16, 2020
94368ce
update plugin version
andymck Dec 18, 2020
fcca953
add support for initializing handler state. pass thru info msgs to h…
andymck Jan 4, 2021
46fa309
interim changes to handle client side streaming
andymck Jan 11, 2021
5b2ffcf
fix specs
andymck Jan 15, 2021
c3edd68
export end stream
andymck Jan 27, 2021
58e7061
tidy up
andymck Feb 23, 2021
0fa60ef
fix plugin version
andymck Feb 24, 2021
5d32b2a
add getters and setters for stream handler state
andymck Feb 24, 2021
b172276
accomodate introspection functions returning props or maps
andymck Dec 16, 2020
9ed3957
allow output streams to handle error response from handlers
andymck Jan 11, 2021
082c05f
handle incoming stream msgs within stream process, dont spawn off han…
andymck Mar 2, 2021
d5caf9e
pass function name to handle info if supported
andymck Apr 8, 2021
a403545
reuse existing listen socket if available during a grpcbox_socket res…
andymck Jul 27, 2021
ce13bcd
shutdown existing socket when eaddrinuse hit during socket process re…
andymck Aug 11, 2021
a72c0b6
send headers distinct from data msgs
andymck Aug 12, 2021
cd961f3
stop/reset a stream when sending an end_stream flag
andymck May 6, 2022
e98a846
uncomment trap exit
andymck May 7, 2022
a96af67
refactor socket mod to avoid restart when listener goes down
andymck May 12, 2022
1c5c9fb
rebase fixes
jeffgrunewald May 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions rebar.config
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
{erl_opts, [debug_info]}.

{deps, [{chatterbox, {pkg, ts_chatterbox}},
{deps, [
{chatterbox, ".*", {git, "https://github.com/novalabsxyz/chatterbox", {branch, "master"}}},
ctx,
acceptor_pool,
gproc]}.
{acceptor_pool, {git, "https://github.com/novalabsxyz/acceptor_pool", {branch, "master"}}},
gproc
]}.

{grpc, [{protos, ["proto"]},
{service_modules, [{'grpc.health.v1.Health', "grpcbox_health"},
Expand Down Expand Up @@ -48,7 +50,9 @@
deprecated_function_calls, deprecated_functions]}.

{project_plugins, [covertool,
{grpcbox_plugin, "~> 0.7.0"},
{grpcbox_plugin,
{git, "https://github.com/novalabsxyz/grpcbox_plugin.git",
{branch, "master"}}},
rebar3_lint]}.

{cover_enabled, true}.
Expand Down
14 changes: 8 additions & 6 deletions rebar.lock
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
{"1.2.0",
[{<<"acceptor_pool">>,{pkg,<<"acceptor_pool">>,<<"1.0.0">>},0},
{<<"chatterbox">>,{pkg,<<"ts_chatterbox">>,<<"0.11.0">>},0},
[{<<"acceptor_pool">>,
{git,"https://github.com/novalabsxyz/acceptor_pool",
{ref,"56d676e00c11fd071a6bcc4059e3454960900af7"}},
0},
{<<"chatterbox">>,
{git,"https://github.com/novalabsxyz/chatterbox",
{ref,"cbfe6e46b273f1552b57685c9f6daf710473c609"}},
0},
{<<"ctx">>,{pkg,<<"ctx">>,<<"0.6.0">>},0},
{<<"gproc">>,{pkg,<<"gproc">>,<<"0.8.0">>},0},
{<<"hpack">>,{pkg,<<"hpack_erl">>,<<"0.2.3">>},1}]}.
[
{pkg_hash,[
{<<"acceptor_pool">>, <<"43C20D2ACAE35F0C2BCD64F9D2BDE267E459F0F3FD23DAB26485BF518C281B21">>},
{<<"chatterbox">>, <<"B8F372C706023EB0DE5BF2976764EDB27C70FE67052C88C1F6A66B3A5626847F">>},
{<<"ctx">>, <<"8FF88B70E6400C4DF90142E7F130625B82086077A45364A78D208ED3ED53C7FE">>},
{<<"gproc">>, <<"CEA02C578589C61E5341FCE149EA36CCEF236CC2ECAC8691FBA408E7EA77EC2F">>},
{<<"hpack">>, <<"17670F83FF984AE6CD74B1C456EDDE906D27FF013740EE4D9EFAA4F1BF999633">>}]},
{pkg_hash_ext,[
{<<"acceptor_pool">>, <<"0CBCD83FDC8B9AD2EEE2067EF8B91A14858A5883CB7CD800E6FCD5803E158788">>},
{<<"chatterbox">>, <<"722FE2BAD52913AB7E87D849FC6370375F0C961FFB2F0B5E6D647C9170C382A6">>},
{<<"ctx">>, <<"A14ED2D1B67723DBEBBE423B28D7615EB0BDCBA6FF28F2D1F1B0A7E1D4AA5FC2">>},
{<<"gproc">>, <<"580ADAFA56463B75263EF5A5DF4C86AF321F68694E7786CB057FD805D1E2A7DE">>},
{<<"hpack">>, <<"06F580167C4B8B8A6429040DF36CC93BBA6D571FAEAEC1B28816523379CBB23A">>}]}
Expand Down
6 changes: 5 additions & 1 deletion src/grpcbox_pool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
-behaviour(acceptor_pool).

-export([start_link/4,
accept_socket/3]).
accept_socket/3,
pool_sockets/1]).

-export([init/1]).

Expand All @@ -13,6 +14,9 @@ start_link(Name, ServerOpts, ChatterboxOpts, TransportOpts) ->
accept_socket(Pool, Socket, Acceptors) ->
acceptor_pool:accept_socket(Pool, Socket, Acceptors).

pool_sockets(Pool) ->
acceptor_pool:which_sockets(Pool).

init([ServerOpts, ChatterboxOpts, TransportOpts]) ->
{Transport, SslOpts} = case TransportOpts of
#{ssl := true,
Expand Down
37 changes: 19 additions & 18 deletions src/grpcbox_reflection_service.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,38 +9,39 @@
#{error_code => 12,
error_message => "unimplemented method since extensions removed in proto3"}}).

server_reflection_info(Ref, Stream) ->
receive
{Ref, eos} ->
ok;
{Ref, Message} ->
handle_message(Message, Stream),
server_reflection_info(Ref, Stream)
end.
server_reflection_info(Message, Stream) ->
handle_message(Message, Stream).

handle_message(eos=_OriginalRequest, Stream) ->
{stop, Stream};
handle_message(#{message_request := {list_services, _}}=OriginalRequest, Stream) ->
Services = list_services(),
grpcbox_stream:send(#{original_request => OriginalRequest,
Stream0 = grpcbox_stream:send(false, #{original_request => OriginalRequest,
message_response => {list_services_response,
#{service => Services}}}, Stream);
#{service => Services}}}, Stream),
{ok, Stream0};
handle_message(#{message_request := {file_by_filename, Filename}}=OriginalRequest, Stream) ->
Response = file_by_filename(Filename),
grpcbox_stream:send(#{original_request => OriginalRequest,
message_response => Response}, Stream);
Stream0 = grpcbox_stream:send(false, #{original_request => OriginalRequest,
message_response => Response}, Stream),
{ok, Stream0};
handle_message(#{message_request := {file_containing_symbol, Symbol}}=OriginalRequest, Stream) ->
Response = file_containing_symbol(Symbol),
grpcbox_stream:send(#{original_request => OriginalRequest,
message_response => Response}, Stream);
Stream0 = grpcbox_stream:send(false, #{original_request => OriginalRequest,
message_response => Response}, Stream),
{ok, Stream0};

%% proto3 dropped extensions so we'll just return an empty result

handle_message(#{message_request := {all_extension_numbers_of_type, _}}=OriginalRequest, Stream) ->
grpcbox_stream:send(#{original_request => OriginalRequest,
Stream0 = grpcbox_stream:send(false, #{original_request => OriginalRequest,
message_response => ?UNIMPLEMENTED_RESPONSE},
Stream);
Stream),
{ok, Stream0};
handle_message(#{message_request := {file_containing_extension, _}}=OriginalRequest, Stream) ->
grpcbox_stream:send(#{original_request => OriginalRequest,
message_response => ?UNIMPLEMENTED_RESPONSE}, Stream).
Stream0 = grpcbox_stream:send(false, #{original_request => OriginalRequest,
message_response => ?UNIMPLEMENTED_RESPONSE}, Stream),
{ok, Stream0}.

%%

Expand Down
25 changes: 17 additions & 8 deletions src/grpcbox_services_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ init([ServerOpts, GrpcOpts, ListenOpts, PoolOpts, TransportOpts, ServiceSupName]
%% unique name for pool based on the ip and port it will listen on
Name = pool_name(ListenOpts),

RestartStrategy = #{strategy => rest_for_one},
RestartStrategy = #{strategy => rest_for_one, intensity => 5, period => 2},
Pool = #{id => grpcbox_pool,
start => {grpcbox_pool, start_link, [Name, chatterbox:settings(server, ServerOpts),
ChatterboxOpts, TransportOpts]}},
Expand Down Expand Up @@ -127,13 +127,22 @@ load_services([], _, _) ->
ok;
load_services([ServicePbModule | Rest], Services, ServicesTable) ->
ServiceNames = ServicePbModule:get_service_names(),
%% NOTE: Methods value may be a map or a prop depending on gpb options when generating the services
[begin
%% NOTE: Methods value may be a map or a prop depending on gpb options when generating the services
{{service, _}, Methods} = ServicePbModule:get_service_def(ServiceName),
%% throws exception if ServiceName isn't in the map or doesn't exist
try ServiceModule = maps:get(ServiceName, Services),
try
ServiceModule = maps:get(ServiceName, Services),
{ServiceModule, ServiceModule:module_info(exports)} of
{ServiceModule1, Exports} ->
[begin
#{name := Name,
input := Input,
output := Output,
input_stream := InputStream,
output_stream := OutputStream,
opts := Opts} = ensure_map(P),
SnakedMethodName = atom_snake_case(Name),
case lists:member({SnakedMethodName, 2}, Exports) of
true ->
Expand All @@ -149,12 +158,7 @@ load_services([ServicePbModule | Rest], Services, ServicesTable) ->
%% TODO: error? log? insert into ets as unimplemented?
unimplemented_method
end
end || #{name := Name,
input := Input,
output := Output,
input_stream := InputStream,
output_stream := OutputStream,
opts := Opts} <- Methods]
end || P <- Methods]
catch
_:_ ->
%% TODO: error? log? insert into ets as unimplemented?
Expand All @@ -179,3 +183,8 @@ atom_snake_case(Name) ->
Snaked1 = string:replace(Snaked, ".", "_", all),
Snaked2 = string:replace(Snaked1, "__", "_", all),
list_to_atom(string:to_lower(unicode:characters_to_list(Snaked2))).

ensure_map(S) when is_map(S)->
S;
ensure_map(S) when is_list(S)->
maps:from_list(S).
120 changes: 94 additions & 26 deletions src/grpcbox_socket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,53 +11,121 @@
code_change/3,
terminate/2]).

%% public api
-record(state, {
pool,
listen_opts,
pool_opts,
socket,
mref
}).

%% public api
start_link(Pool, ListenOpts, AcceptorOpts) ->
gen_server:start_link(?MODULE, [Pool, ListenOpts, AcceptorOpts], []).

%% gen_server api

init([Pool, ListenOpts, PoolOpts]) ->
Port = maps:get(port, ListenOpts, 8080),
IPAddress = maps:get(ip, ListenOpts, {0, 0, 0, 0}),
AcceptorPoolSize = maps:get(size, PoolOpts, 10),
SocketOpts = maps:get(socket_options, ListenOpts, [{reuseaddr, true},
{nodelay, true},
{reuseaddr, true},
{backlog, 32768},
{keepalive, true}]),
%% Trapping exit so can close socket in terminate/2
_ = process_flag(trap_exit, true),
Opts = [{active, false}, {mode, binary}, {packet, raw}, {ip, IPAddress} | SocketOpts],
case gen_tcp:listen(Port, Opts) of
{ok, Socket} ->
%% acceptor could close the socket if there is a problem
MRef = monitor(port, Socket),
grpcbox_pool:accept_socket(Pool, Socket, AcceptorPoolSize),
{ok, {Socket, MRef}};
{error, Reason} ->
{stop, Reason}
end.
{ok, #state{pool = Pool, pool_opts = PoolOpts, listen_opts = ListenOpts}, 0}.

handle_call(Req, _, State) ->
{stop, {bad_call, Req}, State}.

handle_cast(Req, State) ->
{stop, {bad_cast, Req}, State}.

handle_info({'DOWN', MRef, port, Socket, Reason}, {Socket, MRef} = State) ->
{stop, Reason, State};
handle_info(_, State) ->
handle_info(timeout, State) ->
case start_listener(State) of
{ok, {Socket, MRef}} ->
{noreply, State#state{socket = Socket, mref = MRef}};
_ ->
erlang:send_after(5000, self(), timeout),
{noreply, State}
end;
handle_info({'DOWN', MRef, port, Socket, _Reason}, #state{mref = MRef, socket = Socket} = State) ->
catch gen_tcp:close(Socket),
erlang:send_after(5000, self(), timeout),
{noreply, State};
handle_info(_Msg, State) ->
{noreply, State}.

code_change(_, State, _) ->
{ok, State}.

terminate(_, {Socket, MRef}) ->
terminate(_Reason, {Socket, MRef}) ->
%% Socket may already be down but need to ensure it is closed to avoid
%% eaddrinuse error on restart
%% this takes care of that, unless of course this process is killed...
case demonitor(MRef, [flush, info]) of
true -> gen_tcp:close(Socket);
false -> ok
end.

%% ------------------------------------------------------------------
%% Internal functions
%% ------------------------------------------------------------------
start_listener(#state{
pool = Pool,
listen_opts = ListenOpts,
pool_opts = PoolOpts} = _State) ->
Port = maps:get(port, ListenOpts, 8080),
IPAddress = maps:get(ip, ListenOpts, {0, 0, 0, 0}),
AcceptorPoolSize = maps:get(size, PoolOpts, 10),
SocketOpts = maps:get(socket_options, ListenOpts, [{reuseaddr, true},
{nodelay, true},
{reuseaddr, true},
{backlog, 32768},
{keepalive, true}]),

Opts = [{active, false}, {mode, binary}, {packet, raw}, {ip, IPAddress} | SocketOpts],
case gen_tcp:listen(Port, Opts) of
{ok, Socket} ->
%% acceptor could close the socket if there is a problem
MRef = monitor(port, Socket),
{ok, _} = grpcbox_pool:accept_socket(Pool, Socket, AcceptorPoolSize),
{ok, {Socket, MRef}};
{error, eaddrinuse} ->
%% our desired port is already in use
%% its likely this grpcbox_socket server has been killed ( for reason unknown ) and is restarting
%% previously it would have bound to the port before passing control to our acceptor pool
%% the socket remains open
%% in the restart scenario, the socket process would attempt to bind again
%% to the port and then stop, the sup would keep restarting it
%% and we would end up breaching the restart strategy of the parent sup
%% eventually taking down the entire tree
%% result of which is we have no active listener and grpcbox is effectively down
%% so now if we hit eaddrinuse, we check if our acceptor pool using it
%% if so we close the port here and stop this process
%% NOTE: issuing stop in init wont trigger terminate and so cant rely on
%% the socket being closed there
%% This allows the sup to restart things cleanly
%% We could try to reuse the exising port rather than closing it
%% but side effects were encountered there, so deliberately avoiding

%% NOTE: acceptor_pool has a grace period for connections before it terminates
%% grpcbox_pool sets this to a default of 5 secs
%% this needs considered when deciding on related supervisor restart strategies
%% AND keep in mind the acceptor pool will continue accepting new connections
%% during this grace period

%% get the current sockets in use by the acceptor pool
%% if one is bound to our target port then close it
%% need to allow for possibility of multiple services, each with its own socket
%% so we need to identify our interested socket via port number
PoolSockets = grpcbox_pool:pool_sockets(Pool),
MaybeHaveExistingSocket =
lists:foldl(
fun({inet_tcp, {_IP, BoundPortNumber}, Socket, _SockRef}, _Acc) when BoundPortNumber =:= Port ->
{ok, Socket};
(_, Acc) ->
Acc
end, socket_not_found, PoolSockets),
case MaybeHaveExistingSocket of
{ok, Socket} ->
gen_tcp:close(Socket);
socket_not_found ->
noop
end,
{error, eaddrinuse};
{error, Reason} ->
{error, Reason}
end.
Loading