aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/builtins
diff options
context:
space:
mode:
Diffstat (limited to 'src/bun.js/builtins')
-rw-r--r--src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp91
-rw-r--r--src/bun.js/builtins/js/ReadableStreamInternals.js89
2 files changed, 131 insertions, 49 deletions
diff --git a/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp b/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp
index 90b9eeada..f2a3d5a55 100644
--- a/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp
+++ b/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp
@@ -1039,7 +1039,7 @@ const char* const s_readableStreamInternalsReadDirectStreamCode =
"\n" \
" if (highWaterMark) {\n" \
" sink.start({\n" \
- " highWaterMark,\n" \
+ " highWaterMark: highWaterMark < 64 ? 64 : highWaterMark,\n" \
" });\n" \
" }\n" \
"\n" \
@@ -2270,7 +2270,7 @@ const char* const s_readableStreamInternalsReadableStreamDefaultControllerCanClo
const JSC::ConstructAbility s_readableStreamInternalsLazyLoadStreamCodeConstructAbility = JSC::ConstructAbility::CannotConstruct;
const JSC::ConstructorKind s_readableStreamInternalsLazyLoadStreamCodeConstructorKind = JSC::ConstructorKind::None;
const JSC::ImplementationVisibility s_readableStreamInternalsLazyLoadStreamCodeImplementationVisibility = JSC::ImplementationVisibility::Public;
-const int s_readableStreamInternalsLazyLoadStreamCodeLength = 2614;
+const int s_readableStreamInternalsLazyLoadStreamCodeLength = 3647;
static const JSC::Intrinsic s_readableStreamInternalsLazyLoadStreamCodeIntrinsic = JSC::NoIntrinsic;
const char* const s_readableStreamInternalsLazyLoadStreamCode =
"(function (stream, autoAllocateChunkSize) {\n" \
@@ -2280,7 +2280,7 @@ const char* const s_readableStreamInternalsLazyLoadStreamCode =
" var nativePtr = @getByIdDirectPrivate(stream, \"bunNativePtr\");\n" \
" var Prototype = @lazyStreamPrototypeMap.@get(nativeType);\n" \
" if (Prototype === @undefined) {\n" \
- " var [pull, start, cancel, setClose, deinit] = @lazyLoad(nativeType);\n" \
+ " var [pull, start, cancel, setClose, deinit, setRefOrUnref, drain] = @lazyLoad(nativeType);\n" \
" var closer = [false];\n" \
" var handleResult;\n" \
" function handleNativeReadableStreamPromiseResult(val) {\n" \
@@ -2293,8 +2293,6 @@ const char* const s_readableStreamInternalsLazyLoadStreamCode =
"\n" \
" handleResult = function handleResult(result, controller, view) {\n" \
" \"use strict\";\n" \
- "\n" \
- " \n" \
" if (result && @isPromise(result)) {\n" \
" return result.then(\n" \
" handleNativeReadableStreamPromiseResult.bind({\n" \
@@ -2319,53 +2317,96 @@ const char* const s_readableStreamInternalsLazyLoadStreamCode =
" }\n" \
" };\n" \
"\n" \
+ " function createResult(tag, controller, view, closer) {\n" \
+ " closer[0] = false;\n" \
+ "\n" \
+ " var result;\n" \
+ " try {\n" \
+ " result = pull(tag, view, closer);\n" \
+ " } catch (err) {\n" \
+ " return controller.error(err);\n" \
+ " }\n" \
+ "\n" \
+ " return handleResult(result, controller, view);\n" \
+ " }\n" \
+ "\n" \
" Prototype = class NativeReadableStreamSource {\n" \
- " constructor(tag, autoAllocateChunkSize) {\n" \
- " this.pull = this.pull_.bind(tag);\n" \
- " this.cancel = this.cancel_.bind(tag);\n" \
+ " constructor(tag, autoAllocateChunkSize, drainValue) {\n" \
+ " this.#tag = tag;\n" \
+ " this.pull = this.#pull.bind(this);\n" \
+ " this.cancel = this.#cancel.bind(this);\n" \
" this.autoAllocateChunkSize = autoAllocateChunkSize;\n" \
+ "\n" \
+ " if (drainValue !== @undefined) {\n" \
+ " this.start = (controller) => {\n" \
+ " controller.enqueue(drainValue);\n" \
+ " console.log(\"chunkSize\", chunkSize);\n" \
+ " };\n" \
+ " }\n" \
" }\n" \
"\n" \
" pull;\n" \
" cancel;\n" \
+ " start;\n" \
"\n" \
+ " #tag;\n" \
" type = \"bytes\";\n" \
" autoAllocateChunkSize = 0;\n" \
- "\n" \
+ " \n" \
" static startSync = start;\n" \
+ " \n" \
+ " \n" \
+ " #pull(controller) {\n" \
+ " var tag = this.#tag;\n" \
"\n" \
- " pull_(controller) {\n" \
- " closer[0] = false;\n" \
- "\n" \
- " var result;\n" \
- "\n" \
- " const view = controller.byobRequest.view;\n" \
- " try {\n" \
- " result = pull(this, view, closer);\n" \
- " } catch (err) {\n" \
- " return controller.error(err);\n" \
+ " if (!tag) {\n" \
+ " controller.close();\n" \
+ " return;\n" \
" }\n" \
"\n" \
- " return handleResult(result, controller, view);\n" \
+ " createResult(tag, controller, controller.byobRequest.view, closer);\n" \
" }\n" \
"\n" \
- " cancel_(reason) {\n" \
- " cancel(this, reason);\n" \
+ " #cancel(reason) {\n" \
+ " var tag = this.#tag;\n" \
+ " setRefOrUnref && setRefOrUnref(tag, false);\n" \
+ " cancel(tag, reason);\n" \
" }\n" \
" static deinit = deinit;\n" \
" static registry = new FinalizationRegistry(deinit);\n" \
+ " static drain = drain;\n" \
" };\n" \
" @lazyStreamPrototypeMap.@set(nativeType, Prototype);\n" \
" }\n" \
"\n" \
" const chunkSize = Prototype.startSync(nativePtr, autoAllocateChunkSize);\n" \
+ " var drainValue;\n" \
+ " const drainFn = Prototype.drain;\n" \
+ " if (drainFn) {\n" \
+ " drainValue = drainFn(nativePtr);\n" \
+ " }\n" \
"\n" \
" //\n" \
" if (chunkSize === 0) {\n" \
- " @readableStreamClose(stream);\n" \
- " return null;\n" \
+ " if ((drainValue?.byteLength ?? 0) > 0) {\n" \
+ " deinit && nativePtr && @enqueueJob(deinit, nativePtr);\n" \
+ " return {\n" \
+ " start(controller) {\n" \
+ " controller.enqueue(drainValue);\n" \
+ " controller.close();\n" \
+ " },\n" \
+ " type: \"bytes\",\n" \
+ " };\n" \
+ " }\n" \
+ "\n" \
+ " return {\n" \
+ " start(controller) {\n" \
+ " controller.close();\n" \
+ " },\n" \
+ " type: \"bytes\",\n" \
+ " };\n" \
" }\n" \
- " var instance = new Prototype(nativePtr, chunkSize);\n" \
+ " var instance = new Prototype(nativePtr, chunkSize, drainValue);\n" \
" Prototype.registry.register(instance, nativePtr);\n" \
" return instance;\n" \
"})\n" \
diff --git a/src/bun.js/builtins/js/ReadableStreamInternals.js b/src/bun.js/builtins/js/ReadableStreamInternals.js
index e8c9667a0..def4d51a3 100644
--- a/src/bun.js/builtins/js/ReadableStreamInternals.js
+++ b/src/bun.js/builtins/js/ReadableStreamInternals.js
@@ -839,7 +839,7 @@ function readDirectStream(stream, sink, underlyingSource) {
if (highWaterMark) {
sink.start({
- highWaterMark,
+ highWaterMark: highWaterMark < 64 ? 64 : highWaterMark,
});
}
@@ -1857,7 +1857,7 @@ function lazyLoadStream(stream, autoAllocateChunkSize) {
var nativePtr = @getByIdDirectPrivate(stream, "bunNativePtr");
var Prototype = @lazyStreamPrototypeMap.@get(nativeType);
if (Prototype === @undefined) {
- var [pull, start, cancel, setClose, deinit] = @lazyLoad(nativeType);
+ var [pull, start, cancel, setClose, deinit, setRefOrUnref, drain] = @lazyLoad(nativeType);
var closer = [false];
var handleResult;
function handleNativeReadableStreamPromiseResult(val) {
@@ -1870,8 +1870,6 @@ function lazyLoadStream(stream, autoAllocateChunkSize) {
handleResult = function handleResult(result, controller, view) {
"use strict";
-
-
if (result && @isPromise(result)) {
return result.then(
handleNativeReadableStreamPromiseResult.bind({
@@ -1896,53 +1894,96 @@ function lazyLoadStream(stream, autoAllocateChunkSize) {
}
};
+ function createResult(tag, controller, view, closer) {
+ closer[0] = false;
+
+ var result;
+ try {
+ result = pull(tag, view, closer);
+ } catch (err) {
+ return controller.error(err);
+ }
+
+ return handleResult(result, controller, view);
+ }
+
Prototype = class NativeReadableStreamSource {
- constructor(tag, autoAllocateChunkSize) {
- this.pull = this.pull_.bind(tag);
- this.cancel = this.cancel_.bind(tag);
+ constructor(tag, autoAllocateChunkSize, drainValue) {
+ this.#tag = tag;
+ this.pull = this.#pull.bind(this);
+ this.cancel = this.#cancel.bind(this);
this.autoAllocateChunkSize = autoAllocateChunkSize;
+
+ if (drainValue !== @undefined) {
+ this.start = (controller) => {
+ controller.enqueue(drainValue);
+ console.log("chunkSize", chunkSize);
+ };
+ }
}
pull;
cancel;
+ start;
+ #tag;
type = "bytes";
autoAllocateChunkSize = 0;
-
+
static startSync = start;
+
+
+ #pull(controller) {
+ var tag = this.#tag;
- pull_(controller) {
- closer[0] = false;
-
- var result;
-
- const view = controller.byobRequest.view;
- try {
- result = pull(this, view, closer);
- } catch (err) {
- return controller.error(err);
+ if (!tag) {
+ controller.close();
+ return;
}
- return handleResult(result, controller, view);
+ createResult(tag, controller, controller.byobRequest.view, closer);
}
- cancel_(reason) {
- cancel(this, reason);
+ #cancel(reason) {
+ var tag = this.#tag;
+ setRefOrUnref && setRefOrUnref(tag, false);
+ cancel(tag, reason);
}
static deinit = deinit;
static registry = new FinalizationRegistry(deinit);
+ static drain = drain;
};
@lazyStreamPrototypeMap.@set(nativeType, Prototype);
}
const chunkSize = Prototype.startSync(nativePtr, autoAllocateChunkSize);
+ var drainValue;
+ const drainFn = Prototype.drain;
+ if (drainFn) {
+ drainValue = drainFn(nativePtr);
+ }
// empty file, no need for native back-and-forth on this
if (chunkSize === 0) {
- @readableStreamClose(stream);
- return null;
+ if ((drainValue?.byteLength ?? 0) > 0) {
+ deinit && nativePtr && @enqueueJob(deinit, nativePtr);
+ return {
+ start(controller) {
+ controller.enqueue(drainValue);
+ controller.close();
+ },
+ type: "bytes",
+ };
+ }
+
+ return {
+ start(controller) {
+ controller.close();
+ },
+ type: "bytes",
+ };
}
- var instance = new Prototype(nativePtr, chunkSize);
+ var instance = new Prototype(nativePtr, chunkSize, drainValue);
Prototype.registry.register(instance, nativePtr);
return instance;
}