diff --git a/lib/internal/net.js b/lib/internal/net.js index d4558cfca8e214..847539d576906d 100644 --- a/lib/internal/net.js +++ b/lib/internal/net.js @@ -1,5 +1,8 @@ 'use strict'; +const Buffer = require('buffer').Buffer; +const { writeBuffer } = process.binding('fs'); + // Check that the port number is not NaN when coerced to a number, // is an integer and that it falls within the legal range of port numbers. function isLegalPort(port) { @@ -9,7 +12,28 @@ function isLegalPort(port) { return +port === (+port >>> 0) && port <= 0xFFFF; } +function makeSyncWrite(fd) { + return function(chunk, enc, cb) { + if (enc !== 'buffer') + chunk = Buffer.from(chunk, enc); + + this._bytesDispatched += chunk.length; + + try { + writeBuffer(fd, chunk, 0, chunk.length, null); + } catch (ex) { + // Legacy: net writes have .code === .errno, whereas writeBuffer gives the + // raw errno number in .errno. + if (typeof ex.code === 'string') + ex.errno = ex.code; + return cb(ex); + } + cb(); + }; +} + module.exports = { isLegalPort, + makeSyncWrite, normalizedArgsSymbol: Symbol('normalizedArgs') }; diff --git a/lib/net.js b/lib/net.js index 565a649a63aeb6..16d0b9605c594a 100644 --- a/lib/net.js +++ b/lib/net.js @@ -26,7 +26,11 @@ const stream = require('stream'); const timers = require('timers'); const util = require('util'); const internalUtil = require('internal/util'); -const { isLegalPort, normalizedArgsSymbol } = require('internal/net'); +const { + isLegalPort, + normalizedArgsSymbol, + makeSyncWrite +} = require('internal/net'); const assert = require('assert'); const cares = process.binding('cares_wrap'); const { @@ -220,20 +224,24 @@ function Socket(options) { this._handle = options.handle; // private this[async_id_symbol] = getNewAsyncId(this._handle); } else if (options.fd !== undefined) { - this._handle = createHandle(options.fd, false); - this._handle.open(options.fd); + const fd = options.fd; + this._handle = createHandle(fd, false); + this._handle.open(fd); this[async_id_symbol] = this._handle.getAsyncId(); // options.fd can be string (since it is user-defined), // so changing this to === would be semver-major // See: https://github.com/nodejs/node/pull/11513 // eslint-disable-next-line eqeqeq - if ((options.fd == 1 || options.fd == 2) && + if ((fd == 1 || fd == 2) && (this._handle instanceof Pipe) && process.platform === 'win32') { // Make stdout and stderr blocking on Windows var err = this._handle.setBlocking(true); if (err) throw errnoException(err, 'setBlocking'); + + this._writev = null; + this._write = makeSyncWrite(fd); } this.readable = options.readable !== false; this.writable = options.writable !== false; diff --git a/lib/tty.js b/lib/tty.js index 2a62fcb859a244..28f2a411840186 100644 --- a/lib/tty.js +++ b/lib/tty.js @@ -24,6 +24,7 @@ const util = require('util'); const net = require('net'); const { TTY, isTTY } = process.binding('tty_wrap'); +const { makeSyncWrite } = require('internal/net'); const { inherits } = util; const errnoException = util._errnoException; const errors = require('internal/errors'); @@ -91,6 +92,8 @@ function WriteStream(fd) { // even though it was originally intended to change in v1.0.2 (Libuv 1.2.1). // Ref: https://github.com/nodejs/node/pull/1771#issuecomment-119351671 this._handle.setBlocking(true); + this._writev = null; + this._write = makeSyncWrite(fd); var winSize = new Array(2); var err = this._handle.getWindowSize(winSize); diff --git a/src/stream_base.cc b/src/stream_base.cc index b1aea79d52f762..0fb801ddd57445 100644 --- a/src/stream_base.cc +++ b/src/stream_base.cc @@ -192,11 +192,17 @@ int StreamBase::Writev(const FunctionCallbackInfo& args) { offset += str_size; bytes += str_size; } + + err = DoTryWrite(&buf_list, &count); + if (err != 0 || count == 0) { + req_wrap->Dispatched(); + req_wrap->Dispose(); + goto done; + } } err = DoWrite(req_wrap, buf_list, count, nullptr); - if (HasWriteQueue()) - req_wrap_obj->Set(env->async(), True(env->isolate())); + req_wrap_obj->Set(env->async(), True(env->isolate())); if (err) req_wrap->Dispose(); @@ -254,8 +260,7 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo& args) { } err = DoWrite(req_wrap, bufs, count, nullptr); - if (HasWriteQueue()) - req_wrap_obj->Set(env->async(), True(env->isolate())); + req_wrap_obj->Set(env->async(), True(env->isolate())); req_wrap_obj->Set(env->buffer_string(), args[1]); if (err) @@ -381,8 +386,7 @@ int StreamBase::WriteString(const FunctionCallbackInfo& args) { reinterpret_cast(send_handle)); } - if (HasWriteQueue()) - req_wrap_obj->Set(env->async(), True(env->isolate())); + req_wrap_obj->Set(env->async(), True(env->isolate())); if (err) req_wrap->Dispose(); @@ -476,10 +480,6 @@ int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) { return 0; } -bool StreamResource::HasWriteQueue() { - return true; -} - const char* StreamResource::Error() const { return nullptr; diff --git a/src/stream_base.h b/src/stream_base.h index 071627f3bf2a67..d063176b04a4db 100644 --- a/src/stream_base.h +++ b/src/stream_base.h @@ -162,7 +162,6 @@ class StreamResource { uv_buf_t* bufs, size_t count, uv_stream_t* send_handle) = 0; - virtual bool HasWriteQueue(); virtual const char* Error() const; virtual void ClearError(); diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc index 094991107ba7aa..b639d945004cfa 100644 --- a/src/stream_wrap.cc +++ b/src/stream_wrap.cc @@ -380,10 +380,6 @@ int LibuvStreamWrap::DoWrite(WriteWrap* w, } -bool LibuvStreamWrap::HasWriteQueue() { - return stream()->write_queue_size > 0; -} - void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) { WriteWrap* req_wrap = WriteWrap::from_req(req); diff --git a/src/stream_wrap.h b/src/stream_wrap.h index a695f9a08a7729..0146d41c6e8c7b 100644 --- a/src/stream_wrap.h +++ b/src/stream_wrap.h @@ -55,7 +55,6 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase { uv_buf_t* bufs, size_t count, uv_stream_t* send_handle) override; - bool HasWriteQueue() override; inline uv_stream_t* stream() const { return stream_;