aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/builtins
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-07-04 16:38:45 -0700
committerGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-07-04 16:38:45 -0700
commit48f64bc6e5fade5410413d31d0f17e9802a3917b (patch)
tree94c00737824119970f741eda92c96f71a16f9232 /src/bun.js/builtins
parent667303fc861afaa33ceb273a1e1652c7cd706879 (diff)
downloadbun-48f64bc6e5fade5410413d31d0f17e9802a3917b.tar.gz
bun-48f64bc6e5fade5410413d31d0f17e9802a3917b.tar.zst
bun-48f64bc6e5fade5410413d31d0f17e9802a3917b.zip
[itnernal] Cleanup some of the streams code
Diffstat (limited to 'src/bun.js/builtins')
-rw-r--r--src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp71
-rw-r--r--src/bun.js/builtins/js/ReadableStreamInternals.js72
2 files changed, 95 insertions, 48 deletions
diff --git a/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp b/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp
index fd6010f17..ee6dc21b3 100644
--- a/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp
+++ b/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp
@@ -949,33 +949,47 @@ const char* const s_readableStreamInternalsIsReadableStreamDefaultControllerCode
const JSC::ConstructAbility s_readableStreamInternalsReadDirectStreamCodeConstructAbility = JSC::ConstructAbility::CannotConstruct;
const JSC::ConstructorKind s_readableStreamInternalsReadDirectStreamCodeConstructorKind = JSC::ConstructorKind::None;
-const int s_readableStreamInternalsReadDirectStreamCodeLength = 1208;
+const int s_readableStreamInternalsReadDirectStreamCodeLength = 1496;
static const JSC::Intrinsic s_readableStreamInternalsReadDirectStreamCodeIntrinsic = JSC::NoIntrinsic;
const char* const s_readableStreamInternalsReadDirectStreamCode =
"(function (stream, sink, underlyingSource) {\n" \
" \"use strict\";\n" \
" \n" \
" @putByIdDirectPrivate(stream, \"underlyingSource\", @undefined);\n" \
+ " @putByIdDirectPrivate(stream, \"start\", @undefined);\n" \
"\n" \
- " var {close: originalClose, pull} = underlyingSource;\n" \
+ " var {cancel, pull} = underlyingSource;\n" \
" underlyingSource = @undefined;\n" \
"\n" \
"\n" \
- " var fakeReader = {\n" \
- " };\n" \
- " var close = (reason) => {\n" \
- " try {\n" \
- " originalClose && originalClose(reason);\n" \
- " } catch (e) {\n" \
+ " var capturedStream = stream;\n" \
+ " var reader;\n" \
"\n" \
+ " function close(stream, reason) {\n" \
+ " if (reason && cancel) {\n" \
+ " try {\n" \
+ " cancel(reason);\n" \
+ " } catch (e) {\n" \
+ " }\n" \
+ "\n" \
+ " cancel = @undefined;\n" \
" }\n" \
- " originalClose = @undefined;\n" \
- " @putByIdDirectPrivate(stream, \"reader\", @undefined);\n" \
- " @putByIdDirectPrivate(stream, \"readableStreamController\", null);\n" \
- " @putByIdDirectPrivate(stream, \"state\", @streamClosed);\n" \
- " stream = @undefined;\n" \
- " fakeReader = @undefined;\n" \
- " };\n" \
+ "\n" \
+ " if (stream) {\n" \
+ " @putByIdDirectPrivate(stream, \"readableStreamController\", @undefined);\n" \
+ " @putByIdDirectPrivate(stream, \"reader\", @undefined);\n" \
+ " if (reason) {\n" \
+ " @putByIdDirectPrivate(stream, \"state\", @streamErrored);\n" \
+ " @putByIdDirectPrivate(stream, \"storedError\", reason);\n" \
+ " } else {\n" \
+ " @putByIdDirectPrivate(stream, \"state\", @streamClosed);\n" \
+ " }\n" \
+ " \n" \
+ " }\n" \
+ " }\n" \
+ "\n" \
+ "\n" \
+ "\n" \
"\n" \
"\n" \
" if (!pull) {\n" \
@@ -990,8 +1004,6 @@ const char* const s_readableStreamInternalsReadDirectStreamCode =
" }\n" \
"\n" \
" @putByIdDirectPrivate(stream, \"readableStreamController\", sink);\n" \
- " @putByIdDirectPrivate(stream, \"start\", @undefined);\n" \
- "\n" \
" const highWaterMark = @getByIdDirectPrivate(stream, \"highWaterMark\");\n" \
"\n" \
" if (highWaterMark) {\n" \
@@ -1001,28 +1013,41 @@ const char* const s_readableStreamInternalsReadDirectStreamCode =
" }\n" \
"\n" \
" @startDirectStream.@call(sink, stream, pull, close);\n" \
+ " @putByIdDirectPrivate(stream, \"reader\", {});\n" \
"\n" \
- " //\n" \
- " @putByIdDirectPrivate(stream, \"reader\", fakeReader);\n" \
- " pull(sink);\n" \
+ " var maybePromise = pull(sink);\n" \
" sink = @undefined;\n" \
+ " if (maybePromise && @isPromise(maybePromise)) {\n" \
+ " return maybePromise.@then(() => {});\n" \
+ " }\n" \
+ "\n" \
"})\n" \
;
const JSC::ConstructAbility s_readableStreamInternalsAssignToStreamCodeConstructAbility = JSC::ConstructAbility::CannotConstruct;
const JSC::ConstructorKind s_readableStreamInternalsAssignToStreamCodeConstructorKind = JSC::ConstructorKind::None;
-const int s_readableStreamInternalsAssignToStreamCodeLength = 279;
+const int s_readableStreamInternalsAssignToStreamCodeLength = 438;
static const JSC::Intrinsic s_readableStreamInternalsAssignToStreamCodeIntrinsic = JSC::NoIntrinsic;
const char* const s_readableStreamInternalsAssignToStreamCode =
"(function (stream, sink) {\n" \
" \"use strict\";\n" \
"\n" \
" //\n" \
- " const underlyingSource = @getByIdDirectPrivate(stream, \"underlyingSource\");\n" \
+ " var underlyingSource = @getByIdDirectPrivate(stream, \"underlyingSource\");\n" \
"\n" \
" //\n" \
" if (underlyingSource) {\n" \
- " return @readDirectStream(stream, sink, underlyingSource);\n" \
+ " try {\n" \
+ " return @readDirectStream(stream, sink, underlyingSource);\n" \
+ " } catch(e) {\n" \
+ " throw e;\n" \
+ " } finally {\n" \
+ " underlyingSource = @undefined;\n" \
+ " stream = @undefined;\n" \
+ " sink = @undefined;\n" \
+ " }\n" \
+ " \n" \
+ "\n" \
" }\n" \
"\n" \
" return @readStreamIntoSink(stream, sink, true);\n" \
diff --git a/src/bun.js/builtins/js/ReadableStreamInternals.js b/src/bun.js/builtins/js/ReadableStreamInternals.js
index 353e3f9e9..0f5871fa1 100644
--- a/src/bun.js/builtins/js/ReadableStreamInternals.js
+++ b/src/bun.js/builtins/js/ReadableStreamInternals.js
@@ -784,42 +784,51 @@ function readDirectStream(stream, sink, underlyingSource) {
"use strict";
@putByIdDirectPrivate(stream, "underlyingSource", @undefined);
+ @putByIdDirectPrivate(stream, "start", @undefined);
- var {close: originalClose, pull} = underlyingSource;
- underlyingSource = @undefined;
+ var capturedStream = stream;
+ var reader;
- var fakeReader = {
- };
- var close = (reason) => {
- try {
- originalClose && originalClose(reason);
- } catch (e) {
+ function close(stream, reason) {
+ if (reason && underlyingSource?.cancel) {
+ try {
+ underlyingSource.cancel(reason);
+ } catch (e) {
+ }
+ underlyingSource = @undefined;
}
- originalClose = @undefined;
- @putByIdDirectPrivate(stream, "reader", @undefined);
- @putByIdDirectPrivate(stream, "readableStreamController", null);
- @putByIdDirectPrivate(stream, "state", @streamClosed);
- stream = @undefined;
- fakeReader = @undefined;
- };
+
+ if (stream) {
+ @putByIdDirectPrivate(stream, "readableStreamController", @undefined);
+ @putByIdDirectPrivate(stream, "reader", @undefined);
+ if (reason) {
+ @putByIdDirectPrivate(stream, "state", @streamErrored);
+ @putByIdDirectPrivate(stream, "storedError", reason);
+ } else {
+ @putByIdDirectPrivate(stream, "state", @streamClosed);
+ }
+
+ }
+ }
- if (!pull) {
+
+
+
+ if (!underlyingSource.pull) {
close();
return;
}
- if (!@isCallable(pull)) {
+ if (!@isCallable(underlyingSource.pull)) {
close();
@throwTypeError("pull is not a function");
return;
}
@putByIdDirectPrivate(stream, "readableStreamController", sink);
- @putByIdDirectPrivate(stream, "start", @undefined);
-
const highWaterMark = @getByIdDirectPrivate(stream, "highWaterMark");
if (highWaterMark) {
@@ -828,12 +837,15 @@ function readDirectStream(stream, sink, underlyingSource) {
});
}
- @startDirectStream.@call(sink, stream, pull, close);
+ @startDirectStream.@call(sink, stream, underlyingSource.pull, close);
+ @putByIdDirectPrivate(stream, "reader", {});
- // isReadableStreamLocked() checks for truthiness of "reader"
- @putByIdDirectPrivate(stream, "reader", fakeReader);
- pull(sink);
+ var maybePromise = underlyingSource.pull(sink);
sink = @undefined;
+ if (maybePromise && @isPromise(maybePromise)) {
+ return maybePromise.@then(() => {});
+ }
+
}
@globalPrivate;
@@ -841,11 +853,21 @@ function assignToStream(stream, sink) {
"use strict";
// The stream is either a direct stream or a "default" JS stream
- const underlyingSource = @getByIdDirectPrivate(stream, "underlyingSource");
+ var underlyingSource = @getByIdDirectPrivate(stream, "underlyingSource");
// we know it's a direct stream when @underlyingSource is set
if (underlyingSource) {
- return @readDirectStream(stream, sink, underlyingSource);
+ try {
+ return @readDirectStream(stream, sink, underlyingSource);
+ } catch(e) {
+ throw e;
+ } finally {
+ underlyingSource = @undefined;
+ stream = @undefined;
+ sink = @undefined;
+ }
+
+
}
return @readStreamIntoSink(stream, sink, true);