aboutsummaryrefslogtreecommitdiff
path: root/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js')
-rw-r--r--src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js500
1 files changed, 454 insertions, 46 deletions
diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js b/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js
index 5d0779745..d2dfd5137 100644
--- a/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js
+++ b/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js
@@ -67,48 +67,6 @@ function privateInitializeReadableStreamDefaultController(stream, underlyingSour
return this;
}
-
-// https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller, starting from step 6.
-// The other part is implemented in privateInitializeReadableStreamDefaultController.
-function setupReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark, startMethod, pullMethod, cancelMethod)
-{
- "use strict";
-
- const controller = new @ReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark, @isReadableStream);
-
- const pullAlgorithm = () => @promiseInvokeOrNoopMethod(underlyingSource, pullMethod, [controller]);
- const cancelAlgorithm = (reason) => @promiseInvokeOrNoopMethod(underlyingSource, cancelMethod, [reason]);
-
- @putByIdDirectPrivate(controller, "pullAlgorithm", pullAlgorithm);
- @putByIdDirectPrivate(controller, "cancelAlgorithm", cancelAlgorithm);
- @putByIdDirectPrivate(controller, "pull", @readableStreamDefaultControllerPull);
- @putByIdDirectPrivate(controller, "cancel", @readableStreamDefaultControllerCancel);
- @putByIdDirectPrivate(stream, "readableStreamController", controller);
-
- if (@getByIdDirectPrivate(controller, "sink") === @undefined) {
- @readableStreamDefaultControllerStart(controller);
- }
-
-}
-
-function readableStreamDefaultControllerStart(controller) {
- if (@getByIdDirectPrivate(controller, "started") !== -1)
- return;
-
- const underlyingSource = @getByIdDirectPrivate(controller, "underlyingSource");
- const startMethod = underlyingSource.start;
- @putByIdDirectPrivate(controller, "started", 0);
-
- return @promiseInvokeOrNoopMethodNoCatch(underlyingSource, startMethod, [controller]).@then(() => {
- @putByIdDirectPrivate(controller, "started", 1);
- @assert(!@getByIdDirectPrivate(controller, "pulling"));
- @assert(!@getByIdDirectPrivate(controller, "pullAgain"));
- @readableStreamDefaultControllerCallPullIfNeeded(controller);
- }, (error) => {
- @readableStreamDefaultControllerError(controller, error);
- });
-}
-
function readableStreamDefaultControllerError(controller, error)
{
"use strict";
@@ -157,12 +115,88 @@ function acquireReadableStreamDefaultReader(stream)
"use strict";
var start = @getByIdDirectPrivate(stream, "start");
if (start) {
- start();
+ start.@call(stream);
}
return new @ReadableStreamDefaultReader(stream);
}
+// https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller, starting from step 6.
+// The other part is implemented in privateInitializeReadableStreamDefaultController.
+function setupReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark, startMethod, pullMethod, cancelMethod)
+{
+ "use strict";
+
+ const controller = new @ReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark, @isReadableStream);
+
+ const pullAlgorithm = () => @promiseInvokeOrNoopMethod(underlyingSource, pullMethod, [controller]);
+ const cancelAlgorithm = (reason) => @promiseInvokeOrNoopMethod(underlyingSource, cancelMethod, [reason]);
+
+ @putByIdDirectPrivate(controller, "pullAlgorithm", pullAlgorithm);
+ @putByIdDirectPrivate(controller, "cancelAlgorithm", cancelAlgorithm);
+ @putByIdDirectPrivate(controller, "pull", @readableStreamDefaultControllerPull);
+ @putByIdDirectPrivate(controller, "cancel", @readableStreamDefaultControllerCancel);
+ @putByIdDirectPrivate(stream, "readableStreamController", controller);
+
+ @readableStreamDefaultControllerStart(controller);
+
+}
+
+
+function createReadableStreamController(underlyingSource, strategy, fromLazy) {
+ if (fromLazy) {
+ @putByIdDirectPrivate(this, "start", @undefined);
+ }
+
+ const type = underlyingSource.type;
+ const typeString = @toString(type);
+
+ if (typeString === "bytes") {
+ // if (!@readableByteStreamAPIEnabled())
+ // @throwTypeError("ReadableByteStreamController is not implemented");
+
+ if (strategy.highWaterMark === @undefined)
+ strategy.highWaterMark = 0;
+ if (strategy.size !== @undefined)
+ @throwRangeError("Strategy for a ReadableByteStreamController cannot have a size");
+
+ @putByIdDirectPrivate(this, "readableStreamController", new @ReadableByteStreamController(this, underlyingSource, strategy.highWaterMark, @isReadableStream));
+ } else if (typeString === "direct") {
+ var highWaterMark = strategy?.highWaterMark;
+ @initializeArrayBufferStream.@call(this, underlyingSource, highWaterMark);
+ } else if (type === @undefined) {
+ if (strategy.highWaterMark === @undefined)
+ strategy.highWaterMark = 1;
+
+ @setupReadableStreamDefaultController(this, underlyingSource, strategy.size, strategy.highWaterMark, underlyingSource.start, underlyingSource.pull, underlyingSource.cancel);
+ } else
+ @throwRangeError("Invalid type for underlying source");
+
+}
+
+function readableStreamDefaultControllerStart(controller) {
+ "use strict";
+
+
+
+ if (@getByIdDirectPrivate(controller, "started") !== -1)
+ return;
+
+ const underlyingSource = @getByIdDirectPrivate(controller, "underlyingSource");
+ const startMethod = underlyingSource.start;
+ @putByIdDirectPrivate(controller, "started", 0);
+
+ @promiseInvokeOrNoopMethodNoCatch(underlyingSource, startMethod, [controller]).@then(() => {
+ @putByIdDirectPrivate(controller, "started", 1);
+ @assert(!@getByIdDirectPrivate(controller, "pulling"));
+ @assert(!@getByIdDirectPrivate(controller, "pullAgain"));
+ @readableStreamDefaultControllerCallPullIfNeeded(controller);
+ }, (error) => {
+ @readableStreamDefaultControllerError(controller, error);
+ });
+}
+
+
// FIXME: Replace readableStreamPipeTo by below function.
// This method implements the latest https://streams.spec.whatwg.org/#readable-stream-pipe-to.
function readableStreamPipeToWritableStream(source, destination, preventClose, preventAbort, preventCancel, signal)
@@ -563,6 +597,281 @@ function isReadableStreamDefaultController(controller)
return @isObject(controller) && !!@getByIdDirectPrivate(controller, "underlyingSource");
}
+
+@globalPrivate
+function assignDirectStream() {
+ "use strict";
+
+ var stream = this;
+}
+
+
+function handleDirectStreamError(e) {
+ "use strict";
+
+ var controller = this;
+ var sink = controller.@sink;
+ if (sink) {
+ @putByIdDirectPrivate(controller, "sink", @undefined);
+ try {
+ sink.close(e);
+ } catch (f) {}
+ }
+
+ this.error = this.drain = this.write = this.close = this.end = @onReadableStreamDirectControllerClosed;
+
+ if (typeof this.@underlyingSource.close === 'function') {
+ try {
+ this.@underlyingSource.close.@call(this.@underlyingSource, e);
+ } catch (e) {
+ }
+ }
+
+ try {
+ var pend = controller._pendingRead;
+ if (pend) {
+ controller._pendingRead = @undefined;
+ @rejectPromise(pend, e);
+ }
+ } catch (f) {}
+ var stream = controller.@controlledReadableStream;
+ if (stream) @readableStreamError(stream, e);
+}
+
+function handleDirectStreamErrorReject(e) {
+ @handleDirectStreamError.@call(this, e);
+ return @Promise.@reject(e);
+}
+
+function onPullDirectStream(controller)
+{
+
+ "use strict";
+
+ var stream = controller.@controlledReadableStream;
+ if (!stream || @getByIdDirectPrivate(stream, "state") !== @streamReadable)
+ return;
+
+ // pull is in progress
+ // this is a recursive call
+ // ignore it
+ if (controller._deferClose === -1) {
+ return;
+ }
+
+
+ controller._deferClose = -1;
+ controller._deferDrain = -1;
+ var deferClose;
+ var deferDrain;
+
+ // Direct streams allow @pull to be called multiple times, unlike the spec.
+ // Backpressure is handled by the destination, not by the underlying source.
+ // In this case, we rely on the heuristic that repeatedly draining in the same tick
+ // is bad for performance
+ // this code is only run when consuming a direct stream from JS
+ // without the HTTP server or anything else
+ try {
+ var result = controller.@underlyingSource.pull(
+ controller,
+ );
+
+ if (result && @isPromise(result)) {
+ if (controller._handleError === @undefined) {
+ controller._handleError = @handleDirectStreamErrorReject.bind(controller);
+ }
+
+ @Promise.prototype.catch.@call(result, controller._handleError);
+ }
+ } catch(e) {
+ return @handleDirectStreamErrorReject.@call(controller, e);
+ } finally {
+ deferClose = controller._deferClose;
+ deferDrain = controller._deferDrain;
+ controller._deferDrain = controller._deferClose = 0;
+ }
+
+
+ var promiseToReturn;
+
+
+ if (controller._pendingRead === @undefined) {
+ controller._pendingRead = promiseToReturn = @newPromise();
+ } else {
+ promiseToReturn = @readableStreamAddReadRequest(stream);
+ }
+
+
+ // they called close during @pull()
+ // we delay that
+ if (deferClose === 1) {
+ var reason = controller._deferCloseReason;
+ controller._deferCloseReason = @undefined;
+ @onCloseDirectStream.@call(controller, reason);
+ return promiseToReturn;
+ }
+
+ // not done, but they called drain()
+ if (deferDrain === 1) {
+ @onDrainDirectStream.@call(controller);
+ }
+
+
+ return promiseToReturn;
+}
+
+function noopDoneFunction() {
+ return @Promise.@resolve({value: @undefined, done: true});
+}
+
+function onReadableStreamDirectControllerClosed(reason)
+{
+ "use strict";
+ @throwTypeError("ReadableStreamDirectController is now closed");
+}
+
+function onCloseDirectStream(reason)
+{
+ "use strict";
+ var stream = this.@controlledReadableStream;
+ if (!stream || @getByIdDirectPrivate(stream, "state") !== @streamReadable)
+ return;
+
+ if (this._deferClose !== 0) {
+ this._deferClose = 1;
+ this._deferCloseReason = reason;
+ return;
+ }
+
+ @putByIdDirectPrivate(stream, "state", @streamClosing);
+ if (typeof this.@underlyingSource.close === 'function') {
+ try {
+ this.@underlyingSource.close.@call(this.@underlyingSource, reason);
+ } catch (e) {
+
+ }
+ }
+
+ var drained;
+ try {
+ drained = this.@sink.end();
+ @putByIdDirectPrivate(this, "sink", @undefined);
+ } catch (e) {
+ if (this._pendingRead) {
+ var read = this._pendingRead;
+ this._pendingRead = @undefined;
+ @rejectPromise(read, e);
+ }
+ @readableStreamError(stream, e);
+ return;
+ }
+
+ this.error = this.drain = this.write = this.close = this.end = @onReadableStreamDirectControllerClosed;
+
+ var reader = @getByIdDirectPrivate(stream, "reader");
+
+ if (reader && @isReadableStreamDefaultReader(reader)) {
+ var _pendingRead = this._pendingRead;
+ if (_pendingRead && @isPromise(_pendingRead) && drained?.byteLength) {
+ this._pendingRead = @undefined;
+ @fulfillPromise(_pendingRead, {value: drained, done: false});
+ @readableStreamClose(stream);
+ return;
+ }
+ }
+
+ if (drained?.byteLength) {
+ var requests = @getByIdDirectPrivate(reader, "readRequests");
+ if (requests?.isNotEmpty()) {
+ @readableStreamFulfillReadRequest(stream, drained, false);
+ @readableStreamClose(stream);
+ return;
+ }
+
+ @putByIdDirectPrivate(stream, "state", @streamReadable);
+ this.@pull = () => {
+ var thisResult = @createFulfilledPromise({value: drained, done: false});
+ drained = @undefined;
+ @readableStreamClose(stream);
+ stream = @undefined;
+ return thisResult;
+ };
+ } else if (this._pendingRead) {
+ var read = this._pendingRead;
+ this._pendingRead = @undefined;
+ @putByIdDirectPrivate(this, "pull", @noopDoneFunction);
+ @fulfillPromise(read, {value: @undefined, done: true});
+ }
+
+ @readableStreamClose(stream);
+}
+
+function onDrainDirectStream()
+{
+ "use strict";
+
+ var stream = this.@controlledReadableStream;
+ var reader = @getByIdDirectPrivate(stream, "reader");
+ if (!reader || !@isReadableStreamDefaultReader(reader)) {
+ return;
+ }
+
+ var _pendingRead = this._pendingRead;
+ this._pendingRead = @undefined;
+ if (_pendingRead && @isPromise(_pendingRead)) {
+ var drained = this.@sink.drain();
+ if (drained?.byteLength) {
+ this._pendingRead = @getByIdDirectPrivate(stream, "readRequests")?.shift();
+ @fulfillPromise(_pendingRead, {value: drained, done: false});
+ } else {
+ this._pendingRead = _pendingRead;
+ }
+ } else if (@getByIdDirectPrivate(stream, "readRequests")?.isNotEmpty()) {
+ var drained = this.@sink.drain();
+ if (drained?.byteLength) {
+ @readableStreamFulfillReadRequest(stream, drained, false);
+ }
+ } else if (this._deferDrain === -1) {
+ this._deferDrain = 1;
+ }
+
+}
+
+function initializeArrayBufferStream(underlyingSource, highWaterMark)
+{
+ "use strict";
+
+ // This is the fallback implementation for direct streams
+ // When we don't know what the destination type is
+ // We assume it is a Uint8Array.
+
+ var opts = highWaterMark ? {highWaterMark, stream: true, asUint8Array: true} : {stream: true, asUint8Array: true};
+ var sink = new globalThis.Bun.ArrayBufferSink();
+ sink.start(opts);
+
+ var controller = {
+ @underlyingSource: underlyingSource,
+ @pull: @onPullDirectStream,
+ @controlledReadableStream: this,
+ @sink: sink,
+ close: @onCloseDirectStream,
+ write: sink.write.bind(sink),
+ error: @handleDirectStreamError,
+ end: @onCloseDirectStream,
+ @close: @onCloseDirectStream,
+ drain: @onDrainDirectStream,
+ _pendingRead: @undefined,
+ _deferClose: 0,
+ _deferDrain: 0,
+ _deferCloseReason: @undefined,
+ _handleError: @undefined,
+ };
+
+
+ @putByIdDirectPrivate(this, "readableStreamController", controller);
+
+}
+
function readableStreamError(stream, error)
{
"use strict";
@@ -597,11 +906,13 @@ function readableStreamError(stream, error)
function readableStreamDefaultControllerShouldCallPull(controller)
{
- const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
+ "use strict";
+ const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
+
if (!@readableStreamDefaultControllerCanCloseOrEnqueue(controller))
return false;
- if (!(@getByIdDirectPrivate(controller, "started") > 0))
+ if (!(@getByIdDirectPrivate(controller, "started") === 1))
return false;
if ((!@isReadableStreamLocked(stream) || !@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) && @readableStreamDefaultControllerGetDesiredSize(controller) <= 0)
return false;
@@ -619,7 +930,7 @@ function readableStreamDefaultControllerCallPullIfNeeded(controller)
if (!@readableStreamDefaultControllerCanCloseOrEnqueue(controller))
return;
- if (!(@getByIdDirectPrivate(controller, "started") > 0))
+ if (!(@getByIdDirectPrivate(controller, "started") === 1))
return;
if ((!@isReadableStreamLocked(stream) || !@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) && @readableStreamDefaultControllerGetDesiredSize(controller) <= 0)
return;
@@ -629,6 +940,7 @@ function readableStreamDefaultControllerCallPullIfNeeded(controller)
return;
}
+
@assert(!@getByIdDirectPrivate(controller, "pullAgain"));
@putByIdDirectPrivate(controller, "pulling", true);
@@ -636,6 +948,7 @@ function readableStreamDefaultControllerCallPullIfNeeded(controller)
@putByIdDirectPrivate(controller, "pulling", false);
if (@getByIdDirectPrivate(controller, "pullAgain")) {
@putByIdDirectPrivate(controller, "pullAgain", false);
+
@readableStreamDefaultControllerCallPullIfNeeded(controller);
}
}, function(error) {
@@ -849,3 +1162,98 @@ function readableStreamDefaultControllerCanCloseOrEnqueue(controller)
return !@getByIdDirectPrivate(controller, "closeRequested") && @getByIdDirectPrivate(@getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === @streamReadable;
}
+
+
+function lazyLoadStream(stream, autoAllocateChunkSize) {
+ "use strict";
+
+ @putByIdDirectPrivate(stream, "start", @undefined);
+ var bunNativeType = @getByIdDirectPrivate(stream, "bunNativeType");
+ var bunNativePtr = @getByIdDirectPrivate(stream, "bunNativePtr");
+
+ var cached = globalThis[globalThis.Symbol.for("Bun.nativeReadableStreamPrototype")] ||= new @Map;
+ var Prototype = cached.@get(nativeType);
+ if (Prototype === @undefined) {
+ var [pull, start, cancel, setClose, deinit] = globalThis[globalThis.Symbol.for("Bun.lazy")](nativeType);
+ var closer = [false];
+ var handleResult;
+ function handleNativeReadableStreamPromiseResult(val) {
+ "use strict";
+ var {c, v} = this;
+ this.c = @undefined;
+ this.v = @undefined;
+ handleResult(val, c, v);
+ }
+
+ handleResult = function handleResult(result, controller, view) {
+ "use strict";
+
+ if (result && @isPromise(result)) {
+ return result.then(handleNativeReadableStreamPromiseResult.bind({c: controller, v: view}), (err) => controller.error(err));
+ } else if (result !== false) {
+ if (view && view.byteLength === result) {
+ controller.byobRequest.respondWithNewView(view);
+ } else {
+ controller.byobRequest.respond(result);
+ }
+ }
+
+ if (closer[0] || result === false) {
+ @enqueueJob(() => controller.close());
+ closer[0] = false;
+ }
+ };
+
+ Prototype = class NativeReadableStreamSource {
+ constructor(tag, autoAllocateChunkSize) {
+ this.pull = this.pull_.bind(tag);
+ this.cancel = this.cancel_.bind(tag);
+ this.autoAllocateChunkSize = autoAllocateChunkSize;
+ }
+
+ pull;
+ cancel;
+
+ type = "bytes";
+ autoAllocateChunkSize = 0;
+
+ static startSync = start;
+
+ pull_(controller) {
+ closer[0] = false;
+ var result;
+
+ const view = controller.byobRequest.view;
+ try {
+ result = pull(this, view, closer);
+ } catch(err) {
+ return controller.error(err);
+ }
+
+ return handleResult(result, controller, view);
+ }
+
+ cancel_(reason) {
+ cancel(this, reason);
+ }
+
+ static registry = new FinalizationRegistry(deinit);
+ }
+ cached.@set(nativeType, Prototype);
+ }
+
+ // either returns the chunk size
+ // or throws an error
+ // should never return a Promise
+ const chunkSize = Prototype.startSync(nativePtr, autoAllocateChunkSize);
+
+ // empty file, no need for native back-and-forth on this
+ if (chunkSize === 0) {
+ @readableStreamClose(stream);
+ return;
+ }
+
+ var instance = new Prototype(nativePtr, chunkSize);
+ Prototype.registry.register(instance, nativePtr);
+ @createReadableStreamController.@call(stream, instance, @undefined, true);
+} \ No newline at end of file