aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp')
-rw-r--r--src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp91
1 files changed, 66 insertions, 25 deletions
diff --git a/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp b/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp
index 90b9eeada..f2a3d5a55 100644
--- a/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp
+++ b/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp
@@ -1039,7 +1039,7 @@ const char* const s_readableStreamInternalsReadDirectStreamCode =
"\n" \
" if (highWaterMark) {\n" \
" sink.start({\n" \
- " highWaterMark,\n" \
+ " highWaterMark: highWaterMark < 64 ? 64 : highWaterMark,\n" \
" });\n" \
" }\n" \
"\n" \
@@ -2270,7 +2270,7 @@ const char* const s_readableStreamInternalsReadableStreamDefaultControllerCanClo
const JSC::ConstructAbility s_readableStreamInternalsLazyLoadStreamCodeConstructAbility = JSC::ConstructAbility::CannotConstruct;
const JSC::ConstructorKind s_readableStreamInternalsLazyLoadStreamCodeConstructorKind = JSC::ConstructorKind::None;
const JSC::ImplementationVisibility s_readableStreamInternalsLazyLoadStreamCodeImplementationVisibility = JSC::ImplementationVisibility::Public;
-const int s_readableStreamInternalsLazyLoadStreamCodeLength = 2614;
+const int s_readableStreamInternalsLazyLoadStreamCodeLength = 3647;
static const JSC::Intrinsic s_readableStreamInternalsLazyLoadStreamCodeIntrinsic = JSC::NoIntrinsic;
const char* const s_readableStreamInternalsLazyLoadStreamCode =
"(function (stream, autoAllocateChunkSize) {\n" \
@@ -2280,7 +2280,7 @@ const char* const s_readableStreamInternalsLazyLoadStreamCode =
" var nativePtr = @getByIdDirectPrivate(stream, \"bunNativePtr\");\n" \
" var Prototype = @lazyStreamPrototypeMap.@get(nativeType);\n" \
" if (Prototype === @undefined) {\n" \
- " var [pull, start, cancel, setClose, deinit] = @lazyLoad(nativeType);\n" \
+ " var [pull, start, cancel, setClose, deinit, setRefOrUnref, drain] = @lazyLoad(nativeType);\n" \
" var closer = [false];\n" \
" var handleResult;\n" \
" function handleNativeReadableStreamPromiseResult(val) {\n" \
@@ -2293,8 +2293,6 @@ const char* const s_readableStreamInternalsLazyLoadStreamCode =
"\n" \
" handleResult = function handleResult(result, controller, view) {\n" \
" \"use strict\";\n" \
- "\n" \
- " \n" \
" if (result && @isPromise(result)) {\n" \
" return result.then(\n" \
" handleNativeReadableStreamPromiseResult.bind({\n" \
@@ -2319,53 +2317,96 @@ const char* const s_readableStreamInternalsLazyLoadStreamCode =
" }\n" \
" };\n" \
"\n" \
+ " function createResult(tag, controller, view, closer) {\n" \
+ " closer[0] = false;\n" \
+ "\n" \
+ " var result;\n" \
+ " try {\n" \
+ " result = pull(tag, view, closer);\n" \
+ " } catch (err) {\n" \
+ " return controller.error(err);\n" \
+ " }\n" \
+ "\n" \
+ " return handleResult(result, controller, view);\n" \
+ " }\n" \
+ "\n" \
" Prototype = class NativeReadableStreamSource {\n" \
- " constructor(tag, autoAllocateChunkSize) {\n" \
- " this.pull = this.pull_.bind(tag);\n" \
- " this.cancel = this.cancel_.bind(tag);\n" \
+ " constructor(tag, autoAllocateChunkSize, drainValue) {\n" \
+ " this.#tag = tag;\n" \
+ " this.pull = this.#pull.bind(this);\n" \
+ " this.cancel = this.#cancel.bind(this);\n" \
" this.autoAllocateChunkSize = autoAllocateChunkSize;\n" \
+ "\n" \
+ " if (drainValue !== @undefined) {\n" \
+ " this.start = (controller) => {\n" \
+ " controller.enqueue(drainValue);\n" \
+ " console.log(\"chunkSize\", chunkSize);\n" \
+ " };\n" \
+ " }\n" \
" }\n" \
"\n" \
" pull;\n" \
" cancel;\n" \
+ " start;\n" \
"\n" \
+ " #tag;\n" \
" type = \"bytes\";\n" \
" autoAllocateChunkSize = 0;\n" \
- "\n" \
+ " \n" \
" static startSync = start;\n" \
+ " \n" \
+ " \n" \
+ " #pull(controller) {\n" \
+ " var tag = this.#tag;\n" \
"\n" \
- " pull_(controller) {\n" \
- " closer[0] = false;\n" \
- "\n" \
- " var result;\n" \
- "\n" \
- " const view = controller.byobRequest.view;\n" \
- " try {\n" \
- " result = pull(this, view, closer);\n" \
- " } catch (err) {\n" \
- " return controller.error(err);\n" \
+ " if (!tag) {\n" \
+ " controller.close();\n" \
+ " return;\n" \
" }\n" \
"\n" \
- " return handleResult(result, controller, view);\n" \
+ " createResult(tag, controller, controller.byobRequest.view, closer);\n" \
" }\n" \
"\n" \
- " cancel_(reason) {\n" \
- " cancel(this, reason);\n" \
+ " #cancel(reason) {\n" \
+ " var tag = this.#tag;\n" \
+ " setRefOrUnref && setRefOrUnref(tag, false);\n" \
+ " cancel(tag, reason);\n" \
" }\n" \
" static deinit = deinit;\n" \
" static registry = new FinalizationRegistry(deinit);\n" \
+ " static drain = drain;\n" \
" };\n" \
" @lazyStreamPrototypeMap.@set(nativeType, Prototype);\n" \
" }\n" \
"\n" \
" const chunkSize = Prototype.startSync(nativePtr, autoAllocateChunkSize);\n" \
+ " var drainValue;\n" \
+ " const drainFn = Prototype.drain;\n" \
+ " if (drainFn) {\n" \
+ " drainValue = drainFn(nativePtr);\n" \
+ " }\n" \
"\n" \
" //\n" \
" if (chunkSize === 0) {\n" \
- " @readableStreamClose(stream);\n" \
- " return null;\n" \
+ " if ((drainValue?.byteLength ?? 0) > 0) {\n" \
+ " deinit && nativePtr && @enqueueJob(deinit, nativePtr);\n" \
+ " return {\n" \
+ " start(controller) {\n" \
+ " controller.enqueue(drainValue);\n" \
+ " controller.close();\n" \
+ " },\n" \
+ " type: \"bytes\",\n" \
+ " };\n" \
+ " }\n" \
+ "\n" \
+ " return {\n" \
+ " start(controller) {\n" \
+ " controller.close();\n" \
+ " },\n" \
+ " type: \"bytes\",\n" \
+ " };\n" \
" }\n" \
- " var instance = new Prototype(nativePtr, chunkSize);\n" \
+ " var instance = new Prototype(nativePtr, chunkSize, drainValue);\n" \
" Prototype.registry.register(instance, nativePtr);\n" \
" return instance;\n" \
"})\n" \