Skip to content

Commit 6a296a4

Browse files
ronagrichardlau
authored andcommitted
stream: simpler and faster Readable async iterator
Reimplement as an async generator instead of a custom iterator class. Backport-PR-URL: #34887 PR-URL: #34035 Refs: #34680 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: James M Snell <[email protected]>
1 parent 51a1a91 commit 6a296a4

File tree

7 files changed

+121
-269
lines changed

7 files changed

+121
-269
lines changed
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const Readable = require('stream').Readable;
5+
6+
const bench = common.createBenchmark(main, {
7+
n: [1e5],
8+
sync: ['yes', 'no'],
9+
});
10+
11+
async function main({ n, sync }) {
12+
sync = sync === 'yes';
13+
14+
const s = new Readable({
15+
objectMode: true,
16+
read() {
17+
if (sync) {
18+
this.push(1);
19+
} else {
20+
process.nextTick(() => {
21+
this.push(1);
22+
});
23+
}
24+
}
25+
});
26+
27+
bench.start();
28+
29+
let x = 0;
30+
for await (const chunk of s) {
31+
x += chunk;
32+
if (x > n) {
33+
break;
34+
}
35+
}
36+
37+
bench.end(n);
38+
}

lib/_stream_readable.js

Lines changed: 61 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ const {
2727
NumberIsNaN,
2828
ObjectDefineProperties,
2929
ObjectSetPrototypeOf,
30+
Promise,
3031
Set,
3132
SymbolAsyncIterator,
3233
Symbol
@@ -59,11 +60,11 @@ const kPaused = Symbol('kPaused');
5960

6061
// Lazy loaded to improve the startup performance.
6162
let StringDecoder;
62-
let createReadableStreamAsyncIterator;
6363
let from;
6464

6565
ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
6666
ObjectSetPrototypeOf(Readable, Stream);
67+
function nop() {}
6768

6869
const { errorOrDestroy } = destroyImpl;
6970

@@ -1075,13 +1076,68 @@ Readable.prototype.wrap = function(stream) {
10751076
};
10761077

10771078
Readable.prototype[SymbolAsyncIterator] = function() {
1078-
if (createReadableStreamAsyncIterator === undefined) {
1079-
createReadableStreamAsyncIterator =
1080-
require('internal/streams/async_iterator');
1079+
let stream = this;
1080+
1081+
if (typeof stream.read !== 'function') {
1082+
// v1 stream
1083+
const src = stream;
1084+
stream = new Readable({
1085+
objectMode: true,
1086+
destroy(err, callback) {
1087+
destroyImpl.destroyer(src, err);
1088+
callback();
1089+
}
1090+
}).wrap(src);
10811091
}
1082-
return createReadableStreamAsyncIterator(this);
1092+
1093+
const iter = createAsyncIterator(stream);
1094+
iter.stream = stream;
1095+
return iter;
10831096
};
10841097

1098+
async function* createAsyncIterator(stream) {
1099+
let callback = nop;
1100+
1101+
function next(resolve) {
1102+
if (this === stream) {
1103+
callback();
1104+
callback = nop;
1105+
} else {
1106+
callback = resolve;
1107+
}
1108+
}
1109+
1110+
stream
1111+
.on('readable', next)
1112+
.on('error', next)
1113+
.on('end', next)
1114+
.on('close', next);
1115+
1116+
try {
1117+
const state = stream._readableState;
1118+
while (true) {
1119+
const chunk = stream.read();
1120+
if (chunk !== null) {
1121+
yield chunk;
1122+
} else if (state.errored) {
1123+
throw state.errored;
1124+
} else if (state.ended) {
1125+
break;
1126+
} else if (state.closed) {
1127+
// TODO(ronag): ERR_PREMATURE_CLOSE?
1128+
break;
1129+
} else {
1130+
await new Promise(next);
1131+
}
1132+
}
1133+
} catch (err) {
1134+
destroyImpl.destroyer(stream, err);
1135+
throw err;
1136+
} finally {
1137+
destroyImpl.destroyer(stream, null);
1138+
}
1139+
}
1140+
10851141
// Making it explicit these properties are not enumerable
10861142
// because otherwise some prototype manipulation in
10871143
// userland will fail.

lib/internal/streams/async_iterator.js

Lines changed: 0 additions & 221 deletions
This file was deleted.

lib/internal/streams/pipeline.js

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ const {
2323

2424
let EE;
2525
let PassThrough;
26-
let createReadableStreamAsyncIterator;
26+
let Readable;
2727

2828
function destroyer(stream, reading, writing, callback) {
2929
callback = once(callback);
@@ -113,11 +113,11 @@ function makeAsyncIterable(val) {
113113
}
114114

115115
async function* fromReadable(val) {
116-
if (!createReadableStreamAsyncIterator) {
117-
createReadableStreamAsyncIterator =
118-
require('internal/streams/async_iterator');
116+
if (!Readable) {
117+
Readable = require('_stream_readable');
119118
}
120-
yield* createReadableStreamAsyncIterator(val);
119+
120+
yield* Readable.prototype[SymbolAsyncIterator].call(val);
121121
}
122122

123123
async function pump(iterable, writable, finish) {

node.gyp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,6 @@
222222
'lib/internal/worker/js_transferable.js',
223223
'lib/internal/watchdog.js',
224224
'lib/internal/streams/lazy_transform.js',
225-
'lib/internal/streams/async_iterator.js',
226225
'lib/internal/streams/buffer_list.js',
227226
'lib/internal/streams/duplexpair.js',
228227
'lib/internal/streams/from.js',

test/parallel/test-readline-async-iterators-destroy.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ async function testMutualDestroy() {
7575
break;
7676
}
7777
assert.deepStrictEqual(iteratedLines, expectedLines);
78+
break;
7879
}
7980

8081
assert.deepStrictEqual(iteratedLines, expectedLines);

0 commit comments

Comments
 (0)