|
2 | 2 |
|
3 | 3 | const pipeline = require('internal/streams/pipeline'); |
4 | 4 | const Duplex = require('internal/streams/duplex'); |
5 | | -const { createDeferredPromise } = require('internal/util'); |
6 | 5 | const { destroyer } = require('internal/streams/destroy'); |
7 | | -const from = require('internal/streams/from'); |
8 | 6 | const { |
9 | | - isIterable, |
10 | | - isReadableNodeStream, |
11 | | - isWritableNodeStream, |
12 | 7 | isNodeStream, |
13 | | - isDestroyed, |
14 | | - isReadableFinished, |
15 | | - isWritableEnded, |
16 | 8 | } = require('internal/streams/utils'); |
17 | | -const { |
18 | | - PromiseResolve, |
19 | | -} = primordials; |
20 | 9 | const { |
21 | 10 | AbortError, |
22 | 11 | codes: { |
23 | | - ERR_INVALID_ARG_TYPE, |
24 | 12 | ERR_INVALID_ARG_VALUE, |
25 | | - ERR_INVALID_RETURN_VALUE, |
26 | 13 | ERR_MISSING_ARGS, |
27 | 14 | }, |
28 | 15 | } = require('internal/errors'); |
29 | | -const assert = require('internal/assert'); |
30 | 16 |
|
31 | 17 | function isReadable(stream) { |
32 | 18 | const r = isReadableNodeStream(stream); |
@@ -70,18 +56,18 @@ module.exports = function compose(...streams) { |
70 | 56 | } |
71 | 57 |
|
72 | 58 | if (streams.length === 1) { |
73 | | - return makeDuplex(streams[0], 'streams[0]'); |
| 59 | + return Duplex.from(streams[0]); |
74 | 60 | } |
75 | 61 |
|
76 | 62 | const orgStreams = [...streams]; |
77 | 63 |
|
78 | 64 | if (typeof streams[0] === 'function') { |
79 | | - streams[0] = makeDuplex(streams[0], 'streams[0]'); |
| 65 | + streams[0] = Duplex.from(streams[0]); |
80 | 66 | } |
81 | 67 |
|
82 | 68 | if (typeof streams[streams.length - 1] === 'function') { |
83 | 69 | const idx = streams.length - 1; |
84 | | - streams[idx] = makeDuplex(streams[idx], `streams[${idx}]`); |
| 70 | + streams[idx] = Duplex.from(streams[idx]); |
85 | 71 | } |
86 | 72 |
|
87 | 73 | for (let n = 0; n < streams.length; ++n) { |
@@ -134,7 +120,7 @@ module.exports = function compose(...streams) { |
134 | 120 | // Implement Writable/Readable/Duplex traits. |
135 | 121 | // See, https://github.com/nodejs/node/pull/33515. |
136 | 122 | d = new ComposeDuplex({ |
137 | | - highWaterMark: 1, |
| 123 | + // TODO (ronag): highWaterMark? |
138 | 124 | writableObjectMode: !!head?.writableObjectMode, |
139 | 125 | readableObjectMode: !!tail?.writableObjectMode, |
140 | 126 | writable, |
@@ -220,82 +206,3 @@ module.exports = function compose(...streams) { |
220 | 206 |
|
221 | 207 | return d; |
222 | 208 | }; |
223 | | - |
224 | | -function makeDuplex(stream, name) { |
225 | | - let ret; |
226 | | - if (typeof stream === 'function') { |
227 | | - assert(stream.length > 0); |
228 | | - |
229 | | - const { value, write, final } = fromAsyncGen(stream); |
230 | | - |
231 | | - if (isIterable(value)) { |
232 | | - ret = from(ComposeDuplex, value, { |
233 | | - objectMode: true, |
234 | | - highWaterMark: 1, |
235 | | - write, |
236 | | - final |
237 | | - }); |
238 | | - } else if (typeof value?.then === 'function') { |
239 | | - const promise = PromiseResolve(value) |
240 | | - .then((val) => { |
241 | | - if (val != null) { |
242 | | - throw new ERR_INVALID_RETURN_VALUE('nully', name, val); |
243 | | - } |
244 | | - }) |
245 | | - .catch((err) => { |
246 | | - destroyer(ret, err); |
247 | | - }); |
248 | | - |
249 | | - ret = new ComposeDuplex({ |
250 | | - objectMode: true, |
251 | | - highWaterMark: 1, |
252 | | - readable: false, |
253 | | - write, |
254 | | - final(cb) { |
255 | | - final(() => promise.then(cb, cb)); |
256 | | - } |
257 | | - }); |
258 | | - } else { |
259 | | - throw new ERR_INVALID_RETURN_VALUE( |
260 | | - 'Iterable, AsyncIterable or AsyncFunction', name, value); |
261 | | - } |
262 | | - } else if (isNodeStream(stream)) { |
263 | | - ret = stream; |
264 | | - } else if (isIterable(stream)) { |
265 | | - ret = from(ComposeDuplex, stream, { |
266 | | - objectMode: true, |
267 | | - highWaterMark: 1, |
268 | | - writable: false |
269 | | - }); |
270 | | - } else { |
271 | | - throw new ERR_INVALID_ARG_TYPE( |
272 | | - name, |
273 | | - ['Stream', 'Iterable', 'AsyncIterable', 'Function'], |
274 | | - stream) |
275 | | - ; |
276 | | - } |
277 | | - return ret; |
278 | | -} |
279 | | - |
280 | | -function fromAsyncGen(fn) { |
281 | | - let { promise, resolve } = createDeferredPromise(); |
282 | | - const value = fn(async function*() { |
283 | | - while (true) { |
284 | | - const { chunk, done, cb } = await promise; |
285 | | - process.nextTick(cb); |
286 | | - if (done) return; |
287 | | - yield chunk; |
288 | | - ({ promise, resolve } = createDeferredPromise()); |
289 | | - } |
290 | | - }()); |
291 | | - |
292 | | - return { |
293 | | - value, |
294 | | - write(chunk, encoding, cb) { |
295 | | - resolve({ chunk, done: false, cb }); |
296 | | - }, |
297 | | - final(cb) { |
298 | | - resolve({ done: true, cb }); |
299 | | - } |
300 | | - }; |
301 | | -} |
0 commit comments