From 87431eb28e771cd1d8063ab2c3bffb01d9131728 Mon Sep 17 00:00:00 2001 From: Gourav Date: Wed, 5 Feb 2025 23:49:59 +0530 Subject: [PATCH 1/2] Previously, the app was not notified when the client disconnection. This caused issues especially in cases of websocket connections and SSE Events where the app continued to send data to the router, which could not deliver it to the client due to the disconnection. Changes made: Added functionality to send a port message to notify the app of client disconnection in form of port message(_NXT_PORT_MSG_CLIENT_ERROR). On the App side, handled this message and called the registered close_hanlder callback if registered. --- src/nxt_port.h | 6 ++++++ src/nxt_router.c | 11 ++++++++++- src/nxt_unit.c | 44 ++++++++++++++++++++++++++++++++++---------- 3 files changed, 50 insertions(+), 11 deletions(-) diff --git a/src/nxt_port.h b/src/nxt_port.h index 772fb41ae..bb428b575 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -59,6 +59,8 @@ struct nxt_port_handlers_s { /* Status report. */ nxt_port_handler_t status; + nxt_port_handler_t client_error; + nxt_port_handler_t oosm; nxt_port_handler_t shm_ack; nxt_port_handler_t read_queue; @@ -115,6 +117,8 @@ typedef enum { _NXT_PORT_MSG_APP_RESTART = nxt_port_handler_idx(app_restart), _NXT_PORT_MSG_STATUS = nxt_port_handler_idx(status), + _NXT_PORT_MSG_CLIENT_ERROR = nxt_port_handler_idx(client_error), + _NXT_PORT_MSG_OOSM = nxt_port_handler_idx(oosm), _NXT_PORT_MSG_SHM_ACK = nxt_port_handler_idx(shm_ack), _NXT_PORT_MSG_READ_QUEUE = nxt_port_handler_idx(read_queue), @@ -160,6 +164,8 @@ typedef enum { NXT_PORT_MSG_APP_RESTART = nxt_msg_last(_NXT_PORT_MSG_APP_RESTART), NXT_PORT_MSG_STATUS = nxt_msg_last(_NXT_PORT_MSG_STATUS), + NXT_PORT_MSG_CLIENT_ERROR = nxt_msg_last(_NXT_PORT_MSG_CLIENT_ERROR), + NXT_PORT_MSG_OOSM = nxt_msg_last(_NXT_PORT_MSG_OOSM), NXT_PORT_MSG_SHM_ACK = nxt_msg_last(_NXT_PORT_MSG_SHM_ACK), NXT_PORT_MSG_READ_QUEUE = _NXT_PORT_MSG_READ_QUEUE, diff --git a/src/nxt_router.c b/src/nxt_router.c index 44ea823b7..b4cdf8df8 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -5300,7 +5300,16 @@ nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data); if (r->req_rpc_data != NULL) { - nxt_request_rpc_data_unlink(task, r->req_rpc_data); + nxt_request_rpc_data_t *req_rpc_data = r->req_rpc_data; + + if (r->error) { + nxt_port_socket_write(task, req_rpc_data->app_port, + NXT_PORT_MSG_CLIENT_ERROR, + -1, req_rpc_data->stream, + task->thread->engine->port->id, NULL); + } + + nxt_request_rpc_data_unlink(task, req_rpc_data); } nxt_http_request_close_handler(task, r, r->proto.any); diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 966a6c0fa..3cd7f3d7a 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -74,6 +74,8 @@ static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req); static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); +static int nxt_unit_process_client_error(nxt_unit_ctx_t *ctx, + nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx); static nxt_unit_request_info_impl_t *nxt_unit_request_info_get( nxt_unit_ctx_t *ctx); @@ -1121,6 +1123,10 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf, rc = nxt_unit_process_websocket(ctx, &recv_msg); break; + case _NXT_PORT_MSG_CLIENT_ERROR: + rc = nxt_unit_process_client_error(ctx, &recv_msg); + break; + case _NXT_PORT_MSG_REMOVE_PID: if (nxt_slow_path(recv_msg.size != sizeof(pid))) { nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size " @@ -1377,18 +1383,16 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + res = nxt_unit_request_hash_add(ctx, req); + if (nxt_slow_path(res != NXT_UNIT_OK)) { + nxt_unit_req_warn(req, "failed to add request to hash"); + nxt_unit_request_done(req, NXT_UNIT_ERROR); + return NXT_UNIT_ERROR; + } + if (req->content_length > (uint64_t) (req->content_buf->end - req->content_buf->free)) { - res = nxt_unit_request_hash_add(ctx, req); - if (nxt_slow_path(res != NXT_UNIT_OK)) { - nxt_unit_req_warn(req, "failed to add request to hash"); - - nxt_unit_request_done(req, NXT_UNIT_ERROR); - - return NXT_UNIT_ERROR; - } - /* * If application have separate data handler, we may start * request processing and process data when it is arrived. @@ -1418,7 +1422,7 @@ nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) nxt_unit_mmap_buf_t *b; nxt_unit_request_info_t *req; - req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last); + req = nxt_unit_request_hash_find(ctx, recv_msg->stream, 0); if (req == NULL) { return NXT_UNIT_OK; } @@ -1722,6 +1726,26 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) return NXT_UNIT_OK; } +static int +nxt_unit_process_client_error(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) +{ + nxt_unit_impl_t *lib; + nxt_unit_request_info_t *req; + + req = nxt_unit_request_hash_find(ctx, recv_msg->stream, 0); + + if (req == NULL) { + return NXT_UNIT_OK; + } + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + if (lib->callbacks.close_handler) { + lib->callbacks.close_handler(req); + } + + return NXT_UNIT_OK; +} static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx) From 8deb39ce6b111797d3b9912412ee582955148630 Mon Sep 17 00:00:00 2001 From: Gourav Date: Thu, 6 Feb 2025 21:01:33 +0530 Subject: [PATCH 2/2] fix: Handle client disconnection for HTTP and WebSocket according to ASGI spec For HTTP connections: - If the app is sending data using the send callable, according to the ASGI spec, it should throw an exception in case of client disconnection. Previously, even if we processed the client_error message and set the http->closed state, it wouldn't throw an error because it wasn't handled. This change ensures that the exception is raised as per the ASGI spec. For WebSocket connections: - If the app is awaiting on receive, it would get a 'websocket.disconnect' event. However, if the app continues to send data using the send callable after receiving this event, it wouldn't raise an error because ws->state = NXT_WS_DISCONNECTED was never set in that case. According to the ASGI spec, if send is called after receiving a 'websocket.disconnect' event or on a closed client, it should raise an exception. This change ensures that the exception is raised as per the ASGI spec. --- src/python/nxt_python_asgi_http.c | 5 +++++ src/python/nxt_python_asgi_websocket.c | 16 ++++++++-------- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/src/python/nxt_python_asgi_http.c b/src/python/nxt_python_asgi_http.c index cdd6357e6..bdb8e21d5 100644 --- a/src/python/nxt_python_asgi_http.c +++ b/src/python/nxt_python_asgi_http.c @@ -368,6 +368,11 @@ nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict) "sent, after response already completed"); } + if (nxt_slow_path(http->closed)) { + return PyErr_Format(PyExc_ConnectionResetError, + "Connection Closed "); + } + if (nxt_slow_path(http->send_future != NULL)) { return PyErr_Format(PyExc_RuntimeError, "Concurrent send"); } diff --git a/src/python/nxt_python_asgi_websocket.c b/src/python/nxt_python_asgi_websocket.c index ab1d0324e..513e0ea33 100644 --- a/src/python/nxt_python_asgi_websocket.c +++ b/src/python/nxt_python_asgi_websocket.c @@ -273,10 +273,10 @@ nxt_py_asgi_websocket_accept(nxt_py_asgi_websocket_t *ws, PyObject *dict) return PyErr_Format(PyExc_RuntimeError, "WebSocket already accepted"); case NXT_WS_DISCONNECTED: - return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected"); + return PyErr_Format(PyExc_ConnectionResetError, "WebSocket disconnected"); case NXT_WS_CLOSED: - return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed"); + return PyErr_Format(PyExc_ConnectionResetError, "WebSocket already closed"); } if (nxt_slow_path(nxt_unit_response_is_websocket(ws->req))) { @@ -368,11 +368,11 @@ nxt_py_asgi_websocket_close(nxt_py_asgi_websocket_t *ws, PyObject *dict) } if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) { - return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected"); + return PyErr_Format(PyExc_ConnectionResetError, "WebSocket disconnected"); } if (nxt_slow_path(ws->state == NXT_WS_CLOSED)) { - return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed"); + return PyErr_Format(PyExc_ConnectionResetError, "WebSocket already closed"); } if (nxt_unit_response_is_websocket(ws->req)) { @@ -433,11 +433,11 @@ nxt_py_asgi_websocket_send_frame(nxt_py_asgi_websocket_t *ws, PyObject *dict) } if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) { - return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected"); + return PyErr_Format(PyExc_ConnectionResetError, "WebSocket disconnected"); } if (nxt_slow_path(ws->state == NXT_WS_CLOSED)) { - return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed"); + return PyErr_Format(PyExc_ConnectionResetError, "WebSocket already closed"); } bytes = PyDict_GetItem(dict, nxt_py_bytes_str); @@ -984,9 +984,9 @@ nxt_py_asgi_websocket_close_handler(nxt_unit_request_info_t *req) return; } - if (ws->receive_future == NULL) { - ws->state = NXT_WS_DISCONNECTED; + ws->state = NXT_WS_DISCONNECTED; + if (ws->receive_future == NULL) { return; }