aboutsummaryrefslogtreecommitdiff
path: root/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-06-15 02:00:45 -0700
committerGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-06-15 02:00:45 -0700
commitd93f09331394148441d142930fea236a9fd73c5c (patch)
treea1838e49a1ac0d94cfcc13d8a954972cd51677e5 /src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js
parentdbde52504d50b8d5bb3e071ce4b5a959c28eb760 (diff)
downloadbun-d93f09331394148441d142930fea236a9fd73c5c.tar.gz
bun-d93f09331394148441d142930fea236a9fd73c5c.tar.zst
bun-d93f09331394148441d142930fea236a9fd73c5c.zip
wip direct streams
Diffstat (limited to 'src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js')
-rw-r--r--src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js53
1 files changed, 43 insertions, 10 deletions
diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js b/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js
index 45611a0e4..5d0779745 100644
--- a/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js
+++ b/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js
@@ -58,13 +58,11 @@ function privateInitializeReadableStreamDefaultController(stream, underlyingSour
@putByIdDirectPrivate(this, "controlledReadableStream", stream);
@putByIdDirectPrivate(this, "underlyingSource", underlyingSource);
@putByIdDirectPrivate(this, "queue", @newQueue());
- @putByIdDirectPrivate(this, "started", false);
+ @putByIdDirectPrivate(this, "started", -1);
@putByIdDirectPrivate(this, "closeRequested", false);
@putByIdDirectPrivate(this, "pullAgain", false);
@putByIdDirectPrivate(this, "pulling", false);
@putByIdDirectPrivate(this, "strategy", @validateAndNormalizeQueuingStrategy(size, highWaterMark));
-
-
return this;
}
@@ -77,7 +75,7 @@ function setupReadableStreamDefaultController(stream, underlyingSource, size, hi
"use strict";
const controller = new @ReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark, @isReadableStream);
- const startAlgorithm = () => @promiseInvokeOrNoopMethodNoCatch(underlyingSource, startMethod, [controller]);
+
const pullAlgorithm = () => @promiseInvokeOrNoopMethod(underlyingSource, pullMethod, [controller]);
const cancelAlgorithm = (reason) => @promiseInvokeOrNoopMethod(underlyingSource, cancelMethod, [reason]);
@@ -87,12 +85,25 @@ function setupReadableStreamDefaultController(stream, underlyingSource, size, hi
@putByIdDirectPrivate(controller, "cancel", @readableStreamDefaultControllerCancel);
@putByIdDirectPrivate(stream, "readableStreamController", controller);
- startAlgorithm().@then(() => {
- @putByIdDirectPrivate(controller, "started", true);
+ if (@getByIdDirectPrivate(controller, "sink") === @undefined) {
+ @readableStreamDefaultControllerStart(controller);
+ }
+
+}
+
+function readableStreamDefaultControllerStart(controller) {
+ if (@getByIdDirectPrivate(controller, "started") !== -1)
+ return;
+
+ const underlyingSource = @getByIdDirectPrivate(controller, "underlyingSource");
+ const startMethod = underlyingSource.start;
+ @putByIdDirectPrivate(controller, "started", 0);
+
+ return @promiseInvokeOrNoopMethodNoCatch(underlyingSource, startMethod, [controller]).@then(() => {
+ @putByIdDirectPrivate(controller, "started", 1);
@assert(!@getByIdDirectPrivate(controller, "pulling"));
@assert(!@getByIdDirectPrivate(controller, "pullAgain"));
@readableStreamDefaultControllerCallPullIfNeeded(controller);
-
}, (error) => {
@readableStreamDefaultControllerError(controller, error);
});
@@ -106,6 +117,7 @@ function readableStreamDefaultControllerError(controller, error)
if (@getByIdDirectPrivate(stream, "state") !== @streamReadable)
return;
@putByIdDirectPrivate(controller, "queue", @newQueue());
+
@readableStreamError(stream, error);
}
@@ -138,8 +150,16 @@ function readableStreamPipeTo(stream, sink)
doPipe();
}
+
+
function acquireReadableStreamDefaultReader(stream)
{
+ "use strict";
+ var start = @getByIdDirectPrivate(stream, "start");
+ if (start) {
+ start();
+ }
+
return new @ReadableStreamDefaultReader(stream);
}
@@ -147,6 +167,8 @@ function acquireReadableStreamDefaultReader(stream)
// This method implements the latest https://streams.spec.whatwg.org/#readable-stream-pipe-to.
function readableStreamPipeToWritableStream(source, destination, preventClose, preventAbort, preventCancel, signal)
{
+ "use strict";
+
@assert(@isReadableStream(source));
@assert(@isWritableStream(destination));
@assert(!@isReadableStreamLocked(source));
@@ -217,6 +239,7 @@ function readableStreamPipeToWritableStream(source, destination, preventClose, p
function pipeToLoop(pipeState)
{
+ "use strict";
if (pipeState.shuttingDown)
return;
@@ -228,6 +251,7 @@ function pipeToLoop(pipeState)
function pipeToDoReadWrite(pipeState)
{
+ "use strict";
@assert(!pipeState.shuttingDown);
pipeState.pendingReadPromiseCapability = @newPromiseCapability(@Promise);
@@ -275,6 +299,7 @@ function pipeToErrorsMustBePropagatedForward(pipeState)
function pipeToErrorsMustBePropagatedBackward(pipeState)
{
+ "use strict";
const action = () => {
const error = @getByIdDirectPrivate(pipeState.destination, "storedError");
if (!pipeState.preventCancel) {
@@ -325,6 +350,8 @@ function pipeToClosingMustBePropagatedBackward(pipeState)
function pipeToShutdownWithAction(pipeState, action)
{
+ "use strict";
+
if (pipeState.shuttingDown)
return;
@@ -356,6 +383,8 @@ function pipeToShutdownWithAction(pipeState, action)
function pipeToShutdown(pipeState)
{
+ "use strict";
+
if (pipeState.shuttingDown)
return;
@@ -381,6 +410,8 @@ function pipeToShutdown(pipeState)
function pipeToFinalize(pipeState)
{
+ "use strict";
+
@writableStreamDefaultWriterRelease(pipeState.writer);
@readableStreamReaderGenericRelease(pipeState.reader);
@@ -570,7 +601,7 @@ function readableStreamDefaultControllerShouldCallPull(controller)
if (!@readableStreamDefaultControllerCanCloseOrEnqueue(controller))
return false;
- if (!@getByIdDirectPrivate(controller, "started"))
+ if (!(@getByIdDirectPrivate(controller, "started") > 0))
return false;
if ((!@isReadableStreamLocked(stream) || !@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) && @readableStreamDefaultControllerGetDesiredSize(controller) <= 0)
return false;
@@ -588,7 +619,7 @@ function readableStreamDefaultControllerCallPullIfNeeded(controller)
if (!@readableStreamDefaultControllerCanCloseOrEnqueue(controller))
return;
- if (!@getByIdDirectPrivate(controller, "started"))
+ if (!(@getByIdDirectPrivate(controller, "started") > 0))
return;
if ((!@isReadableStreamLocked(stream) || !@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) && @readableStreamDefaultControllerGetDesiredSize(controller) <= 0)
return;
@@ -656,7 +687,9 @@ function readableStreamCancel(stream, reason)
if (state === @streamErrored)
return @Promise.@reject(@getByIdDirectPrivate(stream, "storedError"));
@readableStreamClose(stream);
- return @getByIdDirectPrivate(stream, "readableStreamController").@cancel(@getByIdDirectPrivate(stream, "readableStreamController"), reason).@then(function() { });
+
+ var controller = @getByIdDirectPrivate(stream, "readableStreamController");
+ return controller.@cancel(controller, reason).@then(function() { });
}
function readableStreamDefaultControllerCancel(controller, reason)