diff --git a/src/libp2p_connection.erl b/src/libp2p_connection.erl index 91a2f6fb..3a61dcb3 100644 --- a/src/libp2p_connection.erl +++ b/src/libp2p_connection.erl @@ -11,7 +11,7 @@ -export_type([connection/0, close_state/0]). -export([new/2, send/2, send/3, - recv/1, recv/2, recv/3, + recv/1, recv/2, recv/3, unrecv/2, acknowledge/2, fdset/1, socket/1, fdclr/1, addr_info/1, close/1, close_state/1, controlling_process/2, session/1, monitor/1, @@ -21,6 +21,7 @@ -callback acknowledge(any(), any()) -> ok. -callback send(any(), iodata(), non_neg_integer() | infinity) -> ok | {error, term()}. -callback recv(any(), non_neg_integer(), non_neg_integer()) -> {ok, binary()} | {error, term()}. +-callback unrecv(any(), iodata()) -> ok. -callback close(any()) -> ok. -callback close_state(any()) -> close_state(). -callback fdset(any()) -> ok | {error, term()}. @@ -58,6 +59,10 @@ recv(Conn=#connection{}, Length) -> recv(#connection{module=Module, state=State}, Length, Timeout) -> Module:recv(State, Length, Timeout). +-spec unrecv(connection(), iodata()) -> ok. +unrecv(#connection{module=Module, state=State}, Data) -> + Module:unrecv(State, Data). + -spec acknowledge(connection(), any()) -> ok. acknowledge(#connection{module=Module, state=State}, Ref) -> Module:acknowledge(State, Ref). diff --git a/src/libp2p_framed_stream.erl b/src/libp2p_framed_stream.erl index 4f25a113..bf8cc690 100644 --- a/src/libp2p_framed_stream.erl +++ b/src/libp2p_framed_stream.erl @@ -379,11 +379,15 @@ recv(Connection, Timeout) -> {error, Error} -> {error, Error}; {ok, <>} -> %% TODO: Limit max message size we're willing to - %% TODO if we read the prefix length, but time out on the payload, we should handle this? case libp2p_connection:recv(Connection, Size, Timeout) of {ok, Data} when byte_size(Data) == Size -> {ok, Data}; {ok, _Data} -> {error, frame_size_mismatch}; - {error, Error} -> {error, Error} + {error, timeout} -> + %% the underlying buffer didn't have everything in time, so put the length prefix back in + %% the buffer + libp2p_connection:unrecv(Connection, <>), + {error, payload_timeout}; + {error, Error} ->{error, Error} end end. @@ -402,7 +406,11 @@ dispatch_handle_data(Kind, Bin, State=#state{module=Module}) -> {noreply, handle_fdset(handle_resp_send({stop, Reason}, Response, State#state{state=ModuleState}))} end. - +handle_recv_result({error, payload_timeout}, State) -> + %% we have pending data, but it's not complete + %% schedule another fdset so we can check again later + libp2p_connection:fdset(State#state.connection), + {noreply, State}; handle_recv_result({error, timeout}, State) -> {noreply, State}; handle_recv_result({error, closed}, State) -> diff --git a/src/libp2p_transport_tcp.erl b/src/libp2p_transport_tcp.erl index c3d245dd..5417ddb9 100644 --- a/src/libp2p_transport_tcp.erl +++ b/src/libp2p_transport_tcp.erl @@ -41,7 +41,7 @@ -export([start_link/1, init/1, handle_call/3, handle_info/2, handle_cast/2, terminate/2]). %% libp2p_connection --export([send/3, recv/3, acknowledge/2, addr_info/1, +-export([send/3, recv/3, unrecv/2, acknowledge/2, addr_info/1, close/1, close_state/1, controlling_process/2, session/1, fdset/1, socket/1, fdclr/1, monitor/1, set_idle_timeout/2 @@ -161,6 +161,10 @@ send(#tcp_state{socket=Socket, transport=Transport}, Data, _Timeout) -> recv(#tcp_state{socket=Socket, transport=Transport}, Length, Timeout) -> Transport:recv(Socket, Length, Timeout). +-spec unrecv(tcp_state(), iodata()) -> ok. +unrecv(#tcp_state{socket=Socket}, Data) -> + gen_tcp:unrecv(Socket, Data). + -spec close(tcp_state()) -> ok. close(#tcp_state{socket=Socket, transport=Transport}) -> Transport:close(Socket). diff --git a/src/libp2p_yamux_stream.erl b/src/libp2p_yamux_stream.erl index f9c3cbdf..fd264dc0 100644 --- a/src/libp2p_yamux_stream.erl +++ b/src/libp2p_yamux_stream.erl @@ -58,7 +58,7 @@ % API -export([new_connection/1, open_stream/3, receive_stream/3, update_window/3, receive_data/2]). % libp2p_connection --export([close/1, close_state/1, send/3, recv/3, acknowledge/2, +-export([close/1, close_state/1, send/3, recv/3, unrecv/2, acknowledge/2, fdset/1, fdclr/1, addr_info/1, controlling_process/2, session/1, monitor/1, set_idle_timeout/2]). %% libp2p_info @@ -132,6 +132,9 @@ send(Pid, Data, Timeout) -> recv(Pid, Size, Timeout) -> statem(Pid, {recv, Size, Timeout}). +unrecv(Pid, Data) -> + gen_statem:cast(Pid, {unrecv, Data}). + acknowledge(_, _) -> ok. @@ -284,6 +287,8 @@ handle_event({call, From}, {recv, Size, Timeout}, _State, Data0=#state{}) when ? lager:debug("Remote closed and no receivable data; closing"), {stop, normal, notify_inert(Data)} end; +handle_event(cast, {unrecv, Bin}, _State, Data=#state{recv_state=#recv_state{waiter_data=WaiterData}=RecvState}) -> + {keep_state, Data#state{recv_state=RecvState#recv_state{waiter_data = <>}}}; handle_event({call, From}, {recv, Size, Timeout}, _State, Data=#state{}) -> % Normal open state {keep_state, data_recv(From, Size, Timeout, Data)};