Skip to content

Commit bb1557b

Browse files
committed
stream: cleanup async handling
Cleanup async stream method handling.
1 parent a5ba28d commit bb1557b

File tree

3 files changed

+119
-135
lines changed

3 files changed

+119
-135
lines changed

lib/internal/streams/destroy.js

Lines changed: 62 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -68,34 +68,39 @@ function destroy(err, cb) {
6868
return this;
6969
}
7070

71-
function _destroy(self, err, cb) {
72-
let called = false;
73-
const result = self._destroy(err || null, (err) => {
74-
const r = self._readableState;
75-
const w = self._writableState;
71+
function onDestroy(self, err, cb) {
72+
const r = self._readableState;
73+
const w = self._writableState;
7674

77-
called = true;
75+
checkError(err, w, r);
7876

79-
checkError(err, w, r);
77+
if (w) {
78+
w.closed = true;
79+
}
80+
if (r) {
81+
r.closed = true;
82+
}
8083

81-
if (w) {
82-
w.closed = true;
83-
}
84-
if (r) {
85-
r.closed = true;
86-
}
84+
if (typeof cb === 'function') {
85+
cb(err);
86+
}
8787

88-
if (typeof cb === 'function') {
89-
cb(err);
90-
}
88+
if (err) {
89+
process.nextTick(emitErrorCloseNT, self, err);
90+
} else {
91+
process.nextTick(emitCloseNT, self);
92+
}
93+
}
9194

92-
if (err) {
93-
process.nextTick(emitErrorCloseNT, self, err);
94-
} else {
95-
process.nextTick(emitCloseNT, self);
96-
}
95+
function _destroy(self, err, cb) {
96+
let called = false;
97+
const result = self._destroy(err || null, (err) => {
98+
if (called)
99+
return;
100+
called = true;
101+
onDestroy(self, err, cb);
97102
});
98-
if (result !== undefined && result !== null) {
103+
if (result != null) {
99104
try {
100105
const then = result.then;
101106
if (typeof then === 'function') {
@@ -104,49 +109,14 @@ function _destroy(self, err, cb) {
104109
function() {
105110
if (called)
106111
return;
107-
108-
const r = self._readableState;
109-
const w = self._writableState;
110-
111-
if (w) {
112-
w.closed = true;
113-
}
114-
if (r) {
115-
r.closed = true;
116-
}
117-
118-
if (typeof cb === 'function') {
119-
process.nextTick(cb);
120-
}
121-
122-
process.nextTick(emitCloseNT, self);
112+
called = true;
113+
onDestroy(self, null, cb);
123114
},
124115
function(err) {
125-
const r = self._readableState;
126-
const w = self._writableState;
127-
err.stack; // eslint-disable-line no-unused-expressions
128-
116+
if (called)
117+
return;
129118
called = true;
130-
131-
if (w && !w.errored) {
132-
w.errored = err;
133-
}
134-
if (r && !r.errored) {
135-
r.errored = err;
136-
}
137-
138-
if (w) {
139-
w.closed = true;
140-
}
141-
if (r) {
142-
r.closed = true;
143-
}
144-
145-
if (typeof cb === 'function') {
146-
process.nextTick(cb, err);
147-
}
148-
149-
process.nextTick(emitErrorCloseNT, self, err);
119+
onDestroy(self, err, cb);
150120
});
151121
}
152122
} catch (err) {
@@ -284,69 +254,54 @@ function construct(stream, cb) {
284254
process.nextTick(constructNT, stream);
285255
}
286256

287-
function constructNT(stream) {
257+
function onConstruct(stream, err) {
288258
const r = stream._readableState;
289259
const w = stream._writableState;
290-
// With duplex streams we use the writable side for state.
291260
const s = w || r;
292261

293-
let called = false;
294-
const result = stream._construct((err) => {
295-
if (r) {
296-
r.constructed = true;
297-
}
298-
if (w) {
299-
w.constructed = true;
300-
}
262+
if (r) {
263+
r.constructed = true;
264+
}
265+
if (w) {
266+
w.constructed = true;
267+
}
301268

302-
if (called) {
303-
err = new ERR_MULTIPLE_CALLBACK();
304-
} else {
305-
called = true;
306-
}
269+
if (s.destroyed) {
270+
stream.emit(kDestroy, err);
271+
} else if (err) {
272+
errorOrDestroy(stream, err, true);
273+
} else {
274+
process.nextTick(emitConstructNT, stream);
275+
}
276+
}
307277

308-
if (s.destroyed) {
309-
stream.emit(kDestroy, err);
310-
} else if (err) {
311-
errorOrDestroy(stream, err, true);
312-
} else {
313-
process.nextTick(emitConstructNT, stream);
314-
}
278+
function constructNT(stream) {
279+
let called = false;
280+
const result = stream._construct((err) => {
281+
if (called)
282+
err ??= ERR_MULTIPLE_CALLBACK();
283+
called = true;
284+
onConstruct(stream, err);
315285
});
316-
if (result !== undefined && result !== null) {
286+
if (result != null) {
317287
try {
318288
const then = result.then;
319289
if (typeof then === 'function') {
320290
then.call(
321291
result,
322292
function() {
323-
// If the callback was invoked, do nothing further.
324293
if (called)
325294
return;
326-
if (r) {
327-
r.constructed = true;
328-
}
329-
if (w) {
330-
w.constructed = true;
331-
}
332-
if (s.destroyed) {
333-
process.nextTick(() => stream.emit(kDestroy));
334-
} else {
335-
process.nextTick(emitConstructNT, stream);
336-
}
295+
called = true;
296+
297+
onConstruct(stream, null);
337298
},
338299
function(err) {
339-
if (r) {
340-
r.constructed = true;
341-
}
342-
if (w) {
343-
w.constructed = true;
344-
}
345-
called = true;
346-
if (s.destroyed) {
347-
process.nextTick(() => stream.emit(kDestroy, err));
300+
if (called) {
301+
errorOrDestroy(stream, err);
348302
} else {
349-
process.nextTick(errorOrDestroy, stream, err);
303+
called = true;
304+
onConstruct(stream, err);
350305
}
351306
});
352307
}

lib/internal/streams/readable.js

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -483,8 +483,25 @@ Readable.prototype.read = function(n) {
483483
// If the length is currently zero, then we *need* a readable event.
484484
if (state.length === 0)
485485
state.needReadable = true;
486+
486487
// Call internal read method
487-
this._read(state.highWaterMark);
488+
const result = this._read(state.highWaterMark);
489+
if (result != null) {
490+
try {
491+
const then = result.then;
492+
if (typeof then === 'function') {
493+
then.call(
494+
result,
495+
nop,
496+
function(err) {
497+
errorOrDestroy(this, err);
498+
});
499+
}
500+
} catch (err) {
501+
errorOrDestroy(this, err);
502+
}
503+
}
504+
488505
state.sync = false;
489506
// If _read pushed data synchronously, then `reading` will be false,
490507
// and we need to re-evaluate how much data we can return to the user.

lib/internal/streams/writable.js

Lines changed: 39 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -657,51 +657,63 @@ function needFinish(state) {
657657
!state.writing);
658658
}
659659

660+
function onFinish(stream, state, err) {
661+
state.pendingcb--;
662+
if (err) {
663+
const onfinishCallbacks = state[kOnFinished].splice(0);
664+
for (let i = 0; i < onfinishCallbacks.length; i++) {
665+
onfinishCallbacks[i](err);
666+
}
667+
errorOrDestroy(stream, err, state.sync);
668+
} else if (needFinish(state)) {
669+
state.prefinished = true;
670+
stream.emit('prefinish');
671+
// Backwards compat. Don't check state.sync here.
672+
// Some streams assume 'finish' will be emitted
673+
// asynchronously relative to _final callback.
674+
state.pendingcb++;
675+
process.nextTick(finish, stream, state);
676+
}
677+
}
678+
660679
function callFinal(stream, state) {
680+
let called = false;
681+
661682
state.sync = true;
662683
state.pendingcb++;
663684
const result = stream._final((err) => {
664-
state.pendingcb--;
665-
if (err) {
666-
const onfinishCallbacks = state[kOnFinished].splice(0);
667-
for (let i = 0; i < onfinishCallbacks.length; i++) {
668-
onfinishCallbacks[i](err);
669-
}
670-
errorOrDestroy(stream, err, state.sync);
671-
} else if (needFinish(state)) {
672-
state.prefinished = true;
673-
stream.emit('prefinish');
674-
// Backwards compat. Don't check state.sync here.
675-
// Some streams assume 'finish' will be emitted
676-
// asynchronously relative to _final callback.
677-
state.pendingcb++;
678-
process.nextTick(finish, stream, state);
685+
if (called) {
686+
err ??= ERR_MULTIPLE_CALLBACK();
687+
errorOrDestroy(stream, err);
688+
} else {
689+
called = true;
690+
onFinish(stream, state, err);
679691
}
680692
});
681-
if (result !== undefined && result !== null) {
693+
694+
if (result != null) {
682695
try {
683696
const then = result.then;
684697
if (typeof then === 'function') {
685698
then.call(
686699
result,
687700
function() {
688-
if (state.prefinished)
689-
return;
690-
state.prefinish = true;
691-
process.nextTick(() => stream.emit('prefinish'));
692-
state.pendingcb++;
693-
process.nextTick(finish, stream, state);
701+
if (called) return;
702+
called = true;
703+
onFinish(stream, state, null);
694704
},
695705
function(err) {
696-
const onfinishCallbacks = state[kOnFinished].splice(0);
697-
for (let i = 0; i < onfinishCallbacks.length; i++) {
698-
process.nextTick(onfinishCallbacks[i], err);
706+
if (called) {
707+
err ??= ERR_MULTIPLE_CALLBACK();
708+
errorOrDestroy(stream, err);
709+
} else {
710+
called = true;
711+
onFinish(stream, state, err);
699712
}
700-
process.nextTick(errorOrDestroy, stream, err, state.sync);
701713
});
702714
}
703715
} catch (err) {
704-
process.nextTick(errorOrDestroy, stream, err, state.sync);
716+
onFinish(stream, state, err);
705717
}
706718
}
707719
state.sync = false;

0 commit comments

Comments
 (0)