Improve http.Client connection handling #1581

Open
wants to merge 4 commits into
base: main

1 change: 1 addition & 0 deletions base/builtin/registration.c
@@ -79,6 +79,7 @@ void $register_builtin() {
$register_force(MEMORYERROR_ID,&B_MemoryErrorG_methods);
$register_force(OSERROR_ID,&B_OSErrorG_methods);
$register_force(RUNTIMEERROR_ID,&B_RuntimeErrorG_methods);
$register_force(CONNECTIONERROR_ID,&B_ConnectionErrorG_methods);
$register_force(NOTIMPLEMENTEDERROR_ID,&B_NotImplementedErrorG_methods);
$register_force(VALUEERROR_ID,&B_ValueErrorG_methods);
// $register_builtin_protocols();
25 changes: 13 additions & 12 deletions base/builtin/registration.h
@@ -54,22 +54,23 @@
 #define KEYERROR_ID 39
 #define MEMORYERROR_ID 40
 #define OSERROR_ID 41
-#define RUNTIMEERROR_ID 42
-#define NOTIMPLEMENTEDERROR_ID 43
-#define VALUEERROR_ID 44
+#define CONNECTIONERROR_ID 42
+#define RUNTIMEERROR_ID 43
+#define NOTIMPLEMENTEDERROR_ID 44
+#define VALUEERROR_ID 45

-#define PROC_ID 45
-#define ACTION_ID 46
-#define MUT_ID 47
-#define PURE_ID 48
+#define PROC_ID 46
+#define ACTION_ID 47
+#define MUT_ID 48
+#define PURE_ID 49

-#define SEQ_ID 49
-#define BRK_ID 50
-#define CNT_ID 51
-#define RET_ID 52
+#define SEQ_ID 50
+#define BRK_ID 51
+#define CNT_ID 52
+#define RET_ID 53


-#define PREASSIGNED 53
+#define PREASSIGNED 54


 /*
3 changes: 3 additions & 0 deletions base/src/__builtin__.act
@@ -344,6 +344,9 @@ class MemoryError (Exception):
class OSError (Exception):
pass

class ConnectionError (OSError):
Contributor

I hadn't looked much at how Python does this beyond OSError. I see now that it subclasses things quite nicely. Should we not just copy that whole hierarchy verbatim? It feels like a better starting point than starting from scratch. In particular, for the error you raise later, I think the matching class is BrokenPipeError.

We probably also want to change the __init__ method of our OSError so it can take errno and the like, again just as in Python. Without default args it'll be a little fiddly, but that's OK I guess.
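
For reference, a minimal Python sketch of the hierarchy discussed above. It uses only Python built-ins; `describe` is a hypothetical helper added for illustration. When `OSError` is constructed directly with `(errno, strerror)`, Python selects a subclass from the errno value, so `EPIPE` yields `BrokenPipeError`. `EBADF`, the case this PR maps to `ConnectionError`, stays a plain `OSError` in Python, which may be worth keeping in mind if the hierarchy is copied.

```python
import errno


def describe(exc: OSError) -> str:
    """Return the class chain from the concrete error up to OSError."""
    chain = []
    for cls in type(exc).__mro__:
        chain.append(cls.__name__)
        if cls is OSError:
            break
    return " -> ".join(chain)


# Constructing OSError with (errno, strerror) lets Python pick the subclass.
broken_pipe = OSError(errno.EPIPE, "Broken pipe")
bad_fd = OSError(errno.EBADF, "Bad file descriptor")

print(describe(broken_pipe))
print(describe(bad_fd))
```

The errno value remains available on the instance (`broken_pipe.errno`), which is the kind of information an extended `__init__` on Acton's OSError would need to carry.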

pass

class RuntimeError (Exception):
pass

55 changes: 38 additions & 17 deletions base/src/http.act
@@ -447,6 +447,7 @@ actor Listener(cap: net.TCPListenCap, address: str, port: int, on_listen_error:
# TODO: default schema="https"
# TODO: default port=None
# TODO: default tls_verify=True
# TODO: add arguments for configuring request buffering (size) and reconnection attempts and timeouts
actor Client(cap: net.TCPConnectCap, scheme: str, address: str, port: ?int, tls_verify: bool, on_connect: action(Client) -> None, on_error: action(Client, str) -> None, log_handler: ?logging.Handler):
"""HTTP(S) Client

@@ -457,10 +458,11 @@ actor Client(cap: net.TCPConnectCap, scheme: str, address: str, port: ?int, tls_
var _on_response: list[(bytes, action(Client, Response) -> None)] = []
var version: ?bytes = None
var buf = b""
var close_connection: bool = True
var tcp_conn: ?net.TCPConnection = None
var tls_conn: ?net.TLSConnection = None

var connecting: bool = True

def _connect():
if scheme == "http":
_log.verbose("Using http scheme and port 80", None)
@@ -475,9 +477,18 @@ actor Client(cap: net.TCPConnectCap, scheme: str, address: str, port: ?int, tls_

def _on_conn_connect():
# If there are outstanding requests, it probably means we were
# disconnected or have not connected yet
# TODO: do not flush entire buffer if we know the server will close the
# connection. If the latency is so big that we're able to send the
# entire buffer before the server closes the connection we're just
# wasting resources.
for r in _on_response:
_log.trace("Sending outstanding request", {"request": r.0})
_conn_write(r.0)
await async on_connect(self)
if connecting:
# Dispatch the on_connect callback on first connect but not for reconnects
await async on_connect(self)
connecting = False

def _on_tcp_connect(conn: net.TCPConnection) -> None:
_on_conn_connect()
@@ -500,10 +511,14 @@ actor Client(cap: net.TCPConnectCap, scheme: str, address: str, port: ?int, tls_
r, buf = parse_response(buf, _log)
if r is not None:
if "connection" in r.headers and r.headers["connection"] == "close":
close_connection = True
_conn_close()
# Is this really the right thing to do here? If the client
Contributor

Add a TODO entry here. We should inspect our queue, i.e. the _on_response list, to see if there are more requests pending.

I'm also thinking we could have some basic heuristics. When we create an http.Client we obviously connect right away, and we expect to run at least one query. Maybe we should not reconnect after that first query (in the case of HTTP/1.0, or when later HTTP versions are not using persistent connections) until there is a second query. Once we see a second query we can assume there are multiple requests, and thus reconnect directly after the 2nd and subsequent requests. WDYT?
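
The heuristic proposed in this comment could be sketched roughly as follows. This is an illustration in Python, not the Acton implementation; `ReconnectPolicy`, `record_request` and `reconnect_now` are hypothetical names, and the rule that queued requests always force a reconnect is an assumption derived from the TODO about inspecting the `_on_response` queue.

```python
class ReconnectPolicy:
    """Decide whether to reconnect right after the server closes the connection."""

    def __init__(self) -> None:
        self.requests_sent = 0  # total requests issued through this client

    def record_request(self) -> None:
        self.requests_sent += 1

    def reconnect_now(self, queued_requests: int) -> bool:
        # Requests still waiting for a response must be resent, so reconnect.
        if queued_requests > 0:
            return True
        # After a single request we cannot tell whether more will follow:
        # stay disconnected and reconnect lazily when the next request arrives.
        # From the second request on, assume more traffic and reconnect eagerly.
        return self.requests_sent >= 2


policy = ReconnectPolicy()
policy.record_request()
after_first = policy.reconnect_now(queued_requests=0)
policy.record_request()
after_second = policy.reconnect_now(queued_requests=0)
```

With this rule a client that only ever makes one request never pays for a second connection, while a client with sustained traffic keeps a connection ready.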

# does not make a new request we just reconnected for nothing!
# TODO: inspect the _on_response queue to see if there are more requests
# TODO: for HTTP/1.0 do not reconnect right after the first
# request, wait until the 2nd query and then assume there
# will be more.
_log.debug("Closing TCP connection due to header: Connection: close", None)
_connect()
_conn_reconnect()
if len(_on_response) == 0:
_log.notice("Data received with no on_response callback set", None)
break
@@ -524,22 +539,28 @@ actor Client(cap: net.TCPConnectCap, scheme: str, address: str, port: ?int, tls_
     def _on_con_error(error: str) -> None:
         on_error(self, error)

-    def _conn_close() -> None:
+    def _conn_reconnect() -> None:
         if tcp_conn is not None:
-            def _noop(c):
-                pass
-            tcp_conn.close(_noop)
+            tcp_conn.reconnect()
         elif tls_conn is not None:
-            def _noop(c):
-                pass
-            tls_conn.close(_noop)
+            tls_conn.reconnect()

     def _conn_write(data: bytes) -> None:
-        _log.trace("Sending data", {"data": data})
-        if tcp_conn is not None:
-            tcp_conn.write(data)
-        elif tls_conn is not None:
-            tls_conn.write(data)
+        try:
+            _log.trace("Sending data", {"data": data})
+            # We call the write method on the TCP or TLS connection actor
+            # synchronously because we want to be able to catch exceptions
+            # signaling the socket was closed. Note that this is *not* waiting
+            # for network I/O, just waiting on system I/O for writing to the
+            # local socket buffer.
+            if tcp_conn is not None:
+                await async tcp_conn.write(data)
+            elif tls_conn is not None:
+                await async tls_conn.write(data)
+        except ConnectionError as exc:
+            # HTTP/1.0 servers close the connection after each request by default
+            _log.debug("TCP connection closed, reconnecting", {"error": str(exc)})
+            _conn_reconnect()

     # HTTP methods
     def get(path: str, headers: dict[str, str], on_response: action(Client, Response) -> None):
24 changes: 20 additions & 4 deletions base/src/net.ext.c
@@ -275,6 +275,12 @@ void on_connect6(uv_connect_t *connect_req, int status) {
if (r < 0) {
char errmsg[1024] = "Failed to write to TCP socket: ";
uv_strerror_r(r, errmsg + strlen(errmsg), sizeof(errmsg)-strlen(errmsg));
if (r == UV_EBADF) {
// "bad file descriptor" error occurs when the socket is closed, so
// we raise an exception and let the caller retry
$RAISE(((B_BaseException)B_ConnectionErrorG_new(to$str(errmsg))));
return $R_CONT(c$cont, B_None);
}
log_warn(errmsg);
$action2 f = ($action2)self->on_error;
f->$class->__asyn__(f, self, to$str(errmsg));
@@ -311,6 +317,9 @@ static void after_shutdown(uv_shutdown_t* req, int status) {
uv_stream_t *stream = (uv_stream_t *)from$int(self->_sock);
// fd == -1 means invalid FD and can happen after __resume__
if (stream == -1)
// TODO: should we dispatch the on_close callback here too even though
// we did not close anything? This is what we do for TLSConnection too
// and it allows for chaining the callbacks
return $R_CONT(c$cont, B_None);

log_debug("Closing TCP connection");
@@ -629,9 +638,11 @@ void tls_write_cb(uv_write_t *wreq, int status) {

$R netQ_TLSConnectionD_closeG_local (netQ_TLSConnection self, $Cont c$cont, $action on_close) {
uv_stream_t *stream = (uv_stream_t *)from$int(self->_stream);
// fd == -1 means invalid FD and can happen after __resume__
if (stream == -1)
// fd == -1 means invalid FD and can happen after __resume__ or if the socket is closed
if (stream == -1) {
on_close->$class->__asyn__(on_close, self);
Contributor Author

We initially thought about raising an exception here instead, but http.Client calls net.TLSConnection.write() asynchronously. And I guess others would too?

Contributor

I don't think you should blindly accept async write as the correct thing. It's not a complete design as-is...
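
To make the trade-off in this exchange concrete, here is a loose Python asyncio analogy, not Acton semantics: an exception raised inside `write` only reaches the caller if the caller waits for the call to finish. `ClosedConnection` and `write_or_reconnect` are hypothetical names for illustration.

```python
import asyncio


class ClosedConnection:
    """Hypothetical stand-in for a connection whose socket is already closed."""

    async def write(self, data: bytes) -> None:
        raise ConnectionError("bad stream")


async def write_or_reconnect(conn, data: bytes, reconnect) -> str:
    try:
        # Waiting for the write is what lets the exception propagate here.
        await conn.write(data)
        return "sent"
    except ConnectionError:
        reconnect()
        return "reconnecting"


reconnects = []
result = asyncio.run(
    write_or_reconnect(
        ClosedConnection(),
        b"GET / HTTP/1.1\r\n\r\n",
        lambda: reconnects.append(1),
    )
)
```

A fire-and-forget write has no such path back to the caller, so failures would have to be reported through a callback instead.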

Contributor

Actually, for close, I'm not sure we even need to call on_close. Why do we end up here with a -1 stream in the first place?

If you call TLSConnection.close() 10 times in a row, would we expect the on_close() callback to also be called 10 times? I think it makes more sense to treat close as declarative: if TLSConnection.close() is called on a closed connection, the goal of having the connection closed is already reached. We don't actually transition, i.e. we don't go from established -> closed, and so on_close will not be called!?

The stream must have gone to -1 at some earlier point, at which the on_error() callback should have been called, right?
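
The declarative semantics described in this comment can be sketched in Python as below. `Connection` is a hypothetical class for illustration, not the net.TLSConnection actor: the callback fires only on an actual established -> closed transition.

```python
class Connection:
    """Hypothetical connection with declarative close semantics."""

    def __init__(self) -> None:
        self.established = True

    def close(self, on_close) -> None:
        if not self.established:
            # Already closed: the goal is reached, no transition, no callback.
            return
        self.established = False
        on_close(self)


calls = []
conn = Connection()
for _ in range(10):
    conn.close(calls.append)
```

Ten calls to `close` produce a single `on_close` invocation, because only the first call changes state.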

Contributor Author

Calling on_close even though we're already closed (stream == -1) allows for chaining the callbacks in reconnect():

    def reconnect():
        close(_connect)
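
A Python sketch of the two options in this thread, under the assumption that the connection is already closed when `reconnect` is called. Both classes and their method names are hypothetical. The first always invokes the callback, so `reconnect` can simply chain `close(_connect)`; the second keeps close declarative and has `reconnect` check the state itself.

```python
class AlwaysCallbackConnection:
    """close() always invokes on_close, so reconnect can chain through it."""

    def __init__(self) -> None:
        self.established = False  # start in the already-closed state
        self.connects = 0

    def _connect(self, _conn=None) -> None:
        self.established = True
        self.connects += 1

    def close(self, on_close) -> None:
        self.established = False
        on_close(self)  # called even if nothing was open

    def reconnect(self) -> None:
        self.close(self._connect)


class DeclarativeConnection(AlwaysCallbackConnection):
    """close() is a no-op when already closed; reconnect branches on state."""

    def close(self, on_close) -> None:
        if not self.established:
            return  # no transition, no callback
        self.established = False
        on_close(self)

    def reconnect(self) -> None:
        if self.established:
            self.close(self._connect)
        else:
            self._connect()


chained = AlwaysCallbackConnection()
chained.reconnect()
declarative = DeclarativeConnection()
declarative.reconnect()
```

Both variants end up connected exactly once; the difference is whether the state check lives in `close` or in `reconnect`.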

return $R_CONT(c$cont, B_None);
}

self->_on_close = on_close;

@@ -645,9 +656,14 @@ void tls_write_cb(uv_write_t *wreq, int status) {

 $R netQ_TLSConnectionD_writeG_local (netQ_TLSConnection self, $Cont c$cont, B_bytes data) {
     uv_stream_t *stream = (uv_stream_t *)from$int(self->_stream);
-    // fd == -1 means invalid FD and can happen after __resume__
-    if (stream == -1)
+    // fd == -1 means invalid FD and can happen after __resume__ or if the socket is closed
+    if (stream == -1) {
+        // Raise an exception and let the caller retry
+        char errmsg[] = "Failed to write to TLS TCP socket: bad stream";
+        log_debug(errmsg);
+        $RAISE(((B_BaseException)B_ConnectionErrorG_new(to$str(errmsg))));
         return $R_CONT(c$cont, B_None);
+    }

     uv_write_t *wreq = (uv_write_t *)malloc(sizeof(uv_write_t));
     wreq->data = self;