-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improve http.Client connection handling #1581
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -447,6 +447,7 @@ actor Listener(cap: net.TCPListenCap, address: str, port: int, on_listen_error: | |
# TODO: default schema="https" | ||
# TODO: default port=None | ||
# TODO: default tls_verify=True | ||
# TODO: add arguments for configuring request buffering (size) and reconnection attempts and timeouts | ||
actor Client(cap: net.TCPConnectCap, scheme: str, address: str, port: ?int, tls_verify: bool, on_connect: action(Client) -> None, on_error: action(Client, str) -> None, log_handler: ?logging.Handler): | ||
"""HTTP(S) Client | ||
|
||
|
@@ -457,10 +458,11 @@ actor Client(cap: net.TCPConnectCap, scheme: str, address: str, port: ?int, tls_ | |
var _on_response: list[(bytes, action(Client, Response) -> None)] = [] | ||
var version: ?bytes = None | ||
var buf = b"" | ||
var close_connection: bool = True | ||
var tcp_conn: ?net.TCPConnection = None | ||
var tls_conn: ?net.TLSConnection = None | ||
|
||
var connecting: bool = True | ||
|
||
def _connect(): | ||
if scheme == "http": | ||
_log.verbose("Using http scheme and port 80", None) | ||
|
@@ -475,9 +477,18 @@ actor Client(cap: net.TCPConnectCap, scheme: str, address: str, port: ?int, tls_ | |
|
||
def _on_conn_connect(): | ||
# If there are outstanding requests, it probably means we were | ||
# disconnected or have not connected yet | ||
# TODO: do not flush entire buffer if we know the server will close the | ||
# connection. If the latency is so big that we're able to send the | ||
# entire buffer before the server closes the connection we're just | ||
# wasting resources. | ||
for r in _on_response: | ||
_log.trace("Sending outstanding request", {"request": r.0}) | ||
_conn_write(r.0) | ||
await async on_connect(self) | ||
if connecting: | ||
# Dispatch the on_connect callback on first connect but not for reconnects | ||
await async on_connect(self) | ||
connecting = False | ||
|
||
def _on_tcp_connect(conn: net.TCPConnection) -> None: | ||
_on_conn_connect() | ||
|
@@ -500,10 +511,14 @@ actor Client(cap: net.TCPConnectCap, scheme: str, address: str, port: ?int, tls_ | |
r, buf = parse_response(buf, _log) | ||
if r is not None: | ||
if "connection" in r.headers and r.headers["connection"] == "close": | ||
close_connection = True | ||
_conn_close() | ||
# Is this really the right thing to do here? If the client | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add a TODO entry here. We should inspect our queue, i.e. _on_response list to see if there is more stuff. I'm also thinking like we can maybe have some basic heuristics, like when we create a http.Client the first time we obviously connect directly then we expect to run at least one query. Maybe we should not reconnect after that (in case of HTTP / 1.0 or not having persistent in later HTTP versions) until there is a second query. Now if we see a second query we can assume "there are multiple requests" and thus reconnect directly after the 2nd and subsequent requests. WDYT? |
||
# does not make a new request we just reconnected for nothing! | ||
# TODO: inspect the _on_response queue to see if there are more requests | ||
# TODO: for HTTP/1.0 do not reconnect right after the first | ||
# request, wait until the 2nd query and then assume there | ||
# will be more. | ||
_log.debug("Closing TCP connection due to header: Connection: close", None) | ||
_connect() | ||
_conn_reconnect() | ||
if len(_on_response) == 0: | ||
_log.notice("Data received with no on_response callback set", None) | ||
break | ||
|
@@ -524,22 +539,28 @@ actor Client(cap: net.TCPConnectCap, scheme: str, address: str, port: ?int, tls_ | |
def _on_con_error(error: str) -> None: | ||
on_error(self, error) | ||
|
||
def _conn_close() -> None: | ||
def _conn_reconnect() -> None: | ||
if tcp_conn is not None: | ||
def _noop(c): | ||
pass | ||
tcp_conn.close(_noop) | ||
tcp_conn.reconnect() | ||
elif tls_conn is not None: | ||
def _noop(c): | ||
pass | ||
tls_conn.close(_noop) | ||
tls_conn.reconnect() | ||
|
||
def _conn_write(data: bytes) -> None: | ||
_log.trace("Sending data", {"data": data}) | ||
if tcp_conn is not None: | ||
tcp_conn.write(data) | ||
elif tls_conn is not None: | ||
tls_conn.write(data) | ||
try: | ||
_log.trace("Sending data", {"data": data}) | ||
# We call the write method on the TCP or TLS connection actor | ||
# synchronously because we want to be able to catch exceptions | ||
# signaling the socket was closed. Note that this is *not* waiting | ||
# for network I/O, just waiting on system I/O for writing to the | ||
# local socket buffer. | ||
if tcp_conn is not None: | ||
await async tcp_conn.write(data) | ||
elif tls_conn is not None: | ||
await async tls_conn.write(data) | ||
except ConnectionError as exc: | ||
# HTTP/1.0 servers close the connection after each request by default | ||
_log.debug("TCP connection closed, reconnecting", {"error": str(exc)}) | ||
_conn_reconnect() | ||
|
||
# HTTP methods | ||
def get(path: str, headers: dict[str, str], on_response: action(Client, Response) -> None): | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -275,6 +275,12 @@ void on_connect6(uv_connect_t *connect_req, int status) { | |
if (r < 0) { | ||
char errmsg[1024] = "Failed to write to TCP socket: "; | ||
uv_strerror_r(r, errmsg + strlen(errmsg), sizeof(errmsg)-strlen(errmsg)); | ||
if (r == UV_EBADF) { | ||
// "bad file descriptor" error occurs when the socket is closed, se | ||
// we raise an exception and let the caller retry | ||
$RAISE(((B_BaseException)B_ConnectionErrorG_new(to$str(errmsg)))); | ||
return $R_CONT(c$cont, B_None); | ||
} | ||
log_warn(errmsg); | ||
$action2 f = ($action2)self->on_error; | ||
f->$class->__asyn__(f, self, to$str(errmsg)); | ||
|
@@ -311,6 +317,9 @@ static void after_shutdown(uv_shutdown_t* req, int status) { | |
uv_stream_t *stream = (uv_stream_t *)from$int(self->_sock); | ||
// fd == -1 means invalid FD and can happen after __resume__ | ||
if (stream == -1) | ||
// TODO: should we dispatch the on_close callback here too even though | ||
// we did not close anything? This is what we do for TLSConnection too | ||
// and it allows for chaining the callbacks | ||
return $R_CONT(c$cont, B_None); | ||
|
||
log_debug("Closing TCP connection"); | ||
|
@@ -629,9 +638,11 @@ void tls_write_cb(uv_write_t *wreq, int status) { | |
|
||
$R netQ_TLSConnectionD_closeG_local (netQ_TLSConnection self, $Cont c$cont, $action on_close) { | ||
uv_stream_t *stream = (uv_stream_t *)from$int(self->_stream); | ||
// fd == -1 means invalid FD and can happen after __resume__ | ||
if (stream == -1) | ||
// fd == -1 means invalid FD and can happen after __resume__ or if the socket is closed | ||
if (stream == -1) { | ||
on_close->$class->__asyn__(on_close, self); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We initially thought about raising an exception here instead, but There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think you should blindly accept async write as the correct thing. It's not a complete design as-is... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, for close, I'm not sure we even need to close on_close. Like why do we end up here with a -1 stream? If you call The stream must have gone to -1 at some earlier point at which the on_error() callback should be called, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Calling def reconnect():
close(_connect) |
||
return $R_CONT(c$cont, B_None); | ||
} | ||
|
||
self->_on_close = on_close; | ||
|
||
|
@@ -645,9 +656,14 @@ void tls_write_cb(uv_write_t *wreq, int status) { | |
|
||
$R netQ_TLSConnectionD_writeG_local (netQ_TLSConnection self, $Cont c$cont, B_bytes data) { | ||
uv_stream_t *stream = (uv_stream_t *)from$int(self->_stream); | ||
// fd == -1 means invalid FD and can happen after __resume__ | ||
if (stream == -1) | ||
// fd == -1 means invalid FD and can happen after __resume__ or if the socket is closed | ||
if (stream == -1) { | ||
// Raise an exception and let the caller retry | ||
char errmsg[] = "Failed to write to TLS TCP socket: bad stream"; | ||
log_debug(errmsg); | ||
$RAISE(((B_BaseException)B_ConnectionErrorG_new(to$str(errmsg)))); | ||
return $R_CONT(c$cont, B_None); | ||
} | ||
|
||
uv_write_t *wreq = (uv_write_t *)malloc(sizeof(uv_write_t)); | ||
wreq->data = self; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I hadn't actually looked much at how Python does things beyond looking at OSError. I see now that it subclasses things quite nicely. Should we not just copy that whole thing verbatim? Like it feels like a better starting point than starting from scratch. In particular, in this case for the error you later raise, I think it's a BrokenPipeError.
We probably want to consider changing the
__init__
method of our OSError so we can take errno and stuff like that, again just like in Python. Without default args it'll be a little fiddly, but that's OK I guess.