aboutsummaryrefslogtreecommitdiff
path: root/src/js/node/stream.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/js/node/stream.js')
-rw-r--r--src/js/node/stream.js92
1 files changed, 85 insertions, 7 deletions
diff --git a/src/js/node/stream.js b/src/js/node/stream.js
index 474ada8d8..1ae0f7fb6 100644
--- a/src/js/node/stream.js
+++ b/src/js/node/stream.js
@@ -3328,16 +3328,94 @@ var require_readable = __commonJS({
};
var webStreamsAdapters = {
newStreamReadableFromReadableStream,
+
+ newReadableStreamFromStreamReadable(streamReadable, options = {}) {
+ // Not using the internal/streams/utils isReadableNodeStream utility
+ // here because it will return false if streamReadable is a Duplex
+ // whose readable option is false. For a Duplex that is not readable,
+ // we want it to pass this check but return a closed ReadableStream.
+ if (typeof streamReadable?._readableState !== "object") {
+ throw new ERR_INVALID_ARG_TYPE("streamReadable", "stream.Readable", streamReadable);
+ }
+ var { isDestroyed, isReadable } = require_utils();
+
+ if (isDestroyed(streamReadable) || !isReadable(streamReadable)) {
+ const readable = new ReadableStream();
+ readable.cancel();
+ return readable;
+ }
+
+ const objectMode = streamReadable.readableObjectMode;
+ const highWaterMark = streamReadable.readableHighWaterMark;
+
+ const evaluateStrategyOrFallback = strategy => {
+ // If there is a strategy available, use it
+ if (strategy) return strategy;
+
+ if (objectMode) {
+ // When running in objectMode explicitly but no strategy, we just fall
+ // back to CountQueuingStrategy
+ return new CountQueuingStrategy({ highWaterMark });
+ }
+
+ // When not running in objectMode explicitly, we just fall
+ // back to a minimal strategy that just specifies the highWaterMark
+ // and no size algorithm. Using a ByteLengthQueuingStrategy here
+ // is unnecessary.
+ return { highWaterMark };
+ };
+
+ const strategy = evaluateStrategyOrFallback(options?.strategy);
+
+ let controller;
+
+ function onData(chunk) {
+ controller.enqueue(chunk);
+ if (controller.desiredSize <= 0) streamReadable.pause();
+ }
+
+ streamReadable.pause();
+
+ const cleanup = finished(streamReadable, error => {
+ if (error?.code === "ERR_STREAM_PREMATURE_CLOSE") {
+ const err = new AbortError(undefined, { cause: error });
+ error = err;
+ }
+
+ cleanup();
+ // This is a protection against non-standard, legacy streams
+ // that happen to emit an error event again after finished is called.
+ streamReadable.on("error", () => {});
+ if (error) return controller.error(error);
+ controller.close();
+ });
+
+ streamReadable.on("data", onData);
+
+ return new ReadableStream(
+ {
+ start(c) {
+ controller = c;
+ },
+
+ pull() {
+ streamReadable.resume();
+ },
+
+ cancel(reason) {
+ destroy(streamReadable, reason);
+ },
+ },
+ strategy,
+ );
+ },
};
- function lazyWebStreams() {
- if (webStreamsAdapters === void 0) webStreamsAdapters = {};
- return webStreamsAdapters;
- }
+
Readable.fromWeb = function (readableStream, options) {
- return lazyWebStreams().newStreamReadableFromReadableStream(readableStream, options);
+ return webStreamsAdapters.newStreamReadableFromReadableStream(readableStream, options);
};
- Readable.toWeb = function (streamReadable) {
- return lazyWebStreams().newReadableStreamFromStreamReadable(streamReadable);
+ Readable.toWeb = function (streamReadable, options) {
+ return webStreamsAdapters.newReadableStreamFromStreamReadable(streamReadable, options);
};
Readable.wrap = function (src, options) {
var _ref, _src$readableObjectMo;