diff options
Diffstat (limited to 'src/js/node/stream.js')
-rw-r--r-- | src/js/node/stream.js | 92 |
1 files changed, 85 insertions, 7 deletions
diff --git a/src/js/node/stream.js b/src/js/node/stream.js index 474ada8d8..1ae0f7fb6 100644 --- a/src/js/node/stream.js +++ b/src/js/node/stream.js @@ -3328,16 +3328,94 @@ var require_readable = __commonJS({ }; var webStreamsAdapters = { newStreamReadableFromReadableStream, + + newReadableStreamFromStreamReadable(streamReadable, options = {}) { + // Not using the internal/streams/utils isReadableNodeStream utility + // here because it will return false if streamReadable is a Duplex + // whose readable option is false. For a Duplex that is not readable, + // we want it to pass this check but return a closed ReadableStream. + if (typeof streamReadable?._readableState !== "object") { + throw new ERR_INVALID_ARG_TYPE("streamReadable", "stream.Readable", streamReadable); + } + var { isDestroyed, isReadable } = require_utils(); + + if (isDestroyed(streamReadable) || !isReadable(streamReadable)) { + const readable = new ReadableStream(); + readable.cancel(); + return readable; + } + + const objectMode = streamReadable.readableObjectMode; + const highWaterMark = streamReadable.readableHighWaterMark; + + const evaluateStrategyOrFallback = strategy => { + // If there is a strategy available, use it + if (strategy) return strategy; + + if (objectMode) { + // When running in objectMode explicitly but no strategy, we just fall + // back to CountQueuingStrategy + return new CountQueuingStrategy({ highWaterMark }); + } + + // When not running in objectMode explicitly, we just fall + // back to a minimal strategy that just specifies the highWaterMark + // and no size algorithm. Using a ByteLengthQueuingStrategy here + // is unnecessary. + return { highWaterMark }; + }; + + const strategy = evaluateStrategyOrFallback(options?.strategy); + + let controller; + + function onData(chunk) { + controller.enqueue(chunk); + if (controller.desiredSize <= 0) streamReadable.pause(); + } + + streamReadable.pause(); + + const cleanup = finished(streamReadable, error => { + if (error?.code === "ERR_STREAM_PREMATURE_CLOSE") { + const err = new AbortError(undefined, { cause: error }); + error = err; + } + + cleanup(); + // This is a protection against non-standard, legacy streams + // that happen to emit an error event again after finished is called. + streamReadable.on("error", () => {}); + if (error) return controller.error(error); + controller.close(); + }); + + streamReadable.on("data", onData); + + return new ReadableStream( + { + start(c) { + controller = c; + }, + + pull() { + streamReadable.resume(); + }, + + cancel(reason) { + destroy(streamReadable, reason); + }, + }, + strategy, + ); + }, }; - function lazyWebStreams() { - if (webStreamsAdapters === void 0) webStreamsAdapters = {}; - return webStreamsAdapters; - } + Readable.fromWeb = function (readableStream, options) { - return lazyWebStreams().newStreamReadableFromReadableStream(readableStream, options); + return webStreamsAdapters.newStreamReadableFromReadableStream(readableStream, options); }; - Readable.toWeb = function (streamReadable) { - return lazyWebStreams().newReadableStreamFromStreamReadable(streamReadable); + Readable.toWeb = function (streamReadable, options) { + return webStreamsAdapters.newReadableStreamFromStreamReadable(streamReadable, options); }; Readable.wrap = function (src, options) { var _ref, _src$readableObjectMo; |