diff options
Diffstat (limited to 'src/javascript/jsc/bindings/builtins/js/ReadableStream.js')
-rw-r--r-- | src/javascript/jsc/bindings/builtins/js/ReadableStream.js | 84 |
1 files changed, 48 insertions, 36 deletions
diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableStream.js b/src/javascript/jsc/bindings/builtins/js/ReadableStream.js index 5a5ea4094..4d7113888 100644 --- a/src/javascript/jsc/bindings/builtins/js/ReadableStream.js +++ b/src/javascript/jsc/bindings/builtins/js/ReadableStream.js @@ -89,79 +89,69 @@ function initializeReadableStream(underlyingSource, strategy) } @globalPrivate -function createNativeReadableStream(nativePtr, nativeType) { +function createNativeReadableStream(nativePtr, nativeType, autoAllocateChunkSize) { "use strict"; var cached = globalThis[Symbol.for("Bun.nativeReadableStreamPrototype")] ||= new @Map; var Prototype = cached.@get(nativeType); if (Prototype === @undefined) { var [pull, start, cancel, setClose, deinit] = globalThis[Symbol.for("Bun.lazy")](nativeType); var closer = [false]; - +var handleResult; function handleNativeReadableStreamPromiseResult(val) { "use strict"; - var {r, c} = this; - this.r = @undefined; + var {c, v} = this; this.c = @undefined; - r(val, c); + this.v = @undefined; + handleResult(val, c, v); } - function closeNativeReadableStreamOnNextTick(controller) { - "use strict"; - controller.close(); - controller = @undefined; - } - - var handleResult = function handleResult(result, controller) { + + handleResult = function handleResult(result, controller, view) { "use strict"; if (result && @isPromise(result)) { - return result.then(handleNativeReadableStreamPromiseResult.bind({c: controller, r: handleResult}), controller.error); + return result.then(handleNativeReadableStreamPromiseResult.bind({c: controller, v: view}), (err) => controller.error(err)); } else if (result !== false) { - controller.enqueue(result); + if (view && view.byteLength === result) { + controller.byobRequest.respondWithNewView(view); + } else { + controller.byobRequest.respond(result); + } } if (closer[0] || result === false) { - @enqueueJob(closeNativeReadableStreamOnNextTick, controller); + @enqueueJob(() => controller.close()); closer[0] = false; } - } + }; Prototype = class NativeReadableStreamSource { - constructor(tag) { + constructor(tag, autoAllocateChunkSize) { this.pull = this.pull_.bind(tag); - this.start = this.start_.bind(tag); this.cancel = this.cancel_.bind(tag); + this.autoAllocateChunkSize = autoAllocateChunkSize; } pull; - start; cancel; - - pull_(controller) { - closer[0] = false; - var result; - - try { - result = pull(this, closer); - } catch(err) { - return controller.error(err); - } - return handleResult(result, controller); - } + type = "bytes"; + autoAllocateChunkSize = 0; - start_(controller) { - setClose(this, controller.close); + static startSync = start; + + pull_(controller) { closer[0] = false; var result; + const view = controller.byobRequest.view; try { - result = start(this, closer); + result = pull(this, view, closer); } catch(err) { return controller.error(err); } - return handleResult(result, controller); + return handleResult(result, controller, view); } cancel_(reason) { @@ -173,7 +163,29 @@ function createNativeReadableStream(nativePtr, nativeType) { cached.@set(nativeType, Prototype); } - var instance = new Prototype(nativePtr); + // either returns the chunk size + // or throws an error + // should never return a Promise + const chunkSize = Prototype.startSync(nativePtr, autoAllocateChunkSize); + + // empty file, no need for native back-and-forth on this + if (chunkSize === 0) { + return new @ReadableStream({ + start(controller) { + controller.close(); + }, + + pull() { + + }, + + cancel() { + + }, + }); + } + + var instance = new Prototype(nativePtr, chunkSize); Prototype.registry.register(instance, nativePtr); var stream = new @ReadableStream(instance); @putByIdDirectPrivate(stream, "bunNativeType", nativeType); |