diff options
author | 2022-06-15 22:10:12 -0700 | |
---|---|---|
committer | 2022-06-15 22:10:12 -0700 | |
commit | e6fbbd48db4077ab3d35fef322d2612cb6141a12 (patch) | |
tree | 7f8c43e4f7f4a051ab1b35e93aab2baab550e6e6 /src/javascript/jsc/bindings/builtins/js | |
parent | 56e88fb4dd06e07569ddc3861e2e8e21f71e45b8 (diff) | |
download | bun-e6fbbd48db4077ab3d35fef322d2612cb6141a12.tar.gz bun-e6fbbd48db4077ab3d35fef322d2612cb6141a12.tar.zst bun-e6fbbd48db4077ab3d35fef322d2612cb6141a12.zip |
Fix lazy loading internal streams
Diffstat (limited to 'src/javascript/jsc/bindings/builtins/js')
3 files changed, 52 insertions, 58 deletions
diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js b/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js index f59a7bdbc..01da62e1a 100644 --- a/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js +++ b/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js @@ -41,7 +41,7 @@ function privateInitializeReadableByteStreamController(stream, underlyingByteSou @putByIdDirectPrivate(this, "pulling", false); @readableByteStreamControllerClearPendingPullIntos(this); @putByIdDirectPrivate(this, "queue", @newQueue()); - @putByIdDirectPrivate(this, "started", -1); + @putByIdDirectPrivate(this, "started", 0); @putByIdDirectPrivate(this, "closeRequested", false); let hwm = @toNumber(highWaterMark); @@ -58,38 +58,28 @@ function privateInitializeReadableByteStreamController(stream, underlyingByteSou @putByIdDirectPrivate(this, "autoAllocateChunkSize", autoAllocateChunkSize); @putByIdDirectPrivate(this, "pendingPullIntos", @createFIFO()); - @putByIdDirectPrivate(this, "cancel", @readableByteStreamControllerCancel); - @putByIdDirectPrivate(this, "pull", @readableByteStreamControllerPull); - if (@getByIdDirectPrivate(underlyingByteSource, "lazy") === true) { - @putByIdDirectPrivate(this, "start", () => @readableStreamByteStreamControllerStart(this)); - } else { - @putByIdDirectPrivate(this, "start", @undefined); - @readableStreamByteStreamControllerStart(this); - } + const controller = this; + @promiseInvokeOrNoopNoCatch(@getByIdDirectPrivate(controller, "underlyingByteSource"), "start", [controller]).@then(() => { + @putByIdDirectPrivate(controller, "started", 1); + @assert(!@getByIdDirectPrivate(controller, "pulling")); + @assert(!@getByIdDirectPrivate(controller, "pullAgain")); + @readableByteStreamControllerCallPullIfNeeded(controller); + }, (error) => { + if (@getByIdDirectPrivate(stream, "state") === @streamReadable) + @readableByteStreamControllerError(controller, error); + }); + @putByIdDirectPrivate(this, "cancel", @readableByteStreamControllerCancel); + @putByIdDirectPrivate(this, "pull", @readableByteStreamControllerPull); + return this; } function readableStreamByteStreamControllerStart(controller) { "use strict"; @putByIdDirectPrivate(controller, "start", @undefined); - - if (@getByIdDirectPrivate(controller, "started") !== -1) - return; - @putByIdDirectPrivate(controller, "started", 0); - var stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); - return @promiseInvokeOrNoopNoCatch(@getByIdDirectPrivate(controller, "underlyingByteSource"), "start", [controller]).@then(() => { - @putByIdDirectPrivate(controller, "started", 1); - @assert(!@getByIdDirectPrivate(controller, "pulling")); - @assert(!@getByIdDirectPrivate(controller, "pullAgain")); - @readableByteStreamControllerCallPullIfNeeded(controller); - }, (error) => { - var stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); - if (stream && @getByIdDirectPrivate(stream, "state") === @streamReadable) - @readableByteStreamControllerError(controller, error); - }); } @@ -237,8 +227,7 @@ function readableByteStreamControllerPull(controller) const stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); @assert(@readableStreamHasDefaultReader(stream)); - - if (@getByIdDirectPrivate(controller, "queue").size > 0) { + if (@getByIdDirectPrivate(controller, "queue").content?.isNotEmpty()) { const entry = @getByIdDirectPrivate(controller, "queue").content.shift(); @getByIdDirectPrivate(controller, "queue").size -= entry.byteLength; @readableByteStreamControllerHandleQueueDrain(controller); diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableStream.js b/src/javascript/jsc/bindings/builtins/js/ReadableStream.js index cfe68d13c..db7cf85a8 100644 --- a/src/javascript/jsc/bindings/builtins/js/ReadableStream.js +++ b/src/javascript/jsc/bindings/builtins/js/ReadableStream.js @@ -29,7 +29,7 @@ function initializeReadableStream(underlyingSource, strategy) "use strict"; if (underlyingSource === @undefined) - underlyingSource = { }; + underlyingSource = { @bunNativeType: 0, @bunNativePtr: 0, @lazy: false }; if (strategy === @undefined) strategy = { }; @@ -49,6 +49,8 @@ function initializeReadableStream(underlyingSource, strategy) // Initialized with null value to enable distinction with undefined case. @putByIdDirectPrivate(this, "readableStreamController", null); + @putByIdDirectPrivate(this, "bunNativeType", @getByIdDirectPrivate(underlyingSource, "bunNativeType") ?? 0); + @putByIdDirectPrivate(this, "bunNativePtr", @getByIdDirectPrivate(underlyingSource, "bunNativePtr") ?? 0); const isDirect = underlyingSource.type === "direct"; // direct streams are always lazy @@ -65,16 +67,20 @@ function initializeReadableStream(underlyingSource, strategy) return this; } if (isDirect) { - if ("start" in underlyingSource && typeof underlyingSource.start === "function") - @throwTypeError("\"start\" for direct streams are not implemented yet"); - - @putByIdDirectPrivate(this, "start", () => @createReadableStreamController.@call(this, underlyingSource, strategy, true)); + @putByIdDirectPrivate(this, "start", () => @createReadableStreamController(this, underlyingSource, strategy)); } else if (isLazy) { const autoAllocateChunkSize = underlyingSource.autoAllocateChunkSize; - @putByIdDirectPrivate(this, "start", () => @lazyLoadStream(this, autoAllocateChunkSize)); + + + @putByIdDirectPrivate(this, "start", () => { + const instance = @lazyLoadStream(this, autoAllocateChunkSize); + if (instance) { + @createReadableStreamController(this, instance, strategy); + } + }); } else { @putByIdDirectPrivate(this, "start", @undefined); - @createReadableStreamController.@call(this, underlyingSource, strategy, false); + @createReadableStreamController(this, underlyingSource, strategy); } @@ -327,6 +333,8 @@ function consumeReadableStream(nativePtr, nativeType, inputStream) { @globalPrivate function createEmptyReadableStream() { + "use strict"; + var stream = new @ReadableStream({ pull() {}, }); @@ -337,13 +345,12 @@ function createEmptyReadableStream() { @globalPrivate function createNativeReadableStream(nativePtr, nativeType, autoAllocateChunkSize) { "use strict"; - stream = new @ReadableStream({ + return new @ReadableStream({ @lazy: true, + @bunNativeType: nativeType, + @bunNativePtr: nativePtr, autoAllocateChunkSize: autoAllocateChunkSize, }); - @putByIdDirectPrivate(stream, "bunNativeType", nativeType); - @putByIdDirectPrivate(stream, "bunNativePtr", nativePtr); - return stream; } function cancel(reason) @@ -370,8 +377,10 @@ function getReader(options) if (mode === @undefined) { var start_ = @getByIdDirectPrivate(this, "start"); if (start_) { - start_.@call(this); + @putByIdDirectPrivate(this, "start", @undefined); + start_(); } + return new @ReadableStreamDefaultReader(this); } // String conversion is required by spec, hence double equals. diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js b/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js index d2dfd5137..3e6590f31 100644 --- a/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js +++ b/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js @@ -143,10 +143,8 @@ function setupReadableStreamDefaultController(stream, underlyingSource, size, hi } -function createReadableStreamController(underlyingSource, strategy, fromLazy) { - if (fromLazy) { - @putByIdDirectPrivate(this, "start", @undefined); - } +function createReadableStreamController(stream, underlyingSource, strategy) { + "use strict"; const type = underlyingSource.type; const typeString = @toString(type); @@ -160,15 +158,15 @@ function createReadableStreamController(underlyingSource, strategy, fromLazy) { if (strategy.size !== @undefined) @throwRangeError("Strategy for a ReadableByteStreamController cannot have a size"); - @putByIdDirectPrivate(this, "readableStreamController", new @ReadableByteStreamController(this, underlyingSource, strategy.highWaterMark, @isReadableStream)); + @putByIdDirectPrivate(stream, "readableStreamController", new @ReadableByteStreamController(stream, underlyingSource, strategy.highWaterMark, @isReadableStream)); } else if (typeString === "direct") { var highWaterMark = strategy?.highWaterMark; - @initializeArrayBufferStream.@call(this, underlyingSource, highWaterMark); + @initializeArrayBufferStream.@call(stream, underlyingSource, highWaterMark); } else if (type === @undefined) { if (strategy.highWaterMark === @undefined) strategy.highWaterMark = 1; - @setupReadableStreamDefaultController(this, underlyingSource, strategy.size, strategy.highWaterMark, underlyingSource.start, underlyingSource.pull, underlyingSource.cancel); + @setupReadableStreamDefaultController(stream, underlyingSource, strategy.size, strategy.highWaterMark, underlyingSource.start, underlyingSource.pull, underlyingSource.cancel); } else @throwRangeError("Invalid type for underlying source"); @@ -313,6 +311,8 @@ function pipeToDoReadWrite(pipeState) function pipeToErrorsMustBePropagatedForward(pipeState) { + "use strict"; + const action = () => { pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false); const error = @getByIdDirectPrivate(pipeState.source, "storedError"); @@ -351,6 +351,7 @@ function pipeToErrorsMustBePropagatedBackward(pipeState) function pipeToClosingMustBePropagatedForward(pipeState) { + "use strict"; const action = () => { pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false); const error = @getByIdDirectPrivate(pipeState.source, "storedError"); @@ -369,6 +370,7 @@ function pipeToClosingMustBePropagatedForward(pipeState) function pipeToClosingMustBePropagatedBackward(pipeState) { + "use strict"; if (!@writableStreamCloseQueuedOrInFlight(pipeState.destination) && @getByIdDirectPrivate(pipeState.destination, "state") !== "closed") return; @@ -1167,14 +1169,12 @@ function readableStreamDefaultControllerCanCloseOrEnqueue(controller) function lazyLoadStream(stream, autoAllocateChunkSize) { "use strict"; - @putByIdDirectPrivate(stream, "start", @undefined); - var bunNativeType = @getByIdDirectPrivate(stream, "bunNativeType"); - var bunNativePtr = @getByIdDirectPrivate(stream, "bunNativePtr"); - - var cached = globalThis[globalThis.Symbol.for("Bun.nativeReadableStreamPrototype")] ||= new @Map; + var nativeType = @getByIdDirectPrivate(stream, "bunNativeType"); + var nativePtr = @getByIdDirectPrivate(stream, "bunNativePtr"); + var cached = @lazyStreamPrototypeMap; var Prototype = cached.@get(nativeType); if (Prototype === @undefined) { - var [pull, start, cancel, setClose, deinit] = globalThis[globalThis.Symbol.for("Bun.lazy")](nativeType); + var [pull, start, cancel, setClose, deinit] = @lazyLoad(nativeType); var closer = [false]; var handleResult; function handleNativeReadableStreamPromiseResult(val) { @@ -1236,24 +1236,20 @@ function lazyLoadStream(stream, autoAllocateChunkSize) { cancel_(reason) { cancel(this, reason); } - + static deinit = deinit; static registry = new FinalizationRegistry(deinit); } cached.@set(nativeType, Prototype); } - // either returns the chunk size - // or throws an error - // should never return a Promise const chunkSize = Prototype.startSync(nativePtr, autoAllocateChunkSize); // empty file, no need for native back-and-forth on this if (chunkSize === 0) { @readableStreamClose(stream); - return; + return null; } - var instance = new Prototype(nativePtr, chunkSize); Prototype.registry.register(instance, nativePtr); - @createReadableStreamController.@call(stream, instance, @undefined, true); + return instance; }
\ No newline at end of file |