diff --git a/src/nxt_port.h b/src/nxt_port.h index 772fb41ae..bb428b575 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -59,6 +59,8 @@ struct nxt_port_handlers_s { /* Status report. */ nxt_port_handler_t status; + nxt_port_handler_t client_error; + nxt_port_handler_t oosm; nxt_port_handler_t shm_ack; nxt_port_handler_t read_queue; @@ -115,6 +117,8 @@ typedef enum { _NXT_PORT_MSG_APP_RESTART = nxt_port_handler_idx(app_restart), _NXT_PORT_MSG_STATUS = nxt_port_handler_idx(status), + _NXT_PORT_MSG_CLIENT_ERROR = nxt_port_handler_idx(client_error), + _NXT_PORT_MSG_OOSM = nxt_port_handler_idx(oosm), _NXT_PORT_MSG_SHM_ACK = nxt_port_handler_idx(shm_ack), _NXT_PORT_MSG_READ_QUEUE = nxt_port_handler_idx(read_queue), @@ -160,6 +164,8 @@ typedef enum { NXT_PORT_MSG_APP_RESTART = nxt_msg_last(_NXT_PORT_MSG_APP_RESTART), NXT_PORT_MSG_STATUS = nxt_msg_last(_NXT_PORT_MSG_STATUS), + NXT_PORT_MSG_CLIENT_ERROR = nxt_msg_last(_NXT_PORT_MSG_CLIENT_ERROR), + NXT_PORT_MSG_OOSM = nxt_msg_last(_NXT_PORT_MSG_OOSM), NXT_PORT_MSG_SHM_ACK = nxt_msg_last(_NXT_PORT_MSG_SHM_ACK), NXT_PORT_MSG_READ_QUEUE = _NXT_PORT_MSG_READ_QUEUE, diff --git a/src/nxt_router.c b/src/nxt_router.c index 44ea823b7..b4cdf8df8 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -5300,7 +5300,16 @@ nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data); if (r->req_rpc_data != NULL) { - nxt_request_rpc_data_unlink(task, r->req_rpc_data); + nxt_request_rpc_data_t *req_rpc_data = r->req_rpc_data; + + if (r->error) { + nxt_port_socket_write(task, req_rpc_data->app_port, + NXT_PORT_MSG_CLIENT_ERROR, + -1, req_rpc_data->stream, + task->thread->engine->port->id, NULL); + } + + nxt_request_rpc_data_unlink(task, req_rpc_data); } nxt_http_request_close_handler(task, r, r->proto.any); diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 966a6c0fa..3cd7f3d7a 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -74,6 +74,8 @@ static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req); static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); +static int nxt_unit_process_client_error(nxt_unit_ctx_t *ctx, + nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx); static nxt_unit_request_info_impl_t *nxt_unit_request_info_get( nxt_unit_ctx_t *ctx); @@ -1121,6 +1123,10 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf, rc = nxt_unit_process_websocket(ctx, &recv_msg); break; + case _NXT_PORT_MSG_CLIENT_ERROR: + rc = nxt_unit_process_client_error(ctx, &recv_msg); + break; + case _NXT_PORT_MSG_REMOVE_PID: if (nxt_slow_path(recv_msg.size != sizeof(pid))) { nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size " @@ -1377,18 +1383,16 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + res = nxt_unit_request_hash_add(ctx, req); + if (nxt_slow_path(res != NXT_UNIT_OK)) { + nxt_unit_req_warn(req, "failed to add request to hash"); + nxt_unit_request_done(req, NXT_UNIT_ERROR); + return NXT_UNIT_ERROR; + } + if (req->content_length > (uint64_t) (req->content_buf->end - req->content_buf->free)) { - res = nxt_unit_request_hash_add(ctx, req); - if (nxt_slow_path(res != NXT_UNIT_OK)) { - nxt_unit_req_warn(req, "failed to add request to hash"); - - nxt_unit_request_done(req, NXT_UNIT_ERROR); - - return NXT_UNIT_ERROR; - } - /* * If application have separate data handler, we may start * request processing and process data when it is arrived. @@ -1418,7 +1422,7 @@ nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) nxt_unit_mmap_buf_t *b; nxt_unit_request_info_t *req; - req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last); + req = nxt_unit_request_hash_find(ctx, recv_msg->stream, 0); if (req == NULL) { return NXT_UNIT_OK; } @@ -1722,6 +1726,26 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) return NXT_UNIT_OK; } +static int +nxt_unit_process_client_error(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) +{ + nxt_unit_impl_t *lib; + nxt_unit_request_info_t *req; + + req = nxt_unit_request_hash_find(ctx, recv_msg->stream, 0); + + if (req == NULL) { + return NXT_UNIT_OK; + } + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + if (lib->callbacks.close_handler) { + lib->callbacks.close_handler(req); + } + + return NXT_UNIT_OK; +} static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx) diff --git a/src/python/nxt_python_asgi_http.c b/src/python/nxt_python_asgi_http.c index cdd6357e6..bdb8e21d5 100644 --- a/src/python/nxt_python_asgi_http.c +++ b/src/python/nxt_python_asgi_http.c @@ -368,6 +368,11 @@ nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict) "sent, after response already completed"); } + if (nxt_slow_path(http->closed)) { + return PyErr_Format(PyExc_ConnectionResetError, + "Connection Closed "); + } + if (nxt_slow_path(http->send_future != NULL)) { return PyErr_Format(PyExc_RuntimeError, "Concurrent send"); } diff --git a/src/python/nxt_python_asgi_websocket.c b/src/python/nxt_python_asgi_websocket.c index ab1d0324e..513e0ea33 100644 --- a/src/python/nxt_python_asgi_websocket.c +++ b/src/python/nxt_python_asgi_websocket.c @@ -273,10 +273,10 @@ nxt_py_asgi_websocket_accept(nxt_py_asgi_websocket_t *ws, PyObject *dict) return PyErr_Format(PyExc_RuntimeError, "WebSocket already accepted"); case NXT_WS_DISCONNECTED: - return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected"); + return PyErr_Format(PyExc_ConnectionResetError, "WebSocket disconnected"); case NXT_WS_CLOSED: - return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed"); + return PyErr_Format(PyExc_ConnectionResetError, "WebSocket already closed"); } if (nxt_slow_path(nxt_unit_response_is_websocket(ws->req))) { @@ -368,11 +368,11 @@ nxt_py_asgi_websocket_close(nxt_py_asgi_websocket_t *ws, PyObject *dict) } if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) { - return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected"); + return PyErr_Format(PyExc_ConnectionResetError, "WebSocket disconnected"); } if (nxt_slow_path(ws->state == NXT_WS_CLOSED)) { - return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed"); + return PyErr_Format(PyExc_ConnectionResetError, "WebSocket already closed"); } if (nxt_unit_response_is_websocket(ws->req)) { @@ -433,11 +433,11 @@ nxt_py_asgi_websocket_send_frame(nxt_py_asgi_websocket_t *ws, PyObject *dict) } if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) { - return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected"); + return PyErr_Format(PyExc_ConnectionResetError, "WebSocket disconnected"); } if (nxt_slow_path(ws->state == NXT_WS_CLOSED)) { - return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed"); + return PyErr_Format(PyExc_ConnectionResetError, "WebSocket already closed"); } bytes = PyDict_GetItem(dict, nxt_py_bytes_str); @@ -984,9 +984,9 @@ nxt_py_asgi_websocket_close_handler(nxt_unit_request_info_t *req) return; } - if (ws->receive_future == NULL) { - ws->state = NXT_WS_DISCONNECTED; + ws->state = NXT_WS_DISCONNECTED; + if (ws->receive_future == NULL) { return; }