diff options
| author | 2022-07-04 16:38:45 -0700 | |
|---|---|---|
| committer | 2022-07-04 16:38:45 -0700 | |
| commit | 48f64bc6e5fade5410413d31d0f17e9802a3917b (patch) | |
| tree | 94c00737824119970f741eda92c96f71a16f9232 /src/bun.js/builtins | |
| parent | 667303fc861afaa33ceb273a1e1652c7cd706879 (diff) | |
| download | bun-48f64bc6e5fade5410413d31d0f17e9802a3917b.tar.gz bun-48f64bc6e5fade5410413d31d0f17e9802a3917b.tar.zst bun-48f64bc6e5fade5410413d31d0f17e9802a3917b.zip | |
[itnernal] Cleanup some of the streams code
Diffstat (limited to 'src/bun.js/builtins')
| -rw-r--r-- | src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp | 71 | ||||
| -rw-r--r-- | src/bun.js/builtins/js/ReadableStreamInternals.js | 72 |
2 files changed, 95 insertions, 48 deletions
diff --git a/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp b/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp index fd6010f17..ee6dc21b3 100644 --- a/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp +++ b/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp @@ -949,33 +949,47 @@ const char* const s_readableStreamInternalsIsReadableStreamDefaultControllerCode const JSC::ConstructAbility s_readableStreamInternalsReadDirectStreamCodeConstructAbility = JSC::ConstructAbility::CannotConstruct; const JSC::ConstructorKind s_readableStreamInternalsReadDirectStreamCodeConstructorKind = JSC::ConstructorKind::None; -const int s_readableStreamInternalsReadDirectStreamCodeLength = 1208; +const int s_readableStreamInternalsReadDirectStreamCodeLength = 1496; static const JSC::Intrinsic s_readableStreamInternalsReadDirectStreamCodeIntrinsic = JSC::NoIntrinsic; const char* const s_readableStreamInternalsReadDirectStreamCode = "(function (stream, sink, underlyingSource) {\n" \ " \"use strict\";\n" \ " \n" \ " @putByIdDirectPrivate(stream, \"underlyingSource\", @undefined);\n" \ + " @putByIdDirectPrivate(stream, \"start\", @undefined);\n" \ "\n" \ - " var {close: originalClose, pull} = underlyingSource;\n" \ + " var {cancel, pull} = underlyingSource;\n" \ " underlyingSource = @undefined;\n" \ "\n" \ "\n" \ - " var fakeReader = {\n" \ - " };\n" \ - " var close = (reason) => {\n" \ - " try {\n" \ - " originalClose && originalClose(reason);\n" \ - " } catch (e) {\n" \ + " var capturedStream = stream;\n" \ + " var reader;\n" \ "\n" \ + " function close(stream, reason) {\n" \ + " if (reason && cancel) {\n" \ + " try {\n" \ + " cancel(reason);\n" \ + " } catch (e) {\n" \ + " }\n" \ + "\n" \ + " cancel = @undefined;\n" \ " }\n" \ - " originalClose = @undefined;\n" \ - " @putByIdDirectPrivate(stream, \"reader\", @undefined);\n" \ - " @putByIdDirectPrivate(stream, \"readableStreamController\", null);\n" \ - " @putByIdDirectPrivate(stream, \"state\", @streamClosed);\n" \ - " stream = @undefined;\n" \ - " fakeReader = @undefined;\n" \ - " };\n" \ + "\n" \ + " if (stream) {\n" \ + " @putByIdDirectPrivate(stream, \"readableStreamController\", @undefined);\n" \ + " @putByIdDirectPrivate(stream, \"reader\", @undefined);\n" \ + " if (reason) {\n" \ + " @putByIdDirectPrivate(stream, \"state\", @streamErrored);\n" \ + " @putByIdDirectPrivate(stream, \"storedError\", reason);\n" \ + " } else {\n" \ + " @putByIdDirectPrivate(stream, \"state\", @streamClosed);\n" \ + " }\n" \ + " \n" \ + " }\n" \ + " }\n" \ + "\n" \ + "\n" \ + "\n" \ "\n" \ "\n" \ " if (!pull) {\n" \ @@ -990,8 +1004,6 @@ const char* const s_readableStreamInternalsReadDirectStreamCode = " }\n" \ "\n" \ " @putByIdDirectPrivate(stream, \"readableStreamController\", sink);\n" \ - " @putByIdDirectPrivate(stream, \"start\", @undefined);\n" \ - "\n" \ " const highWaterMark = @getByIdDirectPrivate(stream, \"highWaterMark\");\n" \ "\n" \ " if (highWaterMark) {\n" \ @@ -1001,28 +1013,41 @@ const char* const s_readableStreamInternalsReadDirectStreamCode = " }\n" \ "\n" \ " @startDirectStream.@call(sink, stream, pull, close);\n" \ + " @putByIdDirectPrivate(stream, \"reader\", {});\n" \ "\n" \ - " //\n" \ - " @putByIdDirectPrivate(stream, \"reader\", fakeReader);\n" \ - " pull(sink);\n" \ + " var maybePromise = pull(sink);\n" \ " sink = @undefined;\n" \ + " if (maybePromise && @isPromise(maybePromise)) {\n" \ + " return maybePromise.@then(() => {});\n" \ + " }\n" \ + "\n" \ "})\n" \ ; const JSC::ConstructAbility s_readableStreamInternalsAssignToStreamCodeConstructAbility = JSC::ConstructAbility::CannotConstruct; const JSC::ConstructorKind s_readableStreamInternalsAssignToStreamCodeConstructorKind = JSC::ConstructorKind::None; -const int s_readableStreamInternalsAssignToStreamCodeLength = 279; +const int s_readableStreamInternalsAssignToStreamCodeLength = 438; static const JSC::Intrinsic s_readableStreamInternalsAssignToStreamCodeIntrinsic = JSC::NoIntrinsic; const char* const s_readableStreamInternalsAssignToStreamCode = "(function (stream, sink) {\n" \ " \"use strict\";\n" \ "\n" \ " //\n" \ - " const underlyingSource = @getByIdDirectPrivate(stream, \"underlyingSource\");\n" \ + " var underlyingSource = @getByIdDirectPrivate(stream, \"underlyingSource\");\n" \ "\n" \ " //\n" \ " if (underlyingSource) {\n" \ - " return @readDirectStream(stream, sink, underlyingSource);\n" \ + " try {\n" \ + " return @readDirectStream(stream, sink, underlyingSource);\n" \ + " } catch(e) {\n" \ + " throw e;\n" \ + " } finally {\n" \ + " underlyingSource = @undefined;\n" \ + " stream = @undefined;\n" \ + " sink = @undefined;\n" \ + " }\n" \ + " \n" \ + "\n" \ " }\n" \ "\n" \ " return @readStreamIntoSink(stream, sink, true);\n" \ diff --git a/src/bun.js/builtins/js/ReadableStreamInternals.js b/src/bun.js/builtins/js/ReadableStreamInternals.js index 353e3f9e9..0f5871fa1 100644 --- a/src/bun.js/builtins/js/ReadableStreamInternals.js +++ b/src/bun.js/builtins/js/ReadableStreamInternals.js @@ -784,42 +784,51 @@ function readDirectStream(stream, sink, underlyingSource) { "use strict"; @putByIdDirectPrivate(stream, "underlyingSource", @undefined); + @putByIdDirectPrivate(stream, "start", @undefined); - var {close: originalClose, pull} = underlyingSource; - underlyingSource = @undefined; + var capturedStream = stream; + var reader; - var fakeReader = { - }; - var close = (reason) => { - try { - originalClose && originalClose(reason); - } catch (e) { + function close(stream, reason) { + if (reason && underlyingSource?.cancel) { + try { + underlyingSource.cancel(reason); + } catch (e) { + } + underlyingSource = @undefined; } - originalClose = @undefined; - @putByIdDirectPrivate(stream, "reader", @undefined); - @putByIdDirectPrivate(stream, "readableStreamController", null); - @putByIdDirectPrivate(stream, "state", @streamClosed); - stream = @undefined; - fakeReader = @undefined; - }; + + if (stream) { + @putByIdDirectPrivate(stream, "readableStreamController", @undefined); + @putByIdDirectPrivate(stream, "reader", @undefined); + if (reason) { + @putByIdDirectPrivate(stream, "state", @streamErrored); + @putByIdDirectPrivate(stream, "storedError", reason); + } else { + @putByIdDirectPrivate(stream, "state", @streamClosed); + } + + } + } - if (!pull) { + + + + if (!underlyingSource.pull) { close(); return; } - if (!@isCallable(pull)) { + if (!@isCallable(underlyingSource.pull)) { close(); @throwTypeError("pull is not a function"); return; } @putByIdDirectPrivate(stream, "readableStreamController", sink); - @putByIdDirectPrivate(stream, "start", @undefined); - const highWaterMark = @getByIdDirectPrivate(stream, "highWaterMark"); if (highWaterMark) { @@ -828,12 +837,15 @@ function readDirectStream(stream, sink, underlyingSource) { }); } - @startDirectStream.@call(sink, stream, pull, close); + @startDirectStream.@call(sink, stream, underlyingSource.pull, close); + @putByIdDirectPrivate(stream, "reader", {}); - // isReadableStreamLocked() checks for truthiness of "reader" - @putByIdDirectPrivate(stream, "reader", fakeReader); - pull(sink); + var maybePromise = underlyingSource.pull(sink); sink = @undefined; + if (maybePromise && @isPromise(maybePromise)) { + return maybePromise.@then(() => {}); + } + } @globalPrivate; @@ -841,11 +853,21 @@ function assignToStream(stream, sink) { "use strict"; // The stream is either a direct stream or a "default" JS stream - const underlyingSource = @getByIdDirectPrivate(stream, "underlyingSource"); + var underlyingSource = @getByIdDirectPrivate(stream, "underlyingSource"); // we know it's a direct stream when @underlyingSource is set if (underlyingSource) { - return @readDirectStream(stream, sink, underlyingSource); + try { + return @readDirectStream(stream, sink, underlyingSource); + } catch(e) { + throw e; + } finally { + underlyingSource = @undefined; + stream = @undefined; + sink = @undefined; + } + + } return @readStreamIntoSink(stream, sink, true); |
