diff options
Diffstat (limited to 'src/bun.js/builtins')
| -rw-r--r-- | src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp | 71 | ||||
| -rw-r--r-- | src/bun.js/builtins/js/ReadableStreamInternals.js | 72 | 
2 files changed, 95 insertions, 48 deletions
| diff --git a/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp b/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp index fd6010f17..ee6dc21b3 100644 --- a/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp +++ b/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp @@ -949,33 +949,47 @@ const char* const s_readableStreamInternalsIsReadableStreamDefaultControllerCode  const JSC::ConstructAbility s_readableStreamInternalsReadDirectStreamCodeConstructAbility = JSC::ConstructAbility::CannotConstruct;  const JSC::ConstructorKind s_readableStreamInternalsReadDirectStreamCodeConstructorKind = JSC::ConstructorKind::None; -const int s_readableStreamInternalsReadDirectStreamCodeLength = 1208; +const int s_readableStreamInternalsReadDirectStreamCodeLength = 1496;  static const JSC::Intrinsic s_readableStreamInternalsReadDirectStreamCodeIntrinsic = JSC::NoIntrinsic;  const char* const s_readableStreamInternalsReadDirectStreamCode =      "(function (stream, sink, underlyingSource) {\n" \      "  \"use strict\";\n" \      "  \n" \      "  @putByIdDirectPrivate(stream, \"underlyingSource\", @undefined);\n" \ +    "  @putByIdDirectPrivate(stream, \"start\", @undefined);\n" \      "\n" \ -    "  var {close: originalClose, pull} = underlyingSource;\n" \ +    "  var {cancel, pull} = underlyingSource;\n" \      "  underlyingSource = @undefined;\n" \      "\n" \      "\n" \ -    "  var fakeReader = {\n" \ -    "  };\n" \ -    "  var close = (reason) => {\n" \ -    "    try {\n" \ -    "        originalClose && originalClose(reason);\n" \ -    "    } catch (e) {\n" \ +    "  var capturedStream = stream;\n" \ +    "  var reader;\n" \      "\n" \ +    "  function close(stream, reason) {\n" \ +    "    if (reason && cancel) {\n" \ +    "      try {\n" \ +    "        cancel(reason);\n" \ +    "      } catch (e) {\n" \ +    "      }\n" \ +    "\n" \ +    "      cancel = @undefined;\n" \      "    }\n" \ -    "    originalClose = @undefined;\n" \ -    "    @putByIdDirectPrivate(stream, \"reader\", @undefined);\n" \ -    "    @putByIdDirectPrivate(stream, \"readableStreamController\", null);\n" \ -    "    @putByIdDirectPrivate(stream, \"state\", @streamClosed);\n" \ -    "    stream = @undefined;\n" \ -    "    fakeReader = @undefined;\n" \ -    "  };\n" \ +    "\n" \ +    "    if (stream) {\n" \ +    "      @putByIdDirectPrivate(stream, \"readableStreamController\", @undefined);\n" \ +    "      @putByIdDirectPrivate(stream, \"reader\", @undefined);\n" \ +    "      if (reason) {\n" \ +    "        @putByIdDirectPrivate(stream, \"state\", @streamErrored);\n" \ +    "        @putByIdDirectPrivate(stream, \"storedError\", reason);\n" \ +    "      } else {\n" \ +    "        @putByIdDirectPrivate(stream, \"state\", @streamClosed);\n" \ +    "      }\n" \ +    "      \n" \ +    "    }\n" \ +    "  }\n" \ +    "\n" \ +    "\n" \ +    "\n" \      "\n" \      "\n" \      "  if (!pull) {\n" \ @@ -990,8 +1004,6 @@ const char* const s_readableStreamInternalsReadDirectStreamCode =      "  }\n" \      "\n" \      "  @putByIdDirectPrivate(stream, \"readableStreamController\", sink);\n" \ -    "  @putByIdDirectPrivate(stream, \"start\", @undefined);\n" \ -    "\n" \      "  const highWaterMark = @getByIdDirectPrivate(stream, \"highWaterMark\");\n" \      "\n" \      "  if (highWaterMark) {\n" \ @@ -1001,28 +1013,41 @@ const char* const s_readableStreamInternalsReadDirectStreamCode =      "  }\n" \      "\n" \      "  @startDirectStream.@call(sink, stream, pull, close);\n" \ +    "  @putByIdDirectPrivate(stream, \"reader\", {});\n" \      "\n" \ -    "  //\n" \ -    "  @putByIdDirectPrivate(stream, \"reader\", fakeReader);\n" \ -    "  pull(sink);\n" \ +    "  var maybePromise = pull(sink);\n" \      "  sink = @undefined;\n" \ +    "  if (maybePromise && @isPromise(maybePromise)) {\n" \ +    "    return maybePromise.@then(() => {});\n" \ +    "  }\n" \ +    "\n" \      "})\n" \  ;  const JSC::ConstructAbility s_readableStreamInternalsAssignToStreamCodeConstructAbility = JSC::ConstructAbility::CannotConstruct;  const JSC::ConstructorKind s_readableStreamInternalsAssignToStreamCodeConstructorKind = JSC::ConstructorKind::None; -const int s_readableStreamInternalsAssignToStreamCodeLength = 279; +const int s_readableStreamInternalsAssignToStreamCodeLength = 438;  static const JSC::Intrinsic s_readableStreamInternalsAssignToStreamCodeIntrinsic = JSC::NoIntrinsic;  const char* const s_readableStreamInternalsAssignToStreamCode =      "(function (stream, sink) {\n" \      "  \"use strict\";\n" \      "\n" \      "  //\n" \ -    "  const underlyingSource = @getByIdDirectPrivate(stream, \"underlyingSource\");\n" \ +    "  var underlyingSource = @getByIdDirectPrivate(stream, \"underlyingSource\");\n" \      "\n" \      "  //\n" \      "  if (underlyingSource) {\n" \ -    "    return @readDirectStream(stream, sink, underlyingSource);\n" \ +    "    try {\n" \ +    "      return @readDirectStream(stream, sink, underlyingSource);\n" \ +    "    } catch(e) {\n" \ +    "      throw e;\n" \ +    "    } finally {\n" \ +    "      underlyingSource = @undefined;\n" \ +    "      stream = @undefined;\n" \ +    "      sink = @undefined;\n" \ +    "    }\n" \ +    "    \n" \ +    "\n" \      "  }\n" \      "\n" \      "  return @readStreamIntoSink(stream, sink, true);\n" \ diff --git a/src/bun.js/builtins/js/ReadableStreamInternals.js b/src/bun.js/builtins/js/ReadableStreamInternals.js index 353e3f9e9..0f5871fa1 100644 --- a/src/bun.js/builtins/js/ReadableStreamInternals.js +++ b/src/bun.js/builtins/js/ReadableStreamInternals.js @@ -784,42 +784,51 @@ function readDirectStream(stream, sink, underlyingSource) {    "use strict";    @putByIdDirectPrivate(stream, "underlyingSource", @undefined); +  @putByIdDirectPrivate(stream, "start", @undefined); -  var {close: originalClose, pull} = underlyingSource; -  underlyingSource = @undefined; +  var capturedStream = stream; +  var reader; -  var fakeReader = { -  }; -  var close = (reason) => { -    try { -        originalClose && originalClose(reason); -    } catch (e) { +  function close(stream, reason) { +    if (reason && underlyingSource?.cancel) { +      try { +        underlyingSource.cancel(reason); +      } catch (e) { +      } +      underlyingSource = @undefined;      } -    originalClose = @undefined; -    @putByIdDirectPrivate(stream, "reader", @undefined); -    @putByIdDirectPrivate(stream, "readableStreamController", null); -    @putByIdDirectPrivate(stream, "state", @streamClosed); -    stream = @undefined; -    fakeReader = @undefined; -  }; + +    if (stream) { +      @putByIdDirectPrivate(stream, "readableStreamController", @undefined); +      @putByIdDirectPrivate(stream, "reader", @undefined); +      if (reason) { +        @putByIdDirectPrivate(stream, "state", @streamErrored); +        @putByIdDirectPrivate(stream, "storedError", reason); +      } else { +        @putByIdDirectPrivate(stream, "state", @streamClosed); +      } +       +    } +  } -  if (!pull) { + + + +  if (!underlyingSource.pull) {      close();      return;    } -  if (!@isCallable(pull)) { +  if (!@isCallable(underlyingSource.pull)) {      close();      @throwTypeError("pull is not a function");      return;    }    @putByIdDirectPrivate(stream, "readableStreamController", sink); -  @putByIdDirectPrivate(stream, "start", @undefined); -    const highWaterMark = @getByIdDirectPrivate(stream, "highWaterMark");    if (highWaterMark) { @@ -828,12 +837,15 @@ function readDirectStream(stream, sink, underlyingSource) {      });    } -  @startDirectStream.@call(sink, stream, pull, close); +  @startDirectStream.@call(sink, stream, underlyingSource.pull, close); +  @putByIdDirectPrivate(stream, "reader", {}); -  // isReadableStreamLocked() checks for truthiness of "reader" -  @putByIdDirectPrivate(stream, "reader", fakeReader); -  pull(sink); +  var maybePromise = underlyingSource.pull(sink);    sink = @undefined; +  if (maybePromise && @isPromise(maybePromise)) { +    return maybePromise.@then(() => {}); +  } +  }  @globalPrivate; @@ -841,11 +853,21 @@ function assignToStream(stream, sink) {    "use strict";    // The stream is either a direct stream or a "default" JS stream -  const underlyingSource = @getByIdDirectPrivate(stream, "underlyingSource"); +  var underlyingSource = @getByIdDirectPrivate(stream, "underlyingSource");    // we know it's a direct stream when @underlyingSource is set    if (underlyingSource) { -    return @readDirectStream(stream, sink, underlyingSource); +    try { +      return @readDirectStream(stream, sink, underlyingSource); +    } catch(e) { +      throw e; +    } finally { +      underlyingSource = @undefined; +      stream = @undefined; +      sink = @undefined; +    } +     +    }    return @readStreamIntoSink(stream, sink, true); | 
