Skip to content

Commit 9c108be

Browse files
committed
stream: cleanup async handling
Cleanup async stream method handling.
1 parent a5ba28d commit 9c108be

File tree

3 files changed

+119
-131
lines changed

3 files changed

+119
-131
lines changed

lib/internal/streams/destroy.js

Lines changed: 61 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -68,34 +68,39 @@ function destroy(err, cb) {
6868
return this;
6969
}
7070

71-
function _destroy(self, err, cb) {
72-
let called = false;
73-
const result = self._destroy(err || null, (err) => {
74-
const r = self._readableState;
75-
const w = self._writableState;
71+
function onDestroy(self, err, cb) {
72+
const r = self._readableState;
73+
const w = self._writableState;
7674

77-
called = true;
75+
checkError(err, w, r);
7876

79-
checkError(err, w, r);
77+
if (w) {
78+
w.closed = true;
79+
}
80+
if (r) {
81+
r.closed = true;
82+
}
8083

81-
if (w) {
82-
w.closed = true;
83-
}
84-
if (r) {
85-
r.closed = true;
86-
}
84+
if (typeof cb === 'function') {
85+
cb(err);
86+
}
8787

88-
if (typeof cb === 'function') {
89-
cb(err);
90-
}
88+
if (err) {
89+
process.nextTick(emitErrorCloseNT, self, err);
90+
} else {
91+
process.nextTick(emitCloseNT, self);
92+
}
93+
}
9194

92-
if (err) {
93-
process.nextTick(emitErrorCloseNT, self, err);
94-
} else {
95-
process.nextTick(emitCloseNT, self);
96-
}
95+
function _destroy(self, err, cb) {
96+
let called = false;
97+
const result = self._destroy(err || null, (err) => {
98+
if (called)
99+
return;
100+
called = true;
101+
onDestroy(self, err, cb);
97102
});
98-
if (result !== undefined && result !== null) {
103+
if (result != null) {
99104
try {
100105
const then = result.then;
101106
if (typeof then === 'function') {
@@ -104,49 +109,14 @@ function _destroy(self, err, cb) {
104109
function() {
105110
if (called)
106111
return;
107-
108-
const r = self._readableState;
109-
const w = self._writableState;
110-
111-
if (w) {
112-
w.closed = true;
113-
}
114-
if (r) {
115-
r.closed = true;
116-
}
117-
118-
if (typeof cb === 'function') {
119-
process.nextTick(cb);
120-
}
121-
122-
process.nextTick(emitCloseNT, self);
112+
called = true;
113+
onDestroy(self, null, cb);
123114
},
124115
function(err) {
125-
const r = self._readableState;
126-
const w = self._writableState;
127-
err.stack; // eslint-disable-line no-unused-expressions
128-
116+
if (called)
117+
return;
129118
called = true;
130-
131-
if (w && !w.errored) {
132-
w.errored = err;
133-
}
134-
if (r && !r.errored) {
135-
r.errored = err;
136-
}
137-
138-
if (w) {
139-
w.closed = true;
140-
}
141-
if (r) {
142-
r.closed = true;
143-
}
144-
145-
if (typeof cb === 'function') {
146-
process.nextTick(cb, err);
147-
}
148-
149-
process.nextTick(emitErrorCloseNT, self, err);
119+
onDestroy(self, err, cb);
150120
});
151121
}
152122
} catch (err) {
@@ -284,69 +254,57 @@ function construct(stream, cb) {
284254
process.nextTick(constructNT, stream);
285255
}
286256

287-
function constructNT(stream) {
257+
function onConstruct(stream, err) {
288258
const r = stream._readableState;
289259
const w = stream._writableState;
290-
// With duplex streams we use the writable side for state.
291260
const s = w || r;
292261

262+
if (r) {
263+
r.constructed = true;
264+
}
265+
if (w) {
266+
w.constructed = true;
267+
}
268+
269+
if (s.destroyed) {
270+
stream.emit(kDestroy, err);
271+
} else if (err) {
272+
errorOrDestroy(stream, err, true);
273+
} else {
274+
process.nextTick(emitConstructNT, stream);
275+
}
276+
}
277+
278+
function constructNT(stream) {
293279
let called = false;
294280
const result = stream._construct((err) => {
295-
if (r) {
296-
r.constructed = true;
297-
}
298-
if (w) {
299-
w.constructed = true;
300-
}
301-
302281
if (called) {
303-
err = new ERR_MULTIPLE_CALLBACK();
282+
errorOrDestroy(stream, err ?? new ERR_MULTIPLE_CALLBACK());
304283
} else {
305284
called = true;
306-
}
307-
308-
if (s.destroyed) {
309-
stream.emit(kDestroy, err);
310-
} else if (err) {
311-
errorOrDestroy(stream, err, true);
312-
} else {
313-
process.nextTick(emitConstructNT, stream);
285+
onConstruct(stream, err);
314286
}
315287
});
316-
if (result !== undefined && result !== null) {
288+
if (result != null) {
317289
try {
318290
const then = result.then;
319291
if (typeof then === 'function') {
320292
then.call(
321293
result,
322294
function() {
323-
// If the callback was invoked, do nothing further.
324-
if (called)
325-
return;
326-
if (r) {
327-
r.constructed = true;
328-
}
329-
if (w) {
330-
w.constructed = true;
331-
}
332-
if (s.destroyed) {
333-
process.nextTick(() => stream.emit(kDestroy));
295+
if (called) {
296+
errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK());
334297
} else {
335-
process.nextTick(emitConstructNT, stream);
298+
called = true;
299+
onConstruct(stream, null);
336300
}
337301
},
338302
function(err) {
339-
if (r) {
340-
r.constructed = true;
341-
}
342-
if (w) {
343-
w.constructed = true;
344-
}
345-
called = true;
346-
if (s.destroyed) {
347-
process.nextTick(() => stream.emit(kDestroy, err));
303+
if (called) {
304+
errorOrDestroy(stream, err);
348305
} else {
349-
process.nextTick(errorOrDestroy, stream, err);
306+
called = true;
307+
onConstruct(stream, err);
350308
}
351309
});
352310
}

lib/internal/streams/readable.js

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -483,8 +483,25 @@ Readable.prototype.read = function(n) {
483483
// If the length is currently zero, then we *need* a readable event.
484484
if (state.length === 0)
485485
state.needReadable = true;
486+
486487
// Call internal read method
487-
this._read(state.highWaterMark);
488+
const result = this._read(state.highWaterMark);
489+
if (result != null) {
490+
try {
491+
const then = result.then;
492+
if (typeof then === 'function') {
493+
then.call(
494+
result,
495+
nop,
496+
function(err) {
497+
errorOrDestroy(this, err);
498+
});
499+
}
500+
} catch (err) {
501+
errorOrDestroy(this, err);
502+
}
503+
}
504+
488505
state.sync = false;
489506
// If _read pushed data synchronously, then `reading` will be false,
490507
// and we need to re-evaluate how much data we can return to the user.

lib/internal/streams/writable.js

Lines changed: 40 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -657,51 +657,64 @@ function needFinish(state) {
657657
!state.writing);
658658
}
659659

660+
function onFinish(stream, state, err) {
661+
state.pendingcb--;
662+
if (err) {
663+
const onfinishCallbacks = state[kOnFinished].splice(0);
664+
for (let i = 0; i < onfinishCallbacks.length; i++) {
665+
onfinishCallbacks[i](err);
666+
}
667+
errorOrDestroy(stream, err, state.sync);
668+
} else if (needFinish(state)) {
669+
state.prefinished = true;
670+
stream.emit('prefinish');
671+
// Backwards compat. Don't check state.sync here.
672+
// Some streams assume 'finish' will be emitted
673+
// asynchronously relative to _final callback.
674+
state.pendingcb++;
675+
process.nextTick(finish, stream, state);
676+
}
677+
}
678+
660679
function callFinal(stream, state) {
680+
let called = false;
681+
661682
state.sync = true;
662683
state.pendingcb++;
663684
const result = stream._final((err) => {
664-
state.pendingcb--;
665-
if (err) {
666-
const onfinishCallbacks = state[kOnFinished].splice(0);
667-
for (let i = 0; i < onfinishCallbacks.length; i++) {
668-
onfinishCallbacks[i](err);
669-
}
670-
errorOrDestroy(stream, err, state.sync);
671-
} else if (needFinish(state)) {
672-
state.prefinished = true;
673-
stream.emit('prefinish');
674-
// Backwards compat. Don't check state.sync here.
675-
// Some streams assume 'finish' will be emitted
676-
// asynchronously relative to _final callback.
677-
state.pendingcb++;
678-
process.nextTick(finish, stream, state);
685+
if (called) {
686+
errorOrDestroy(stream, err ?? ERR_MULTIPLE_CALLBACK());
687+
} else {
688+
called = true;
689+
onFinish(stream, state, err);
679690
}
680691
});
681-
if (result !== undefined && result !== null) {
692+
693+
if (result != null) {
682694
try {
683695
const then = result.then;
684696
if (typeof then === 'function') {
685697
then.call(
686698
result,
687699
function() {
688-
if (state.prefinished)
689-
return;
690-
state.prefinish = true;
691-
process.nextTick(() => stream.emit('prefinish'));
692-
state.pendingcb++;
693-
process.nextTick(finish, stream, state);
700+
if (called) {
701+
errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK());
702+
} else {
703+
called = true;
704+
onFinish(stream, state, null);
705+
}
694706
},
695707
function(err) {
696-
const onfinishCallbacks = state[kOnFinished].splice(0);
697-
for (let i = 0; i < onfinishCallbacks.length; i++) {
698-
process.nextTick(onfinishCallbacks[i], err);
708+
if (called) {
709+
errorOrDestroy(stream, err);
710+
} else {
711+
called = true;
712+
onFinish(stream, state, err);
699713
}
700-
process.nextTick(errorOrDestroy, stream, err, state.sync);
701714
});
702715
}
703716
} catch (err) {
704-
process.nextTick(errorOrDestroy, stream, err, state.sync);
717+
onFinish(stream, state, err);
705718
}
706719
}
707720
state.sync = false;

0 commit comments

Comments
 (0)