Skip to content

net/http2: use actual Timeout instances #17704

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Dec 20, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 33 additions & 22 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,12 @@ const {
} = require('internal/http2/util');

const {
_unrefActive,
enroll,
unenroll
} = require('timers');
kTimeout,
setUnrefTimeout,
validateTimerDuration
} = require('internal/timers');

const { _unrefActive } = require('timers');

const { ShutdownWrap, WriteWrap } = process.binding('stream_wrap');
const { constants } = binding;
Expand Down Expand Up @@ -280,8 +282,8 @@ function onStreamClose(code, hasData) {
` [has data? ${hasData}]`);

if (!stream.closed) {
// Unenroll from timeouts
unenroll(stream);
// Clear timeout and remove timeout listeners
stream.setTimeout(0);
stream.removeAllListeners('timeout');

// Set the state flags
Expand Down Expand Up @@ -788,6 +790,7 @@ class Http2Session extends EventEmitter {
this[kType] = type;
this[kProxySocket] = null;
this[kSocket] = socket;
this[kTimeout] = null;

// Do not use nagle's algorithm
if (typeof socket.setNoDelay === 'function')
Expand Down Expand Up @@ -828,7 +831,7 @@ class Http2Session extends EventEmitter {
[kUpdateTimer]() {
if (this.destroyed)
return;
_unrefActive(this);
if (this[kTimeout]) _unrefActive(this[kTimeout]);
}

// Sets the id of the next stream to be created by this Http2Session.
Expand Down Expand Up @@ -1019,7 +1022,7 @@ class Http2Session extends EventEmitter {
state.flags |= SESSION_FLAGS_DESTROYED;

// Clear timeout and remove timeout listeners
unenroll(this);
this.setTimeout(0);
this.removeAllListeners('timeout');

// Destroy any pending and open streams
Expand Down Expand Up @@ -1322,6 +1325,8 @@ class Http2Stream extends Duplex {
this[kSession] = session;
session[kState].pendingStreams.add(this);

this[kTimeout] = null;

this[kState] = {
flags: STREAM_FLAGS_PENDING,
rstCode: NGHTTP2_NO_ERROR,
Expand All @@ -1336,9 +1341,10 @@ class Http2Stream extends Duplex {
[kUpdateTimer]() {
if (this.destroyed)
return;
_unrefActive(this);
if (this[kTimeout])
_unrefActive([kTimeout]);
if (this[kSession])
_unrefActive(this[kSession]);
this[kSession][kUpdateTimer]();
}

[kInit](id, handle) {
Expand Down Expand Up @@ -1560,7 +1566,7 @@ class Http2Stream extends Duplex {

// Close initiates closing the Http2Stream instance by sending an RST_STREAM
// frame to the connected peer. The readable and writable sides of the
// Http2Stream duplex are closed and the timeout timer is unenrolled. If
// Http2Stream duplex are closed and the timeout timer is cleared. If
// a callback is passed, it is registered to listen for the 'close' event.
//
// If the handle and stream ID have not been assigned yet, the close
Expand All @@ -1577,8 +1583,8 @@ class Http2Stream extends Duplex {
if (code < 0 || code > kMaxInt)
throw new errors.RangeError('ERR_OUT_OF_RANGE', 'code');

// Unenroll the timeout.
unenroll(this);
// Clear timeout and remove timeout listeners
this.setTimeout(0);
this.removeAllListeners('timeout');

// Close the writable
Expand Down Expand Up @@ -1637,8 +1643,10 @@ class Http2Stream extends Duplex {
handle.destroy();
session[kState].streams.delete(id);
} else {
unenroll(this);
// Clear timeout and remove timeout listeners
this.setTimeout(0);
this.removeAllListeners('timeout');

state.flags |= STREAM_FLAGS_CLOSED;
abort(this);
this.end();
Expand Down Expand Up @@ -2216,21 +2224,24 @@ const setTimeout = {
value: function(msecs, callback) {
if (this.destroyed)
return;
if (typeof msecs !== 'number') {
throw new errors.TypeError('ERR_INVALID_ARG_TYPE',
'msecs',
'number');
}

// Type checking identical to timers.enroll()
msecs = validateTimerDuration(msecs);

// Attempt to clear an existing timer lear in both cases -
// even if it will be rescheduled we don't want to leak an existing timer.
clearTimeout(this[kTimeout]);

if (msecs === 0) {
unenroll(this);
if (callback !== undefined) {
if (typeof callback !== 'function')
throw new errors.TypeError('ERR_INVALID_CALLBACK');
this.removeListener('timeout', callback);
}
} else {
enroll(this, msecs);
this[kUpdateTimer]();
this[kTimeout] = setUnrefTimeout(this._onTimeout.bind(this), msecs);
if (this[kSession]) this[kSession][kUpdateTimer]();

if (callback !== undefined) {
if (typeof callback !== 'function')
throw new errors.TypeError('ERR_INVALID_CALLBACK');
Expand Down
131 changes: 131 additions & 0 deletions lib/internal/timers.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
'use strict';

const async_wrap = process.binding('async_wrap');
// Two arrays that share state between C++ and JS.
const { async_hook_fields, async_id_fields } = async_wrap;
const {
getDefaultTriggerAsyncId,
// The needed emit*() functions.
emitInit
} = require('internal/async_hooks');
// Grab the constants necessary for working with internal arrays.
const { kInit, kAsyncIdCounter } = async_wrap.constants;
// Symbols for storing async id state.
const async_id_symbol = Symbol('asyncId');
const trigger_async_id_symbol = Symbol('triggerId');

const errors = require('internal/errors');

// Timeout values > TIMEOUT_MAX are set to 1.
const TIMEOUT_MAX = 2 ** 31 - 1;

module.exports = {
TIMEOUT_MAX,
kTimeout: Symbol('timeout'), // For hiding Timeouts on other internals.
async_id_symbol,
trigger_async_id_symbol,
Timeout,
setUnrefTimeout,
validateTimerDuration
};

// Timer constructor function.
// The entire prototype is defined in lib/timers.js
function Timeout(callback, after, args, isRepeat) {
after *= 1; // coalesce to number or NaN
if (!(after >= 1 && after <= TIMEOUT_MAX)) {
if (after > TIMEOUT_MAX) {
process.emitWarning(`${after} does not fit into` +
' a 32-bit signed integer.' +
'\nTimeout duration was set to 1.',
'TimeoutOverflowWarning');
}
after = 1; // schedule on next tick, follows browser behavior
}

this._called = false;
this._idleTimeout = after;
this._idlePrev = this;
this._idleNext = this;
this._idleStart = null;
// this must be set to null first to avoid function tracking
// on the hidden class, revisit in V8 versions after 6.2
this._onTimeout = null;
this._onTimeout = callback;
this._timerArgs = args;
this._repeat = isRepeat ? after : null;
this._destroyed = false;

this[async_id_symbol] = ++async_id_fields[kAsyncIdCounter];
this[trigger_async_id_symbol] = getDefaultTriggerAsyncId();
if (async_hook_fields[kInit] > 0) {
emitInit(this[async_id_symbol],
'Timeout',
this[trigger_async_id_symbol],
this);
}
}

var timers;
function getTimers() {
if (timers === undefined) {
timers = require('timers');
}
return timers;
}

function setUnrefTimeout(callback, after, arg1, arg2, arg3) {
// Type checking identical to setTimeout()
if (typeof callback !== 'function') {
throw new errors.TypeError('ERR_INVALID_CALLBACK');
}

let i, args;
switch (arguments.length) {
// fast cases
case 1:
case 2:
break;
case 3:
args = [arg1];
break;
case 4:
args = [arg1, arg2];
break;
default:
args = [arg1, arg2, arg3];
for (i = 5; i < arguments.length; i++) {
// extend array dynamically, makes .apply run much faster in v6.0.0
args[i - 2] = arguments[i];
}
break;
}

const timer = new Timeout(callback, after, args, false);
getTimers()._unrefActive(timer);

return timer;
}

// Type checking used by timers.enroll() and Socket#setTimeout()
function validateTimerDuration(msecs) {
if (typeof msecs !== 'number') {
throw new errors.TypeError('ERR_INVALID_ARG_TYPE', 'msecs',
'number', msecs);
}

if (msecs < 0 || !isFinite(msecs)) {
throw new errors.RangeError('ERR_VALUE_OUT_OF_RANGE', 'msecs',
'a non-negative finite number', msecs);
}

// Ensure that msecs fits into signed int32
if (msecs > TIMEOUT_MAX) {
process.emitWarning(`${msecs} does not fit into a 32-bit signed integer.` +
`\nTimer duration was truncated to ${TIMEOUT_MAX}.`,
'TimeoutOverflowWarning');
return TIMEOUT_MAX;
}

return msecs;
}
31 changes: 24 additions & 7 deletions lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ var cluster = null;
const errnoException = util._errnoException;
const exceptionWithHostPort = util._exceptionWithHostPort;

const {
kTimeout,
setUnrefTimeout,
validateTimerDuration
} = require('internal/timers');

function noop() {}

function createHandle(fd, is_server) {
Expand Down Expand Up @@ -201,6 +207,7 @@ function Socket(options) {
this._parent = null;
this._host = null;
this[kLastWriteQueueSize] = 0;
this[kTimeout] = null;

if (typeof options === 'number')
options = { fd: options }; // Legacy interface.
Expand Down Expand Up @@ -272,9 +279,12 @@ function Socket(options) {
}
util.inherits(Socket, stream.Duplex);

// Refresh existing timeouts.
Socket.prototype._unrefTimer = function _unrefTimer() {
for (var s = this; s !== null; s = s._parent)
timers._unrefActive(s);
for (var s = this; s !== null; s = s._parent) {
if (s[kTimeout])
timers._unrefActive(s[kTimeout]);
}
};


Expand Down Expand Up @@ -387,14 +397,20 @@ Socket.prototype.read = function(n) {
};

Socket.prototype.setTimeout = function(msecs, callback) {
// Type checking identical to timers.enroll()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be enough code to factor it out into some internal function, wdyt?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be a good idea yeah.

msecs = validateTimerDuration(msecs);

// Attempt to clear an existing timer lear in both cases -
// even if it will be rescheduled we don't want to leak an existing timer.
clearTimeout(this[kTimeout]);

if (msecs === 0) {
timers.unenroll(this);
if (callback) {
this.removeListener('timeout', callback);
}
} else {
timers.enroll(this, msecs);
timers._unrefActive(this);
this[kTimeout] = setUnrefTimeout(this._onTimeout.bind(this), msecs);

if (callback) {
this.once('timeout', callback);
}
Expand Down Expand Up @@ -551,8 +567,9 @@ Socket.prototype._destroy = function(exception, cb) {

this.readable = this.writable = false;

for (var s = this; s !== null; s = s._parent)
timers.unenroll(s);
for (var s = this; s !== null; s = s._parent) {
clearTimeout(s[kTimeout]);
}

debug('close');
if (this._handle) {
Expand Down
Loading