diff options
Diffstat (limited to 'src/bun.js/builtins/js/ReadableStreamInternals.js')
-rw-r--r-- | src/bun.js/builtins/js/ReadableStreamInternals.js | 72 |
1 files changed, 47 insertions, 25 deletions
diff --git a/src/bun.js/builtins/js/ReadableStreamInternals.js b/src/bun.js/builtins/js/ReadableStreamInternals.js index 353e3f9e9..0f5871fa1 100644 --- a/src/bun.js/builtins/js/ReadableStreamInternals.js +++ b/src/bun.js/builtins/js/ReadableStreamInternals.js @@ -784,42 +784,51 @@ function readDirectStream(stream, sink, underlyingSource) { "use strict"; @putByIdDirectPrivate(stream, "underlyingSource", @undefined); + @putByIdDirectPrivate(stream, "start", @undefined); - var {close: originalClose, pull} = underlyingSource; - underlyingSource = @undefined; + var capturedStream = stream; + var reader; - var fakeReader = { - }; - var close = (reason) => { - try { - originalClose && originalClose(reason); - } catch (e) { + function close(stream, reason) { + if (reason && underlyingSource?.cancel) { + try { + underlyingSource.cancel(reason); + } catch (e) { + } + underlyingSource = @undefined; } - originalClose = @undefined; - @putByIdDirectPrivate(stream, "reader", @undefined); - @putByIdDirectPrivate(stream, "readableStreamController", null); - @putByIdDirectPrivate(stream, "state", @streamClosed); - stream = @undefined; - fakeReader = @undefined; - }; + + if (stream) { + @putByIdDirectPrivate(stream, "readableStreamController", @undefined); + @putByIdDirectPrivate(stream, "reader", @undefined); + if (reason) { + @putByIdDirectPrivate(stream, "state", @streamErrored); + @putByIdDirectPrivate(stream, "storedError", reason); + } else { + @putByIdDirectPrivate(stream, "state", @streamClosed); + } + + } + } - if (!pull) { + + + + if (!underlyingSource.pull) { close(); return; } - if (!@isCallable(pull)) { + if (!@isCallable(underlyingSource.pull)) { close(); @throwTypeError("pull is not a function"); return; } @putByIdDirectPrivate(stream, "readableStreamController", sink); - @putByIdDirectPrivate(stream, "start", @undefined); - const highWaterMark = @getByIdDirectPrivate(stream, "highWaterMark"); if (highWaterMark) { @@ -828,12 +837,15 @@ function readDirectStream(stream, sink, underlyingSource) { }); } - @startDirectStream.@call(sink, stream, pull, close); + @startDirectStream.@call(sink, stream, underlyingSource.pull, close); + @putByIdDirectPrivate(stream, "reader", {}); - // isReadableStreamLocked() checks for truthiness of "reader" - @putByIdDirectPrivate(stream, "reader", fakeReader); - pull(sink); + var maybePromise = underlyingSource.pull(sink); sink = @undefined; + if (maybePromise && @isPromise(maybePromise)) { + return maybePromise.@then(() => {}); + } + } @globalPrivate; @@ -841,11 +853,21 @@ function assignToStream(stream, sink) { "use strict"; // The stream is either a direct stream or a "default" JS stream - const underlyingSource = @getByIdDirectPrivate(stream, "underlyingSource"); + var underlyingSource = @getByIdDirectPrivate(stream, "underlyingSource"); // we know it's a direct stream when @underlyingSource is set if (underlyingSource) { - return @readDirectStream(stream, sink, underlyingSource); + try { + return @readDirectStream(stream, sink, underlyingSource); + } catch(e) { + throw e; + } finally { + underlyingSource = @undefined; + stream = @undefined; + sink = @undefined; + } + + } return @readStreamIntoSink(stream, sink, true); |