Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve http.Client connection handling #1581

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 37 additions & 17 deletions base/src/http.act
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ actor Listener(cap: net.TCPListenCap, address: str, port: int, on_listen_error:
# TODO: default schema="https"
# TODO: default port=None
# TODO: default tls_verify=True
# TODO: add arguments for configuring request buffering (size) and reconnection attempts and timeouts
actor Client(cap: net.TCPConnectCap, scheme: str, address: str, port: ?int, tls_verify: bool, on_connect: action(Client) -> None, on_error: action(Client, str) -> None, log_handler: ?logging.Handler):
"""HTTP(S) Client

Expand All @@ -457,10 +458,11 @@ actor Client(cap: net.TCPConnectCap, scheme: str, address: str, port: ?int, tls_
var _on_response: list[(bytes, action(Client, Response) -> None)] = []
var version: ?bytes = None
var buf = b""
var close_connection: bool = True
var tcp_conn: ?net.TCPConnection = None
var tls_conn: ?net.TLSConnection = None

var connecting: bool = True

def _connect():
if scheme == "http":
_log.verbose("Using http scheme and port 80", None)
Expand All @@ -475,9 +477,18 @@ actor Client(cap: net.TCPConnectCap, scheme: str, address: str, port: ?int, tls_

def _on_conn_connect():
# If there are outstanding requests, it probably means we were
# disconnected or have not connected yet
# TODO: do not flush entire buffer if we know the server will close the
# connection. If the latency is so big that we're able to send the
# entire buffer before the server closes the connection we're just
# wasting resources.
for r in _on_response:
_log.trace("Sending outstanding request", {"request": r.0})
_conn_write(r.0)
await async on_connect(self)
if connecting:
# Dispatch the on_connect callback on first connect but not for reconnects
await async on_connect(self)
connecting = False

def _on_tcp_connect(conn: net.TCPConnection) -> None:
_on_conn_connect()
Expand All @@ -500,10 +511,10 @@ actor Client(cap: net.TCPConnectCap, scheme: str, address: str, port: ?int, tls_
r, buf = parse_response(buf, _log)
if r is not None:
if "connection" in r.headers and r.headers["connection"] == "close":
close_connection = True
_conn_close()
# Is this really the right thing to do here? If the client
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a TODO entry here. We should inspect our queue, i.e. _on_response list to see if there is more stuff.

I'm also thinking like we can maybe have some basic heuristics, like when we create a http.Client the first time we obviously connect directly then we expect to run at least one query. Maybe we should not reconnect after that (in case of HTTP / 1.0 or not having persistent in later HTTP versions) until there is a second query. Now if we see a second query we can assume "there are multiple requests" and thus reconnect directly after the 2nd and subsequent requests. WDYT?

# does not make a new request we just reconnected for nothing!
_log.debug("Closing TCP connection due to header: Connection: close", None)
_connect()
_conn_reconnect()
if len(_on_response) == 0:
_log.notice("Data received with no on_response callback set", None)
break
Expand All @@ -524,22 +535,31 @@ actor Client(cap: net.TCPConnectCap, scheme: str, address: str, port: ?int, tls_
def _on_con_error(error: str) -> None:
on_error(self, error)

def _conn_close() -> None:
def _conn_reconnect() -> None:
if tcp_conn is not None:
def _noop(c):
pass
tcp_conn.close(_noop)
tcp_conn.reconnect()
elif tls_conn is not None:
def _noop(c):
pass
tls_conn.close(_noop)
tls_conn.reconnect()

def _conn_write(data: bytes) -> None:
_log.trace("Sending data", {"data": data})
if tcp_conn is not None:
tcp_conn.write(data)
elif tls_conn is not None:
tls_conn.write(data)
try:
_log.trace("Sending data", {"data": data})
# We call the write method on the TCP or TLS connection actor
# synchronously because we want to be able to catch exceptions
# signaling the socket was closed. Note that this is *not* waiting
# for network I/O, just waiting on system I/O for writing to the
# local socket buffer.
if tcp_conn is not None:
await async tcp_conn.write(data)
elif tls_conn is not None:
await async tls_conn.write(data)
except RuntimeError as exc:
# HTTP/1.0 servers close the connection after each request by default
if "bad file descriptor" in str(exc) or "bad stream" in str(exc):
mzagozen marked this conversation as resolved.
Show resolved Hide resolved
_log.debug("TCP connection closed, reconnecting", {"error": str(exc)})
_conn_reconnect()
else:
_on_con_error(str(exc))

# HTTP methods
def get(path: str, headers: dict[str, str], on_response: action(Client, Response) -> None):
Expand Down
24 changes: 20 additions & 4 deletions base/src/net.ext.c
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,12 @@ void on_connect6(uv_connect_t *connect_req, int status) {
char errmsg[1024] = "Failed to write to TCP socket: ";
uv_strerror_r(r, errmsg + strlen(errmsg), sizeof(errmsg)-strlen(errmsg));
log_warn(errmsg);
mzagozen marked this conversation as resolved.
Show resolved Hide resolved
if (strstr(errmsg, "bad file descriptor")) {
mzagozen marked this conversation as resolved.
Show resolved Hide resolved
// This can happen if the socket is closed, se we raise an exception
// and let the caller retry
$RAISE(((B_BaseException)B_RuntimeErrorG_new(to$str(errmsg))));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are generally doing Python-like-exception stuff. It's not a specific design, i.e. we can deviate where we think there is good reason but if we have no idea we might as well follow Python. I think Python uses OSError for most of these, not RunTimeError.

return $R_CONT(c$cont, B_None);
}
$action2 f = ($action2)self->on_error;
f->$class->__asyn__(f, self, to$str(errmsg));
}
Expand Down Expand Up @@ -311,6 +317,9 @@ static void after_shutdown(uv_shutdown_t* req, int status) {
uv_stream_t *stream = (uv_stream_t *)from$int(self->_sock);
// fd == -1 means invalid FD and can happen after __resume__
if (stream == -1)
// TODO: should we dispatch the on_close callback here too even though
// we did not close anything? This is what we do for TLSConnection too
// and it allows for chaining the callbacks
return $R_CONT(c$cont, B_None);

log_debug("Closing TCP connection");
Expand Down Expand Up @@ -629,9 +638,11 @@ void tls_write_cb(uv_write_t *wreq, int status) {

$R netQ_TLSConnectionD_closeG_local (netQ_TLSConnection self, $Cont c$cont, $action on_close) {
uv_stream_t *stream = (uv_stream_t *)from$int(self->_stream);
// fd == -1 means invalid FD and can happen after __resume__
if (stream == -1)
// fd == -1 means invalid FD and can happen after __resume__ or if the socket is closed
if (stream == -1) {
on_close->$class->__asyn__(on_close, self);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We initially thought about raising an exception here instead, but http.Client calls net.TLSConnection.write() asynchronously. And I guess others would too?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you should blindly accept async write as the correct thing. It's not a complete design as-is...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, for close, I'm not sure we even need to close on_close. Like why do we end up here with a -1 stream?

If you call TLSConnection.close() 10 times in a row, would we expect the on_close() callback to also be called 10 times? I think it makes more sense to treat it as a declarative thing, so if TLSConnection.close() is called on a closed connection, the goal of having the connection closed is already reached. We don't actually transition, i.e. we don't go from established -> closed, and so on_close will not be called!?

The stream must have gone to -1 at some earlier point at which the on_error() callback should be called, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling on_close even though we're already closed (stream == -1) allows for chaining the callbacks in reconnect():

    def reconnect():
        close(_connect)

return $R_CONT(c$cont, B_None);
}

self->_on_close = on_close;

Expand All @@ -645,9 +656,14 @@ void tls_write_cb(uv_write_t *wreq, int status) {

$R netQ_TLSConnectionD_writeG_local (netQ_TLSConnection self, $Cont c$cont, B_bytes data) {
uv_stream_t *stream = (uv_stream_t *)from$int(self->_stream);
// fd == -1 means invalid FD and can happen after __resume__
if (stream == -1)
// fd == -1 means invalid FD and can happen after __resume__ or if the socket is closed
if (stream == -1) {
// Raise an exception and let the caller retry
char errmsg[] = "Failed to write to TLS TCP socket: bad stream";
log_debug(errmsg);
$RAISE(((B_BaseException)B_RuntimeErrorG_new(to$str(errmsg))));
return $R_CONT(c$cont, B_None);
}

uv_write_t *wreq = (uv_write_t *)malloc(sizeof(uv_write_t));
wreq->data = self;
Expand Down
Loading