Skip to content

[v12.x] http2: use and support non-empty DATA frame with END_STREAM flag #34845

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions doc/api/http2.md
Original file line number Diff line number Diff line change
Expand Up @@ -927,8 +927,9 @@ the value is `undefined`, the stream is not yet ready for use.
All [`Http2Stream`][] instances are destroyed either when:

* An `RST_STREAM` frame for the stream is received by the connected peer,
and pending data has been read.
* The `http2stream.close()` method is called, and pending data has been read.
and (for client streams only) pending data has been read.
* The `http2stream.close()` method is called, and (for client streams only)
pending data has been read.
* The `http2stream.destroy()` or `http2session.destroy()` methods are called.

When an `Http2Stream` instance is destroyed, an attempt will be made to send an
Expand Down
215 changes: 155 additions & 60 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,10 @@ function onStreamClose(code) {
if (!stream || stream.destroyed)
return false;

debugStreamObj(stream, 'closed with code %d', code);
debugStreamObj(
stream, 'closed with code %d, closed %s, readable %s',
code, stream.closed, stream.readable
);

if (!stream.closed)
closeStream(stream, code, kNoRstStream);
Expand All @@ -508,7 +511,7 @@ function onStreamClose(code) {
// Defer destroy we actually emit end.
if (!stream.readable || code !== NGHTTP2_NO_ERROR) {
// If errored or ended, we can destroy immediately.
stream[kMaybeDestroy](code);
stream.destroy();
} else {
// Wait for end to destroy.
stream.on('end', stream[kMaybeDestroy]);
Expand Down Expand Up @@ -1019,20 +1022,76 @@ function emitClose(self, error) {
self.emit('close');
}

function finishSessionDestroy(session, error) {
function cleanupSession(session) {
const socket = session[kSocket];
if (!socket.destroyed)
socket.destroy(error);

const handle = session[kHandle];
session[kProxySocket] = undefined;
session[kSocket] = undefined;
session[kHandle] = undefined;
session[kNativeFields] = new Uint8Array(kSessionUint8FieldCount);
socket[kSession] = undefined;
socket[kServer] = undefined;
if (handle)
handle.ondone = null;
if (socket) {
socket[kSession] = undefined;
socket[kServer] = undefined;
}
}

function finishSessionClose(session, error) {
debugSessionObj(session, 'finishSessionClose');

const socket = session[kSocket];
cleanupSession(session);

if (socket && !socket.destroyed) {
// Always wait for writable side to finish.
socket.end((err) => {
debugSessionObj(session, 'finishSessionClose socket end', err);
// Due to the way the underlying stream is handled in Http2Session we
// won't get graceful Readable end from the other side even if it was sent
// as the stream is already considered closed and will neither be read
// from nor keep the event loop alive.
// Therefore destroy the socket immediately.
// Fixing this would require some heavy juggling of ReadStart/ReadStop
// mostly on Windows as on Unix it will be fine with just ReadStart
// after this 'ondone' callback.
socket.destroy(error);
emitClose(session, error);
});
} else {
process.nextTick(emitClose, session, error);
}
}

function closeSession(session, code, error) {
debugSessionObj(session, 'start closing/destroying');

const state = session[kState];
state.flags |= SESSION_FLAGS_DESTROYED;
state.destroyCode = code;

// Finally, emit the close and error events (if necessary) on next tick.
process.nextTick(emitClose, session, error);
// Clear timeout and remove timeout listeners.
session.setTimeout(0);
session.removeAllListeners('timeout');

// Destroy any pending and open streams
if (state.pendingStreams.size > 0 || state.streams.size > 0) {
const cancel = new ERR_HTTP2_STREAM_CANCEL(error);
state.pendingStreams.forEach((stream) => stream.destroy(cancel));
state.streams.forEach((stream) => stream.destroy(error));
}

// Disassociate from the socket and server.
const socket = session[kSocket];
const handle = session[kHandle];

// Destroy the handle if it exists at this point.
if (handle !== undefined) {
handle.ondone = finishSessionClose.bind(null, session, error);
handle.destroy(code, socket.destroyed);
} else {
finishSessionClose(session, error);
}
}

// Upon creation, the Http2Session takes ownership of the socket. The session
Expand Down Expand Up @@ -1097,6 +1156,7 @@ class Http2Session extends EventEmitter {
streams: new Map(),
pendingStreams: new Set(),
pendingAck: 0,
shutdownWritableCalled: false,
writeQueueSize: 0,
originSet: undefined
};
Expand Down Expand Up @@ -1359,6 +1419,7 @@ class Http2Session extends EventEmitter {
destroy(error = NGHTTP2_NO_ERROR, code) {
if (this.destroyed)
return;

debugSessionObj(this, 'destroying');

if (typeof error === 'number') {
Expand All @@ -1370,34 +1431,7 @@ class Http2Session extends EventEmitter {
if (code === undefined && error != null)
code = NGHTTP2_INTERNAL_ERROR;

const state = this[kState];
state.flags |= SESSION_FLAGS_DESTROYED;
state.destroyCode = code;

// Clear timeout and remove timeout listeners
this.setTimeout(0);
this.removeAllListeners('timeout');

// Destroy any pending and open streams
const cancel = new ERR_HTTP2_STREAM_CANCEL(error);
state.pendingStreams.forEach((stream) => stream.destroy(cancel));
state.streams.forEach((stream) => stream.destroy(error));

// Disassociate from the socket and server
const socket = this[kSocket];
const handle = this[kHandle];

// Destroy the handle if it exists at this point
if (handle !== undefined)
handle.destroy(code, socket.destroyed);

// If the socket is alive, use setImmediate to destroy the session on the
// next iteration of the event loop in order to give data time to transmit.
// Otherwise, destroy immediately.
if (!socket.destroyed)
setImmediate(finishSessionDestroy, this, error);
else
finishSessionDestroy(this, error);
closeSession(this, code, error);
}

// Closing the session will:
Expand Down Expand Up @@ -1689,6 +1723,26 @@ function afterShutdown(status) {
stream[kMaybeDestroy]();
}

function shutdownWritable(callback) {
const handle = this[kHandle];
if (!handle) return callback();
const state = this[kState];
if (state.shutdownWritableCalled) {
// Backport v12.x: Session required for debugging stream object
// debugStreamObj(this, 'shutdownWritable() already called');
return callback();
}
state.shutdownWritableCalled = true;

const req = new ShutdownWrap();
req.oncomplete = afterShutdown;
req.callback = callback;
req.handle = handle;
const err = handle.shutdown(req);
if (err === 1) // synchronous finish
return afterShutdown.call(req, 0);
}

function finishSendTrailers(stream, headersList) {
// The stream might be destroyed and in that case
// there is nothing to do.
Expand Down Expand Up @@ -1948,10 +2002,50 @@ class Http2Stream extends Duplex {

let req;

let waitingForWriteCallback = true;
let waitingForEndCheck = true;
let writeCallbackErr;
let endCheckCallbackErr;
const done = () => {
if (waitingForEndCheck || waitingForWriteCallback) return;
const err = writeCallbackErr || endCheckCallbackErr;
// writeGeneric does not destroy on error and
// we cannot enable autoDestroy,
// so make sure to destroy on error.
if (err) {
this.destroy(err);
}
cb(err);
};
const writeCallback = (err) => {
waitingForWriteCallback = false;
writeCallbackErr = err;
done();
};
const endCheckCallback = (err) => {
waitingForEndCheck = false;
endCheckCallbackErr = err;
done();
};
// Shutdown write stream right after last chunk is sent
// so final DATA frame can include END_STREAM flag
process.nextTick(() => {
if (writeCallbackErr ||
!this._writableState.ending ||
// Backport v12.x: _writableState.buffered does not exist
// this._writableState.buffered.length ||
this._writableState.bufferedRequest ||
(this[kState].flags & STREAM_FLAGS_HAS_TRAILERS))
return endCheckCallback();
// Backport v12.x: Session required for debugging stream object
// debugStreamObj(this, 'shutting down writable on last write');
shutdownWritable.call(this, endCheckCallback);
});

if (writev)
req = writevGeneric(this, data, cb);
req = writevGeneric(this, data, writeCallback);
else
req = writeGeneric(this, data, encoding, cb);
req = writeGeneric(this, data, encoding, writeCallback);

trackWriteState(this, req.bytes);
}
Expand All @@ -1965,21 +2059,13 @@ class Http2Stream extends Duplex {
}

_final(cb) {
const handle = this[kHandle];
if (this.pending) {
this.once('ready', () => this._final(cb));
} else if (handle !== undefined) {
debugStreamObj(this, '_final shutting down');
const req = new ShutdownWrap();
req.oncomplete = afterShutdown;
req.callback = cb;
req.handle = handle;
const err = handle.shutdown(req);
if (err === 1) // synchronous finish
return afterShutdown.call(req, 0);
} else {
cb();
return;
}
// Backport v12.x: Session required for debugging stream object
// debugStreamObj(this, 'shutting down writable on _final');
shutdownWritable.call(this, cb);
}

_read(nread) {
Expand Down Expand Up @@ -2084,11 +2170,20 @@ class Http2Stream extends Duplex {
debugStream(this[kID] || 'pending', session[kType], 'destroying stream');

const state = this[kState];
const sessionCode = session[kState].goawayCode ||
session[kState].destroyCode;
const code = err != null ?
sessionCode || NGHTTP2_INTERNAL_ERROR :
state.rstCode || sessionCode;
const sessionState = session[kState];
const sessionCode = sessionState.goawayCode || sessionState.destroyCode;

// If a stream has already closed successfully, there is no error
// to report from this stream, even if the session has errored.
// This can happen if the stream was already in process of destroying
// after a successful close, but the session had a error between
// this stream's close and destroy operations.
// Previously, this always overrode a successful close operation code
// NGHTTP2_NO_ERROR (0) with sessionCode because the use of the || operator.
const code = (err != null ?
(sessionCode || NGHTTP2_INTERNAL_ERROR) :
(this.closed ? this.rstCode : sessionCode)
);
const hasHandle = handle !== undefined;

if (!this.closed)
Expand All @@ -2097,13 +2192,13 @@ class Http2Stream extends Duplex {

if (hasHandle) {
handle.destroy();
session[kState].streams.delete(id);
sessionState.streams.delete(id);
} else {
session[kState].pendingStreams.delete(this);
sessionState.pendingStreams.delete(this);
}

// Adjust the write queue size for accounting
session[kState].writeQueueSize -= state.writeQueueSize;
sessionState.writeQueueSize -= state.writeQueueSize;
state.writeQueueSize = 0;

// RST code 8 not emitted as an error as its used by clients to signify
Expand Down
Loading