diff options
Diffstat (limited to 'src/bun.js/builtins/js')
| -rw-r--r-- | src/bun.js/builtins/js/ReadableStreamInternals.js | 72 | 
1 files changed, 47 insertions, 25 deletions
| diff --git a/src/bun.js/builtins/js/ReadableStreamInternals.js b/src/bun.js/builtins/js/ReadableStreamInternals.js index 353e3f9e9..0f5871fa1 100644 --- a/src/bun.js/builtins/js/ReadableStreamInternals.js +++ b/src/bun.js/builtins/js/ReadableStreamInternals.js @@ -784,42 +784,51 @@ function readDirectStream(stream, sink, underlyingSource) {    "use strict";    @putByIdDirectPrivate(stream, "underlyingSource", @undefined); +  @putByIdDirectPrivate(stream, "start", @undefined); -  var {close: originalClose, pull} = underlyingSource; -  underlyingSource = @undefined; +  var capturedStream = stream; +  var reader; -  var fakeReader = { -  }; -  var close = (reason) => { -    try { -        originalClose && originalClose(reason); -    } catch (e) { +  function close(stream, reason) { +    if (reason && underlyingSource?.cancel) { +      try { +        underlyingSource.cancel(reason); +      } catch (e) { +      } +      underlyingSource = @undefined;      } -    originalClose = @undefined; -    @putByIdDirectPrivate(stream, "reader", @undefined); -    @putByIdDirectPrivate(stream, "readableStreamController", null); -    @putByIdDirectPrivate(stream, "state", @streamClosed); -    stream = @undefined; -    fakeReader = @undefined; -  }; + +    if (stream) { +      @putByIdDirectPrivate(stream, "readableStreamController", @undefined); +      @putByIdDirectPrivate(stream, "reader", @undefined); +      if (reason) { +        @putByIdDirectPrivate(stream, "state", @streamErrored); +        @putByIdDirectPrivate(stream, "storedError", reason); +      } else { +        @putByIdDirectPrivate(stream, "state", @streamClosed); +      } +       +    } +  } -  if (!pull) { + + + +  if (!underlyingSource.pull) {      close();      return;    } -  if (!@isCallable(pull)) { +  if (!@isCallable(underlyingSource.pull)) {      close();      @throwTypeError("pull is not a function");      return;    }    @putByIdDirectPrivate(stream, "readableStreamController", sink); -  @putByIdDirectPrivate(stream, "start", @undefined); -    const highWaterMark = @getByIdDirectPrivate(stream, "highWaterMark");    if (highWaterMark) { @@ -828,12 +837,15 @@ function readDirectStream(stream, sink, underlyingSource) {      });    } -  @startDirectStream.@call(sink, stream, pull, close); +  @startDirectStream.@call(sink, stream, underlyingSource.pull, close); +  @putByIdDirectPrivate(stream, "reader", {}); -  // isReadableStreamLocked() checks for truthiness of "reader" -  @putByIdDirectPrivate(stream, "reader", fakeReader); -  pull(sink); +  var maybePromise = underlyingSource.pull(sink);    sink = @undefined; +  if (maybePromise && @isPromise(maybePromise)) { +    return maybePromise.@then(() => {}); +  } +  }  @globalPrivate; @@ -841,11 +853,21 @@ function assignToStream(stream, sink) {    "use strict";    // The stream is either a direct stream or a "default" JS stream -  const underlyingSource = @getByIdDirectPrivate(stream, "underlyingSource"); +  var underlyingSource = @getByIdDirectPrivate(stream, "underlyingSource");    // we know it's a direct stream when @underlyingSource is set    if (underlyingSource) { -    return @readDirectStream(stream, sink, underlyingSource); +    try { +      return @readDirectStream(stream, sink, underlyingSource); +    } catch(e) { +      throw e; +    } finally { +      underlyingSource = @undefined; +      stream = @undefined; +      sink = @undefined; +    } +     +    }    return @readStreamIntoSink(stream, sink, true); | 
