diff options
author | 2022-11-23 07:14:33 -0800 | |
---|---|---|
committer | 2022-11-23 07:14:33 -0800 | |
commit | ac36ea51cfb85130403ac09299f8e1207bad4bcb (patch) | |
tree | a05bc2d34295bc0087b68b799155f18050451721 /src/bun.js/builtins/js/ReadableStreamInternals.js | |
parent | ae3fcb5bd89a4ac908ba6d4cdb1be4e7c7f0ea81 (diff) | |
download | bun-ac36ea51cfb85130403ac09299f8e1207bad4bcb.tar.gz bun-ac36ea51cfb85130403ac09299f8e1207bad4bcb.tar.zst bun-ac36ea51cfb85130403ac09299f8e1207bad4bcb.zip |
possibly more reliable Bun.spawn (#1547)
* wip
* wip
* Fix bug with stdin
* zig fmt
* seems to work!
* Update streams.test.js
Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com>
Diffstat (limited to 'src/bun.js/builtins/js/ReadableStreamInternals.js')
-rw-r--r-- | src/bun.js/builtins/js/ReadableStreamInternals.js | 89 |
1 files changed, 65 insertions, 24 deletions
diff --git a/src/bun.js/builtins/js/ReadableStreamInternals.js b/src/bun.js/builtins/js/ReadableStreamInternals.js index e8c9667a0..def4d51a3 100644 --- a/src/bun.js/builtins/js/ReadableStreamInternals.js +++ b/src/bun.js/builtins/js/ReadableStreamInternals.js @@ -839,7 +839,7 @@ function readDirectStream(stream, sink, underlyingSource) { if (highWaterMark) { sink.start({ - highWaterMark, + highWaterMark: highWaterMark < 64 ? 64 : highWaterMark, }); } @@ -1857,7 +1857,7 @@ function lazyLoadStream(stream, autoAllocateChunkSize) { var nativePtr = @getByIdDirectPrivate(stream, "bunNativePtr"); var Prototype = @lazyStreamPrototypeMap.@get(nativeType); if (Prototype === @undefined) { - var [pull, start, cancel, setClose, deinit] = @lazyLoad(nativeType); + var [pull, start, cancel, setClose, deinit, setRefOrUnref, drain] = @lazyLoad(nativeType); var closer = [false]; var handleResult; function handleNativeReadableStreamPromiseResult(val) { @@ -1870,8 +1870,6 @@ function lazyLoadStream(stream, autoAllocateChunkSize) { handleResult = function handleResult(result, controller, view) { "use strict"; - - if (result && @isPromise(result)) { return result.then( handleNativeReadableStreamPromiseResult.bind({ @@ -1896,53 +1894,96 @@ function lazyLoadStream(stream, autoAllocateChunkSize) { } }; + function createResult(tag, controller, view, closer) { + closer[0] = false; + + var result; + try { + result = pull(tag, view, closer); + } catch (err) { + return controller.error(err); + } + + return handleResult(result, controller, view); + } + Prototype = class NativeReadableStreamSource { - constructor(tag, autoAllocateChunkSize) { - this.pull = this.pull_.bind(tag); - this.cancel = this.cancel_.bind(tag); + constructor(tag, autoAllocateChunkSize, drainValue) { + this.#tag = tag; + this.pull = this.#pull.bind(this); + this.cancel = this.#cancel.bind(this); this.autoAllocateChunkSize = autoAllocateChunkSize; + + if (drainValue !== @undefined) { + this.start = (controller) => { + controller.enqueue(drainValue); + console.log("chunkSize", chunkSize); + }; + } } pull; cancel; + start; + #tag; type = "bytes"; autoAllocateChunkSize = 0; - + static startSync = start; + + + #pull(controller) { + var tag = this.#tag; - pull_(controller) { - closer[0] = false; - - var result; - - const view = controller.byobRequest.view; - try { - result = pull(this, view, closer); - } catch (err) { - return controller.error(err); + if (!tag) { + controller.close(); + return; } - return handleResult(result, controller, view); + createResult(tag, controller, controller.byobRequest.view, closer); } - cancel_(reason) { - cancel(this, reason); + #cancel(reason) { + var tag = this.#tag; + setRefOrUnref && setRefOrUnref(tag, false); + cancel(tag, reason); } static deinit = deinit; static registry = new FinalizationRegistry(deinit); + static drain = drain; }; @lazyStreamPrototypeMap.@set(nativeType, Prototype); } const chunkSize = Prototype.startSync(nativePtr, autoAllocateChunkSize); + var drainValue; + const drainFn = Prototype.drain; + if (drainFn) { + drainValue = drainFn(nativePtr); + } // empty file, no need for native back-and-forth on this if (chunkSize === 0) { - @readableStreamClose(stream); - return null; + if ((drainValue?.byteLength ?? 0) > 0) { + deinit && nativePtr && @enqueueJob(deinit, nativePtr); + return { + start(controller) { + controller.enqueue(drainValue); + controller.close(); + }, + type: "bytes", + }; + } + + return { + start(controller) { + controller.close(); + }, + type: "bytes", + }; } - var instance = new Prototype(nativePtr, chunkSize); + var instance = new Prototype(nativePtr, chunkSize, drainValue); Prototype.registry.register(instance, nativePtr); return instance; } |