diff options
author | 2022-06-15 02:00:45 -0700 | |
---|---|---|
committer | 2022-06-15 02:00:45 -0700 | |
commit | d93f09331394148441d142930fea236a9fd73c5c (patch) | |
tree | a1838e49a1ac0d94cfcc13d8a954972cd51677e5 /src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js | |
parent | dbde52504d50b8d5bb3e071ce4b5a959c28eb760 (diff) | |
download | bun-d93f09331394148441d142930fea236a9fd73c5c.tar.gz bun-d93f09331394148441d142930fea236a9fd73c5c.tar.zst bun-d93f09331394148441d142930fea236a9fd73c5c.zip |
wip direct streams
Diffstat (limited to 'src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js')
-rw-r--r-- | src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js | 53 |
1 files changed, 43 insertions, 10 deletions
diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js b/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js index 45611a0e4..5d0779745 100644 --- a/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js +++ b/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js @@ -58,13 +58,11 @@ function privateInitializeReadableStreamDefaultController(stream, underlyingSour @putByIdDirectPrivate(this, "controlledReadableStream", stream); @putByIdDirectPrivate(this, "underlyingSource", underlyingSource); @putByIdDirectPrivate(this, "queue", @newQueue()); - @putByIdDirectPrivate(this, "started", false); + @putByIdDirectPrivate(this, "started", -1); @putByIdDirectPrivate(this, "closeRequested", false); @putByIdDirectPrivate(this, "pullAgain", false); @putByIdDirectPrivate(this, "pulling", false); @putByIdDirectPrivate(this, "strategy", @validateAndNormalizeQueuingStrategy(size, highWaterMark)); - - return this; } @@ -77,7 +75,7 @@ function setupReadableStreamDefaultController(stream, underlyingSource, size, hi "use strict"; const controller = new @ReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark, @isReadableStream); - const startAlgorithm = () => @promiseInvokeOrNoopMethodNoCatch(underlyingSource, startMethod, [controller]); + const pullAlgorithm = () => @promiseInvokeOrNoopMethod(underlyingSource, pullMethod, [controller]); const cancelAlgorithm = (reason) => @promiseInvokeOrNoopMethod(underlyingSource, cancelMethod, [reason]); @@ -87,12 +85,25 @@ function setupReadableStreamDefaultController(stream, underlyingSource, size, hi @putByIdDirectPrivate(controller, "cancel", @readableStreamDefaultControllerCancel); @putByIdDirectPrivate(stream, "readableStreamController", controller); - startAlgorithm().@then(() => { - @putByIdDirectPrivate(controller, "started", true); + if (@getByIdDirectPrivate(controller, "sink") === @undefined) { + @readableStreamDefaultControllerStart(controller); + } + +} + +function readableStreamDefaultControllerStart(controller) { + if (@getByIdDirectPrivate(controller, "started") !== -1) + return; + + const underlyingSource = @getByIdDirectPrivate(controller, "underlyingSource"); + const startMethod = underlyingSource.start; + @putByIdDirectPrivate(controller, "started", 0); + + return @promiseInvokeOrNoopMethodNoCatch(underlyingSource, startMethod, [controller]).@then(() => { + @putByIdDirectPrivate(controller, "started", 1); @assert(!@getByIdDirectPrivate(controller, "pulling")); @assert(!@getByIdDirectPrivate(controller, "pullAgain")); @readableStreamDefaultControllerCallPullIfNeeded(controller); - }, (error) => { @readableStreamDefaultControllerError(controller, error); }); @@ -106,6 +117,7 @@ function readableStreamDefaultControllerError(controller, error) if (@getByIdDirectPrivate(stream, "state") !== @streamReadable) return; @putByIdDirectPrivate(controller, "queue", @newQueue()); + @readableStreamError(stream, error); } @@ -138,8 +150,16 @@ function readableStreamPipeTo(stream, sink) doPipe(); } + + function acquireReadableStreamDefaultReader(stream) { + "use strict"; + var start = @getByIdDirectPrivate(stream, "start"); + if (start) { + start(); + } + return new @ReadableStreamDefaultReader(stream); } @@ -147,6 +167,8 @@ function acquireReadableStreamDefaultReader(stream) // This method implements the latest https://streams.spec.whatwg.org/#readable-stream-pipe-to. function readableStreamPipeToWritableStream(source, destination, preventClose, preventAbort, preventCancel, signal) { + "use strict"; + @assert(@isReadableStream(source)); @assert(@isWritableStream(destination)); @assert(!@isReadableStreamLocked(source)); @@ -217,6 +239,7 @@ function readableStreamPipeToWritableStream(source, destination, preventClose, p function pipeToLoop(pipeState) { + "use strict"; if (pipeState.shuttingDown) return; @@ -228,6 +251,7 @@ function pipeToLoop(pipeState) function pipeToDoReadWrite(pipeState) { + "use strict"; @assert(!pipeState.shuttingDown); pipeState.pendingReadPromiseCapability = @newPromiseCapability(@Promise); @@ -275,6 +299,7 @@ function pipeToErrorsMustBePropagatedForward(pipeState) function pipeToErrorsMustBePropagatedBackward(pipeState) { + "use strict"; const action = () => { const error = @getByIdDirectPrivate(pipeState.destination, "storedError"); if (!pipeState.preventCancel) { @@ -325,6 +350,8 @@ function pipeToClosingMustBePropagatedBackward(pipeState) function pipeToShutdownWithAction(pipeState, action) { + "use strict"; + if (pipeState.shuttingDown) return; @@ -356,6 +383,8 @@ function pipeToShutdownWithAction(pipeState, action) function pipeToShutdown(pipeState) { + "use strict"; + if (pipeState.shuttingDown) return; @@ -381,6 +410,8 @@ function pipeToShutdown(pipeState) function pipeToFinalize(pipeState) { + "use strict"; + @writableStreamDefaultWriterRelease(pipeState.writer); @readableStreamReaderGenericRelease(pipeState.reader); @@ -570,7 +601,7 @@ function readableStreamDefaultControllerShouldCallPull(controller) if (!@readableStreamDefaultControllerCanCloseOrEnqueue(controller)) return false; - if (!@getByIdDirectPrivate(controller, "started")) + if (!(@getByIdDirectPrivate(controller, "started") > 0)) return false; if ((!@isReadableStreamLocked(stream) || !@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) && @readableStreamDefaultControllerGetDesiredSize(controller) <= 0) return false; @@ -588,7 +619,7 @@ function readableStreamDefaultControllerCallPullIfNeeded(controller) if (!@readableStreamDefaultControllerCanCloseOrEnqueue(controller)) return; - if (!@getByIdDirectPrivate(controller, "started")) + if (!(@getByIdDirectPrivate(controller, "started") > 0)) return; if ((!@isReadableStreamLocked(stream) || !@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) && @readableStreamDefaultControllerGetDesiredSize(controller) <= 0) return; @@ -656,7 +687,9 @@ function readableStreamCancel(stream, reason) if (state === @streamErrored) return @Promise.@reject(@getByIdDirectPrivate(stream, "storedError")); @readableStreamClose(stream); - return @getByIdDirectPrivate(stream, "readableStreamController").@cancel(@getByIdDirectPrivate(stream, "readableStreamController"), reason).@then(function() { }); + + var controller = @getByIdDirectPrivate(stream, "readableStreamController"); + return controller.@cancel(controller, reason).@then(function() { }); } function readableStreamDefaultControllerCancel(controller, reason) |