Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hibernate sockjs_session process #25

Open
wants to merge 8 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,10 @@ simple. It has just a couple of methods:
after the client was last connected (in ms).
* `{response_limit, integer()}` - the maximum size of a single
http streaming response (in bytes).
* `{hib_timeout, integer() | hibernate}` - hibernate websocket
process after hib_timeout milliseconds of inactivity (5000 by
default) to reduce memory footprint. Set to 'hibernate' atom to
hibernate always (may be inefficient). (implementation is
* `{hib_timeout, integer() | hibernate | infinity}` - hibernate
websocket process after hib_timeout milliseconds of inactivity
(5000 by default) to reduce memory footprint. Set to 'hibernate'
atom to hibernate always (may be inefficient). (implementation is
incomplete, see #15)
* `{logger, fun/3}` - a function called on every request, used
to print request to the logs (or on the screen by default).
Expand Down
45 changes: 24 additions & 21 deletions src/sockjs_action.erl
Original file line number Diff line number Diff line change
Expand Up @@ -189,27 +189,8 @@ chunk_start(Req, Headers, ContentType) ->
reply_loop(Req, SessionId, ResponseLimit, Fmt, Service) ->
Req0 = sockjs_http:hook_tcp_close(Req),
case sockjs_session:reply(SessionId) of
wait -> receive
%% In Cowboy we need to capture async
%% messages from the tcp connection -
%% ie: {active, once}.
{tcp_closed, _} ->
Req0;
%% In Cowboy we may in theory get real
%% http requests, this is bad.
{tcp, _S, Data} ->
error_logger:error_msg(
"Received unexpected data on a "
"long-polling http connection: ~p. "
"Connection aborted.~n",
[Data]),
Req1 = sockjs_http:abruptly_kill(Req),
Req1;
go ->
Req1 = sockjs_http:unhook_tcp_close(Req0),
reply_loop(Req1, SessionId, ResponseLimit,
Fmt, Service)
end;
{wait, hibernate} -> catch erlang:hibernate(?MODULE, reply_loop_wait, [Req0, SessionId, ResponseLimit, Fmt, Service, infinity]);
{wait, Timeout} -> reply_loop_wait(Req0, SessionId, ResponseLimit, Fmt, Service, Timeout);
session_in_use -> Frame = sockjs_util:encode_frame({close, ?STILL_OPEN}),
chunk_end(Req0, Frame, Fmt);
{close, Frame} -> Frame1 = sockjs_util:encode_frame(Frame),
Expand All @@ -222,6 +203,28 @@ reply_loop(Req, SessionId, ResponseLimit, Fmt, Service) ->
Fmt, Service)
end.

reply_loop_wait(Req0, SessionId, ResponseLimit, Fmt, Service, Timeout) ->
receive
%% In Cowboy we need to capture async
%% messages from the tcp connection -
%% ie: {active, once}.
{tcp_closed, _} -> Req0;
%% In Cowboy we may in theory get real
%% http requests, this is bad.
{tcp, _S, Data} ->
error_logger:error_msg(
"Received unexpected data on a "
"long-polling http connection: ~p. "
"Connection aborted.~n",
[Data]),
sockjs_http:abruptly_kill(Req0);
go ->
Req1 = sockjs_http:unhook_tcp_close(Req0),
reply_loop(Req1, SessionId, ResponseLimit,
Fmt, Service)
after
Timeout -> catch erlang:hibernate(?MODULE, reply_loop_wait, [Req0, SessionId, ResponseLimit, Fmt, Service, infinity])
end.
reply_loop0(Req, _SessionId, ResponseLimit, _Fmt, _Service) when ResponseLimit =< 0 ->
chunk_end(Req);
reply_loop0(Req, SessionId, ResponseLimit, Fmt, Service) ->
Expand Down
41 changes: 15 additions & 26 deletions src/sockjs_cowboy_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ terminate(_Req, _Service) ->
%% --------------------------------------------------------------------------

websocket_init(_TransportName, Req,
Service = #service{logger = Logger, hib_timeout = HibTimeout}) ->
Service = #service{logger = Logger}) ->
Req0 = Logger(Service, {cowboy, Req}, websocket),

{Info, Req1} = sockjs_handler:extract_info(Req0),
Expand All @@ -43,43 +43,32 @@ websocket_init(_TransportName, Req,
{WS, Req2}
end,
self() ! go,
mh({ok, Req3, {RawWebsocket, SessionPid, {undefined, HibTimeout}}}).
{ok, Req3, {RawWebsocket, SessionPid}}.

websocket_handle({text, Data}, Req, {RawWebsocket, SessionPid, _HT} = S) ->
websocket_handle({text, Data}, Req, {RawWebsocket, SessionPid} = S) ->
case sockjs_ws_handler:received(RawWebsocket, SessionPid, Data) of
ok -> mh({ok, Req, S});
shutdown -> {shutdown, Req, S}
{ok, hibernate} -> {ok, Req, S, hibernate};
{ok, _Timeout} -> {ok, Req, S};
shutdown -> {shutdown, Req, S}
end;
websocket_handle(_Unknown, Req, S) ->
{shutdown, Req, S}.

websocket_info(go, Req, {RawWebsocket, SessionPid, _HT} = S) ->
websocket_info(go, Req, {RawWebsocket, SessionPid} = S) ->
case sockjs_ws_handler:reply(RawWebsocket, SessionPid) of
wait -> mh({ok, Req, S});
{ok, Data} -> self() ! go,
{reply, {text, Data}, Req, S};
{close, <<>>} -> {shutdown, Req, S};
{close, Data} -> self() ! shutdown,
{reply, {text, Data}, Req, S}
{wait, hibernate} -> {ok, Req, S, hibernate};
{wait, _Timeout} -> {ok, Req, S};
{ok, Data} -> self() ! go,
{reply, {text, Data}, Req, S};
{close, <<>>} -> {shutdown, Req, S};
{close, Data} -> self() ! shutdown,
{reply, {text, Data}, Req, S}
end;
websocket_info(shutdown, Req, S) ->
{shutdown, Req, S};
websocket_info(hibernate_triggered, Req, S) ->
{ok, Req, S, hibernate}.

websocket_terminate(_Reason, _Req, {RawWebsocket, SessionPid, _HT}) ->
websocket_terminate(_Reason, _Req, {RawWebsocket, SessionPid}) ->
sockjs_ws_handler:close(RawWebsocket, SessionPid),
ok.

%% --------------------------------------------------------------------------

mh({ok, Req, {RawWebsocket, SessionPid, {TRef, hibernate}}}) ->
{ok, Req, {RawWebsocket, SessionPid, {TRef, hibernate}}, hibernate};

mh({ok, Req, {RawWebsocket, SessionPid, {TRef, HibTimeout}}}) ->
case TRef of
undefined -> ok;
_ -> sockjs_util:cancel_send_after(TRef, hibernate_triggered)
end,
TRef2 = erlang:send_after(HibTimeout, self(), hibernate_triggered),
{ok, Req, {RawWebsocket, SessionPid, {TRef2, HibTimeout}}}.
2 changes: 1 addition & 1 deletion src/sockjs_internal.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
disconnect_delay :: non_neg_integer(),
heartbeat_delay :: non_neg_integer(),
response_limit :: non_neg_integer(),
hib_timeout :: non_neg_integer() | hibernate,
hib_timeout :: non_neg_integer() | hibernate | infinity,
logger :: logger()
}).

Expand Down
56 changes: 44 additions & 12 deletions src/sockjs_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
disconnect_delay = 5000 :: non_neg_integer(),
heartbeat_tref :: reference() | triggered,
heartbeat_delay = 25000 :: non_neg_integer(),
hibernate_tref :: reference(),
hibernate_delay :: non_neg_integer() | hibernate | infinity,
ready_state = connecting :: connecting | open | closed,
close_msg :: {non_neg_integer(), string()},
callback,
Expand Down Expand Up @@ -56,8 +58,8 @@ maybe_create(SessionId, Service, Info) ->
-spec received(list(iodata()), session_or_pid()) -> ok.
received(Messages, SessionPid) when is_pid(SessionPid) ->
case gen_server:call(SessionPid, {received, Messages}, infinity) of
ok -> ok;
error -> throw(no_session)
{ok, Timeout} -> {ok, Timeout};
error -> throw(no_session)
%% TODO: should we respond 404 when session is closed?
end;
received(Messages, SessionId) ->
Expand Down Expand Up @@ -166,28 +168,48 @@ emit(What, State = #session{callback = Callback,
ok -> State
end.

mh(#session{hibernate_delay = hibernate} = State) -> {State, hibernate};

mh(#session{hibernate_delay = infinity} = State) -> {State, infinity};

mh(#session{hibernate_delay = HibTimeout, heartbeat_delay = HeartbeatDelay} = State) when HibTimeout >= HeartbeatDelay -> {State, infinity};

mh(#session{hibernate_delay = HibTimeout, hibernate_tref = TRef} = State) ->
case TRef of
undefined -> ok;
_ -> sockjs_util:cancel_send_after(TRef, hibernate_triggered)
end,
TRef2 = erlang:send_after(HibTimeout, self(), hibernate_triggered),
{State#session{hibernate_tref = TRef2}, infinity}.

%% --------------------------------------------------------------------------

-spec init({session_or_undefined(), service(), info()}) -> {ok, #session{}}.
-spec init({session_or_undefined(), service(), info()}) -> {ok, #session{}, infinity | hibernate}.
init({SessionId, #service{callback = Callback,
state = UserState,
disconnect_delay = DisconnectDelay,
hib_timeout = HibTimeout,
heartbeat_delay = HeartbeatDelay}, Info}) ->
case SessionId of
undefined -> ok;
_Else -> ets:insert(?ETS, {SessionId, self()})
end,
process_flag(trap_exit, true),
TRef = erlang:send_after(DisconnectDelay, self(), session_timeout),
{ok, #session{id = SessionId,
State = #session{id = SessionId,
callback = Callback,
state = UserState,
response_pid = undefined,
disconnect_tref = TRef,
disconnect_delay = DisconnectDelay,
heartbeat_tref = undefined,
heartbeat_delay = HeartbeatDelay,
handle = {?MODULE, {self(), Info}}}}.
hibernate_tref = undefined,
hibernate_delay = HibTimeout,
handle = {?MODULE, {self(), Info}}},
{State2, Timeout} = mh(State),
{ok, State2, Timeout}.



handle_call({reply, Pid, _Multiple}, _From, State = #session{
Expand All @@ -212,10 +234,10 @@ handle_call({reply, Pid, _Multiple}, _From, State = #session{
{reply, session_in_use, State};

handle_call({reply, Pid, Multiple}, _From, State = #session{
ready_state = open,
response_pid = RPid,
heartbeat_tref = HeartbeatTRef,
outbound_queue = Q})
ready_state = open,
response_pid = RPid,
heartbeat_tref = HeartbeatTRef,
outbound_queue = Q})
when RPid == undefined orelse RPid == Pid ->
{Messages, Q1} = case Multiple of
true -> {queue:to_list(Q), queue:new()};
Expand All @@ -228,7 +250,8 @@ handle_call({reply, Pid, Multiple}, _From, State = #session{
{[], triggered} -> State1 = unmark_waiting(Pid, State),
{reply, {ok, {heartbeat, nil}}, State1};
{[], _TRef} -> State1 = mark_waiting(Pid, State),
{reply, wait, State1};
{State2, Timeout} = mh(State1),
{reply, {wait, Timeout}, State2, Timeout};
_More -> State1 = unmark_waiting(Pid, State),
{reply, {ok, {data, Messages}},
State1#session{outbound_queue = Q1}}
Expand All @@ -238,7 +261,8 @@ handle_call({received, Messages}, _From, State = #session{ready_state = open}) -
State2 = lists:foldl(fun(Msg, State1) ->
emit({recv, iolist_to_binary(Msg)}, State1)
end, State, Messages),
{reply, ok, State2};
{State3, Timeout} = mh(State2),
{reply, {ok, Timeout}, State3, Timeout};

handle_call({received, _Data}, _From, State = #session{ready_state = _Any}) ->
{reply, error, State};
Expand Down Expand Up @@ -281,9 +305,17 @@ handle_info(force_shutdown, State) ->
handle_info(session_timeout, State = #session{response_pid = undefined}) ->
{stop, normal, State};

handle_info(hibernate_triggered, #session{response_pid = RPid} = State) ->
case RPid of
undefined -> ok;
_ -> RPid ! hibernate_triggered
end,
{noreply, State#session{hibernate_tref = undefined}, hibernate};

handle_info(heartbeat_triggered, State = #session{response_pid = RPid}) when RPid =/= undefined ->
RPid ! go,
{noreply, State#session{heartbeat_tref = triggered}};
{State2, Timeout} = mh(State),
{noreply, State2#session{heartbeat_tref = triggered}, Timeout};

handle_info(Info, State) ->
{stop, {odd_info, Info}, State}.
Expand Down
12 changes: 6 additions & 6 deletions src/sockjs_ws_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ received(rawwebsocket, SessionPid, Data) ->

session_received(Messages, SessionPid) ->
try sockjs_session:received(Messages, SessionPid) of
ok -> ok
{ok, Timeout} -> {ok, Timeout}
catch
no_session -> shutdown
no_session -> shutdown
end.

-spec reply(websocket|rawwebsocket, pid()) -> {close|open, binary()} | wait.
Expand All @@ -36,8 +36,8 @@ reply(websocket, SessionPid) ->
{W, Frame} when W =:= ok orelse W =:= close->
Frame1 = sockjs_util:encode_frame(Frame),
{W, iolist_to_binary(Frame1)};
wait ->
wait
{wait, Timeout} ->
{wait, Timeout}
end;
reply(rawwebsocket, SessionPid) ->
case sockjs_session:reply(SessionPid, false) of
Expand All @@ -48,8 +48,8 @@ reply(rawwebsocket, SessionPid) ->
{data, [Msg]} -> {ok, iolist_to_binary(Msg)};
{heartbeat, nil} -> reply(rawwebsocket, SessionPid)
end;
wait ->
wait
{wait, Timeout} ->
{wait, Timeout}
end.

-spec close(websocket|rawwebsocket, pid()) -> ok.
Expand Down