Skip to content

Commit

Permalink
Improve http.Client connection handling
Browse files Browse the repository at this point in the history
HTTP 1.0 servers close the connection automatically after sending the
response. We updated the http.Client actor such that the users can use
the high-level API like making several requests in a row without having
to worry about reconnects. http.Client will buffer the requests and
flush the buffer once it (re)connects the socket.

Writing to a closed socket is bad and guaranteed to fail, so the user
(in this case the http.Client actor) should be notified ASAP before
making further writes. Rather than using the on_error callback from the
underlying TCPConnection and TLSConnection actors we raise an exception
from their write() methods. This implies the calls to write() are
synchronous, but since we're not waiting on network I/O it should be
fine?!
  • Loading branch information
mzagozen committed Nov 17, 2023
1 parent 08bd695 commit 1178aa0
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 21 deletions.
54 changes: 37 additions & 17 deletions base/src/http.act
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ actor Listener(cap: net.TCPListenCap, address: str, port: int, on_listen_error:
# TODO: default schema="https"
# TODO: default port=None
# TODO: default tls_verify=True
# TODO: add arguments for configuring request buffering (size) and reconnection attempts and timeouts
actor Client(cap: net.TCPConnectCap, scheme: str, address: str, port: ?int, tls_verify: bool, on_connect: action(Client) -> None, on_error: action(Client, str) -> None, log_handler: ?logging.Handler):
"""HTTP(S) Client

Expand All @@ -457,10 +458,11 @@ actor Client(cap: net.TCPConnectCap, scheme: str, address: str, port: ?int, tls_
var _on_response: list[(bytes, action(Client, Response) -> None)] = []
var version: ?bytes = None
var buf = b""
var close_connection: bool = True
var tcp_conn: ?net.TCPConnection = None
var tls_conn: ?net.TLSConnection = None

var connecting: bool = True

def _connect():
if scheme == "http":
_log.verbose("Using http scheme and port 80", None)
Expand All @@ -475,9 +477,18 @@ actor Client(cap: net.TCPConnectCap, scheme: str, address: str, port: ?int, tls_

def _on_conn_connect():
# If there are outstanding requests, it probably means we were
# disconnected or have not connected yet
# TODO: do not flush entire buffer if we know the server will close the
# connection. If the latency is so big that we're able to send the
# entire buffer before the server closes the connection we're just
# wasting resources.
for r in _on_response:
_log.trace("Sending outstanding request", {"request": r.0})
_conn_write(r.0)
await async on_connect(self)
if connecting:
# Dispatch the on_connect callback on first connect but not for reconnects
await async on_connect(self)
connecting = False

def _on_tcp_connect(conn: net.TCPConnection) -> None:
_on_conn_connect()
Expand All @@ -500,10 +511,10 @@ actor Client(cap: net.TCPConnectCap, scheme: str, address: str, port: ?int, tls_
r, buf = parse_response(buf, _log)
if r is not None:
if "connection" in r.headers and r.headers["connection"] == "close":
close_connection = True
_conn_close()
# Is this really the right thing to do here? If the client
# does not make a new request we just reconnected for nothing!
_log.debug("Closing TCP connection due to header: Connection: close", None)
_connect()
_conn_reconnect()
if len(_on_response) == 0:
_log.notice("Data received with no on_response callback set", None)
break
Expand All @@ -524,22 +535,31 @@ actor Client(cap: net.TCPConnectCap, scheme: str, address: str, port: ?int, tls_
def _on_con_error(error: str) -> None:
on_error(self, error)

def _conn_close() -> None:
def _conn_reconnect() -> None:
if tcp_conn is not None:
def _noop(c):
pass
tcp_conn.close(_noop)
tcp_conn.reconnect()
elif tls_conn is not None:
def _noop(c):
pass
tls_conn.close(_noop)
tls_conn.reconnect()

def _conn_write(data: bytes) -> None:
_log.trace("Sending data", {"data": data})
if tcp_conn is not None:
tcp_conn.write(data)
elif tls_conn is not None:
tls_conn.write(data)
try:
_log.trace("Sending data", {"data": data})
# We call the write method on the TCP or TLS connection actor
# synchronously because we want to be able to catch exceptions
# signaling the socket was closed. Note that this is *not* waiting
# for network I/O, just waiting on system I/O for writing to the
# local socket buffer.
if tcp_conn is not None:
await async tcp_conn.write(data)
elif tls_conn is not None:
await async tls_conn.write(data)
except RuntimeError as exc:
# HTTP/1.0 servers close the connection after each request by default
if "bad file descriptor" in str(exc) or "bad stream" in str(exc):
_log.debug("TCP connection closed, reconnecting", {"error": str(exc)})
_conn_reconnect()
else:
_on_con_error(str(exc))

# HTTP methods
def get(path: str, headers: dict[str, str], on_response: action(Client, Response) -> None):
Expand Down
24 changes: 20 additions & 4 deletions base/src/net.ext.c
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,12 @@ void on_connect6(uv_connect_t *connect_req, int status) {
char errmsg[1024] = "Failed to write to TCP socket: ";
uv_strerror_r(r, errmsg + strlen(errmsg), sizeof(errmsg)-strlen(errmsg));
log_warn(errmsg);
if (strstr(errmsg, "bad file descriptor")) {
// This can happen if the socket is closed, se we raise an exception
// and let the caller retry
$RAISE(((B_BaseException)B_RuntimeErrorG_new(to$str(errmsg))));
return $R_CONT(c$cont, B_None);
}
$action2 f = ($action2)self->on_error;
f->$class->__asyn__(f, self, to$str(errmsg));
}
Expand Down Expand Up @@ -311,6 +317,9 @@ static void after_shutdown(uv_shutdown_t* req, int status) {
uv_stream_t *stream = (uv_stream_t *)from$int(self->_sock);
// fd == -1 means invalid FD and can happen after __resume__
if (stream == -1)
// TODO: should we dispatch the on_close callback here too even though
// we did not close anything? This is what we do for TLSConnection too
// and it allows for chaining the callbacks
return $R_CONT(c$cont, B_None);

log_debug("Closing TCP connection");
Expand Down Expand Up @@ -629,9 +638,11 @@ void tls_write_cb(uv_write_t *wreq, int status) {

$R netQ_TLSConnectionD_closeG_local (netQ_TLSConnection self, $Cont c$cont, $action on_close) {
uv_stream_t *stream = (uv_stream_t *)from$int(self->_stream);
// fd == -1 means invalid FD and can happen after __resume__
if (stream == -1)
// fd == -1 means invalid FD and can happen after __resume__ or if the socket is closed
if (stream == -1) {
on_close->$class->__asyn__(on_close, self);
return $R_CONT(c$cont, B_None);
}

self->_on_close = on_close;

Expand All @@ -645,9 +656,14 @@ void tls_write_cb(uv_write_t *wreq, int status) {

$R netQ_TLSConnectionD_writeG_local (netQ_TLSConnection self, $Cont c$cont, B_bytes data) {
uv_stream_t *stream = (uv_stream_t *)from$int(self->_stream);
// fd == -1 means invalid FD and can happen after __resume__
if (stream == -1)
// fd == -1 means invalid FD and can happen after __resume__ or if the socket is closed
if (stream == -1) {
// Raise an exception and let the caller retry
char errmsg[] = "Failed to write to TLS TCP socket: bad stream";
log_debug(errmsg);
$RAISE(((B_BaseException)B_RuntimeErrorG_new(to$str(errmsg))));
return $R_CONT(c$cont, B_None);
}

uv_write_t *wreq = (uv_write_t *)malloc(sizeof(uv_write_t));
wreq->data = self;
Expand Down

0 comments on commit 1178aa0

Please sign in to comment.