diff options
Diffstat (limited to 'src/bun.js/builtins')
-rw-r--r-- | src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp | 91 | ||||
-rw-r--r-- | src/bun.js/builtins/js/ReadableStreamInternals.js | 89 |
2 files changed, 131 insertions, 49 deletions
diff --git a/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp b/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp index 90b9eeada..f2a3d5a55 100644 --- a/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp +++ b/src/bun.js/builtins/cpp/ReadableStreamInternalsBuiltins.cpp @@ -1039,7 +1039,7 @@ const char* const s_readableStreamInternalsReadDirectStreamCode = "\n" \ " if (highWaterMark) {\n" \ " sink.start({\n" \ - " highWaterMark,\n" \ + " highWaterMark: highWaterMark < 64 ? 64 : highWaterMark,\n" \ " });\n" \ " }\n" \ "\n" \ @@ -2270,7 +2270,7 @@ const char* const s_readableStreamInternalsReadableStreamDefaultControllerCanClo const JSC::ConstructAbility s_readableStreamInternalsLazyLoadStreamCodeConstructAbility = JSC::ConstructAbility::CannotConstruct; const JSC::ConstructorKind s_readableStreamInternalsLazyLoadStreamCodeConstructorKind = JSC::ConstructorKind::None; const JSC::ImplementationVisibility s_readableStreamInternalsLazyLoadStreamCodeImplementationVisibility = JSC::ImplementationVisibility::Public; -const int s_readableStreamInternalsLazyLoadStreamCodeLength = 2614; +const int s_readableStreamInternalsLazyLoadStreamCodeLength = 3647; static const JSC::Intrinsic s_readableStreamInternalsLazyLoadStreamCodeIntrinsic = JSC::NoIntrinsic; const char* const s_readableStreamInternalsLazyLoadStreamCode = "(function (stream, autoAllocateChunkSize) {\n" \ @@ -2280,7 +2280,7 @@ const char* const s_readableStreamInternalsLazyLoadStreamCode = " var nativePtr = @getByIdDirectPrivate(stream, \"bunNativePtr\");\n" \ " var Prototype = @lazyStreamPrototypeMap.@get(nativeType);\n" \ " if (Prototype === @undefined) {\n" \ - " var [pull, start, cancel, setClose, deinit] = @lazyLoad(nativeType);\n" \ + " var [pull, start, cancel, setClose, deinit, setRefOrUnref, drain] = @lazyLoad(nativeType);\n" \ " var closer = [false];\n" \ " var handleResult;\n" \ " function handleNativeReadableStreamPromiseResult(val) {\n" \ @@ -2293,8 +2293,6 @@ const char* const s_readableStreamInternalsLazyLoadStreamCode = "\n" \ " handleResult = function handleResult(result, controller, view) {\n" \ " \"use strict\";\n" \ - "\n" \ - " \n" \ " if (result && @isPromise(result)) {\n" \ " return result.then(\n" \ " handleNativeReadableStreamPromiseResult.bind({\n" \ @@ -2319,53 +2317,96 @@ const char* const s_readableStreamInternalsLazyLoadStreamCode = " }\n" \ " };\n" \ "\n" \ + " function createResult(tag, controller, view, closer) {\n" \ + " closer[0] = false;\n" \ + "\n" \ + " var result;\n" \ + " try {\n" \ + " result = pull(tag, view, closer);\n" \ + " } catch (err) {\n" \ + " return controller.error(err);\n" \ + " }\n" \ + "\n" \ + " return handleResult(result, controller, view);\n" \ + " }\n" \ + "\n" \ " Prototype = class NativeReadableStreamSource {\n" \ - " constructor(tag, autoAllocateChunkSize) {\n" \ - " this.pull = this.pull_.bind(tag);\n" \ - " this.cancel = this.cancel_.bind(tag);\n" \ + " constructor(tag, autoAllocateChunkSize, drainValue) {\n" \ + " this.#tag = tag;\n" \ + " this.pull = this.#pull.bind(this);\n" \ + " this.cancel = this.#cancel.bind(this);\n" \ " this.autoAllocateChunkSize = autoAllocateChunkSize;\n" \ + "\n" \ + " if (drainValue !== @undefined) {\n" \ + " this.start = (controller) => {\n" \ + " controller.enqueue(drainValue);\n" \ + " console.log(\"chunkSize\", chunkSize);\n" \ + " };\n" \ + " }\n" \ " }\n" \ "\n" \ " pull;\n" \ " cancel;\n" \ + " start;\n" \ "\n" \ + " #tag;\n" \ " type = \"bytes\";\n" \ " autoAllocateChunkSize = 0;\n" \ - "\n" \ + " \n" \ " static startSync = start;\n" \ + " \n" \ + " \n" \ + " #pull(controller) {\n" \ + " var tag = this.#tag;\n" \ "\n" \ - " pull_(controller) {\n" \ - " closer[0] = false;\n" \ - "\n" \ - " var result;\n" \ - "\n" \ - " const view = controller.byobRequest.view;\n" \ - " try {\n" \ - " result = pull(this, view, closer);\n" \ - " } catch (err) {\n" \ - " return controller.error(err);\n" \ + " if (!tag) {\n" \ + " controller.close();\n" \ + " return;\n" \ " }\n" \ "\n" \ - " return handleResult(result, controller, view);\n" \ + " createResult(tag, controller, controller.byobRequest.view, closer);\n" \ " }\n" \ "\n" \ - " cancel_(reason) {\n" \ - " cancel(this, reason);\n" \ + " #cancel(reason) {\n" \ + " var tag = this.#tag;\n" \ + " setRefOrUnref && setRefOrUnref(tag, false);\n" \ + " cancel(tag, reason);\n" \ " }\n" \ " static deinit = deinit;\n" \ " static registry = new FinalizationRegistry(deinit);\n" \ + " static drain = drain;\n" \ " };\n" \ " @lazyStreamPrototypeMap.@set(nativeType, Prototype);\n" \ " }\n" \ "\n" \ " const chunkSize = Prototype.startSync(nativePtr, autoAllocateChunkSize);\n" \ + " var drainValue;\n" \ + " const drainFn = Prototype.drain;\n" \ + " if (drainFn) {\n" \ + " drainValue = drainFn(nativePtr);\n" \ + " }\n" \ "\n" \ " //\n" \ " if (chunkSize === 0) {\n" \ - " @readableStreamClose(stream);\n" \ - " return null;\n" \ + " if ((drainValue?.byteLength ?? 0) > 0) {\n" \ + " deinit && nativePtr && @enqueueJob(deinit, nativePtr);\n" \ + " return {\n" \ + " start(controller) {\n" \ + " controller.enqueue(drainValue);\n" \ + " controller.close();\n" \ + " },\n" \ + " type: \"bytes\",\n" \ + " };\n" \ + " }\n" \ + "\n" \ + " return {\n" \ + " start(controller) {\n" \ + " controller.close();\n" \ + " },\n" \ + " type: \"bytes\",\n" \ + " };\n" \ " }\n" \ - " var instance = new Prototype(nativePtr, chunkSize);\n" \ + " var instance = new Prototype(nativePtr, chunkSize, drainValue);\n" \ " Prototype.registry.register(instance, nativePtr);\n" \ " return instance;\n" \ "})\n" \ diff --git a/src/bun.js/builtins/js/ReadableStreamInternals.js b/src/bun.js/builtins/js/ReadableStreamInternals.js index e8c9667a0..def4d51a3 100644 --- a/src/bun.js/builtins/js/ReadableStreamInternals.js +++ b/src/bun.js/builtins/js/ReadableStreamInternals.js @@ -839,7 +839,7 @@ function readDirectStream(stream, sink, underlyingSource) { if (highWaterMark) { sink.start({ - highWaterMark, + highWaterMark: highWaterMark < 64 ? 64 : highWaterMark, }); } @@ -1857,7 +1857,7 @@ function lazyLoadStream(stream, autoAllocateChunkSize) { var nativePtr = @getByIdDirectPrivate(stream, "bunNativePtr"); var Prototype = @lazyStreamPrototypeMap.@get(nativeType); if (Prototype === @undefined) { - var [pull, start, cancel, setClose, deinit] = @lazyLoad(nativeType); + var [pull, start, cancel, setClose, deinit, setRefOrUnref, drain] = @lazyLoad(nativeType); var closer = [false]; var handleResult; function handleNativeReadableStreamPromiseResult(val) { @@ -1870,8 +1870,6 @@ function lazyLoadStream(stream, autoAllocateChunkSize) { handleResult = function handleResult(result, controller, view) { "use strict"; - - if (result && @isPromise(result)) { return result.then( handleNativeReadableStreamPromiseResult.bind({ @@ -1896,53 +1894,96 @@ function lazyLoadStream(stream, autoAllocateChunkSize) { } }; + function createResult(tag, controller, view, closer) { + closer[0] = false; + + var result; + try { + result = pull(tag, view, closer); + } catch (err) { + return controller.error(err); + } + + return handleResult(result, controller, view); + } + Prototype = class NativeReadableStreamSource { - constructor(tag, autoAllocateChunkSize) { - this.pull = this.pull_.bind(tag); - this.cancel = this.cancel_.bind(tag); + constructor(tag, autoAllocateChunkSize, drainValue) { + this.#tag = tag; + this.pull = this.#pull.bind(this); + this.cancel = this.#cancel.bind(this); this.autoAllocateChunkSize = autoAllocateChunkSize; + + if (drainValue !== @undefined) { + this.start = (controller) => { + controller.enqueue(drainValue); + console.log("chunkSize", chunkSize); + }; + } } pull; cancel; + start; + #tag; type = "bytes"; autoAllocateChunkSize = 0; - + static startSync = start; + + + #pull(controller) { + var tag = this.#tag; - pull_(controller) { - closer[0] = false; - - var result; - - const view = controller.byobRequest.view; - try { - result = pull(this, view, closer); - } catch (err) { - return controller.error(err); + if (!tag) { + controller.close(); + return; } - return handleResult(result, controller, view); + createResult(tag, controller, controller.byobRequest.view, closer); } - cancel_(reason) { - cancel(this, reason); + #cancel(reason) { + var tag = this.#tag; + setRefOrUnref && setRefOrUnref(tag, false); + cancel(tag, reason); } static deinit = deinit; static registry = new FinalizationRegistry(deinit); + static drain = drain; }; @lazyStreamPrototypeMap.@set(nativeType, Prototype); } const chunkSize = Prototype.startSync(nativePtr, autoAllocateChunkSize); + var drainValue; + const drainFn = Prototype.drain; + if (drainFn) { + drainValue = drainFn(nativePtr); + } // empty file, no need for native back-and-forth on this if (chunkSize === 0) { - @readableStreamClose(stream); - return null; + if ((drainValue?.byteLength ?? 0) > 0) { + deinit && nativePtr && @enqueueJob(deinit, nativePtr); + return { + start(controller) { + controller.enqueue(drainValue); + controller.close(); + }, + type: "bytes", + }; + } + + return { + start(controller) { + controller.close(); + }, + type: "bytes", + }; } - var instance = new Prototype(nativePtr, chunkSize); + var instance = new Prototype(nativePtr, chunkSize, drainValue); Prototype.registry.register(instance, nativePtr); return instance; } |