aboutsummaryrefslogtreecommitdiff
path: root/src/javascript/jsc/bindings/builtins/js/ReadableStream.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/javascript/jsc/bindings/builtins/js/ReadableStream.js')
-rw-r--r--src/javascript/jsc/bindings/builtins/js/ReadableStream.js444
1 files changed, 21 insertions, 423 deletions
diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableStream.js b/src/javascript/jsc/bindings/builtins/js/ReadableStream.js
index cf9eba8b6..cfe68d13c 100644
--- a/src/javascript/jsc/bindings/builtins/js/ReadableStream.js
+++ b/src/javascript/jsc/bindings/builtins/js/ReadableStream.js
@@ -52,38 +52,27 @@ function initializeReadableStream(underlyingSource, strategy)
const isDirect = underlyingSource.type === "direct";
// direct streams are always lazy
- const isLazy = isDirect || !!underlyingSource.@lazy;
+ const isUnderlyingSourceLazy = !!underlyingSource.@lazy;
+ const isLazy = isDirect || isUnderlyingSourceLazy;
-
- @putByIdDirectPrivate(this, "direct", isDirect);
-
- // FIXME: We should introduce https://streams.spec.whatwg.org/#create-readable-stream.
- // For now, we emulate this with underlyingSource with private properties.
- if (@getByIdDirectPrivate(underlyingSource, "pull") !== @undefined) {
- @putByIdDirectPrivate(this, "underlyingSource", @undefined);
+ // // FIXME: We should introduce https://streams.spec.whatwg.org/#create-readable-stream.
+ // // For now, we emulate this with underlyingSource with private properties.
+ if (@getByIdDirectPrivate(underlyingSource, "pull") !== @undefined && !isLazy) {
const size = @getByIdDirectPrivate(strategy, "size");
const highWaterMark = @getByIdDirectPrivate(strategy, "highWaterMark");
@setupReadableStreamDefaultController(this, underlyingSource, size, highWaterMark !== @undefined ? highWaterMark : 1, @getByIdDirectPrivate(underlyingSource, "start"), @getByIdDirectPrivate(underlyingSource, "pull"), @getByIdDirectPrivate(underlyingSource, "cancel"));
return this;
}
-
- if (isLazy) {
-
- if (isDirect) {
- if ("start" in underlyingSource && typeof underlyingSource.start === "function")
- @throwTypeError("\"start\" for direct streams are not implemented yet");
-
-
- @putByIdDirectPrivate(this, "underlyingSource", underlyingSource);
- @putByIdDirectPrivate(this, "start", () => @createReadableStreamController(this, underlyingSource, strategy, true));
- } else {
- @putByIdDirectPrivate(this, "underlyingSource", @undefined);
- const autoAllocateChunkSize = underlyingSource.autoAllocateChunkSize;
- @putByIdDirectPrivate(this, "start", () => @lazyLoadStream(this, autoAllocateChunkSize));
- }
+ if (isDirect) {
+ if ("start" in underlyingSource && typeof underlyingSource.start === "function")
+ @throwTypeError("\"start\" for direct streams are not implemented yet");
+
+ @putByIdDirectPrivate(this, "start", () => @createReadableStreamController.@call(this, underlyingSource, strategy, true));
+ } else if (isLazy) {
+ const autoAllocateChunkSize = underlyingSource.autoAllocateChunkSize;
+ @putByIdDirectPrivate(this, "start", () => @lazyLoadStream(this, autoAllocateChunkSize));
} else {
- @putByIdDirectPrivate(this, "underlyingSource", @undefined);
@putByIdDirectPrivate(this, "start", @undefined);
@createReadableStreamController.@call(this, underlyingSource, strategy, false);
}
@@ -92,300 +81,6 @@ function initializeReadableStream(underlyingSource, strategy)
return this;
}
-function handleDirectStreamError(e) {
- "use strict";
-
- var controller = this;
- var sink = controller.@sink;
- if (sink) {
- @putByIdDirectPrivate(controller "sink", @undefined);
- try {
- sink.close(e);
- } catch (f) {}
- }
-
- this.error = this.drain = this.write = this.close = this.end = @onReadableStreamDirectControllerClosed;
-
- if (typeof this.@underlyingSource.close === 'function') {
- try {
- this.@underlyingSource.close.@call(this.@underlyingSource, e);
- } catch (e) {
- }
- }
-
- try {
- var pend = controller._pendingRead;
- if (pend) {
- controller._pendingRead = @undefined;
- @rejectPromise(pend, e);
- }
- } catch (f) {}
- var stream = controller.@controlledReadableStream;
- if (stream) @readableStreamError(stream, e);
-}
-
-function handleDirectStreamErrorReject(e) {
- @handleDirectStreamError.@call(this, e);
- return @Promise.@reject(e);
-}
-
-function onPullDirectStream(controller)
-{
-
- "use strict";
-
- var stream = controller.@controlledReadableStream;
- if (!stream || @getByIdDirectPrivate(stream, "state") !== @streamReadable)
- return;
-
- // pull is in progress
- // this is a recursive call
- // ignore it
- if (controller._deferClose === -1) {
- return;
- }
-
-
- controller._deferClose = -1;
- controller._deferDrain = -1;
- var deferClose;
- var deferDrain;
-
- // Direct streams allow @pull to be called multiple times, unlike the spec.
- // Backpressure is handled by the destination, not by the underlying source.
- // In this case, we rely on the heuristic that repeatedly draining in the same tick
- // is bad for performance
- // this code is only run when consuming a direct stream from JS
- // without the HTTP server or anything else
- try {
- var result = controller.@underlyingSource.@pull(
- controller,
- );
-
- if (result && @isPromise(result)) {
- if (controller._handleError === @undefined) {
- controller._handleError = @handleDirectStreamErrorReject.@bind(controller);
- }
-
- @Promise.prototype.@catch.@call(result, controller._handleError);
- }
- } catch(e) {
- return @handleDirectStreamErrorReject.@call(controller, e);
- } finally {
- deferDrain = controller._deferClose;
- deferClose = controller._deferDrain;
- controller._deferDrain = 0;
- controller._deferClose = 0;
-
- }
-
-
- var promiseToReturn;
-
-
- if (controller._pendingRead === @undefined) {
- controller._pendingRead = promiseToReturn = @newPromise();
- } else {
- promiseToReturn = @readableStreamAddReadRequest(stream);
- }
-
- // they called close during @pull()
- // we delay that
- if (deferClose === 1) {
- var reason = controller._deferCloseReason;
- controller._deferCloseReason = @undefined;
- @onCloseDirectStream.@call(controller, reason);
- return promiseToReturn;
- }
-
- // not done, but they called drain()
- if (deferDrain === 1) {
- @onDrainDirectStream.@call(controller);
- }
-
-
- return promiseToReturn;
-}
-
-function noopDoneFunction() {
- return @Promise.@resolve({value: @undefined, done: true});
-}
-
-function onReadableStreamDirectControllerClosed(reason)
-{
- "use strict";
- @throwTypeError("ReadableStreamDirectController is now closed");
-}
-
-function onCloseDirectStream(reason)
-{
- "use strict";
- var stream = this.@controlledReadableStream;
- if (!stream || @getByIdDirectPrivate(stream, "state") !== @streamReadable)
- return;
-
- if (this._deferClose !== 0) {
- this._deferClose = 1;
- this._deferCloseReason = reason;
- return;
- }
-
- @putByIdDirectPrivate(stream, "state", @streamClosing);
- if (typeof this.@underlyingSource.close === 'function') {
- try {
- this.@underlyingSource.close.@call(this.@underlyingSource, reason);
- } catch (e) {
-
- }
- }
-
- var drained;
- try {
- drained = this.@sink.end();
- @putByIdDirectPrivate(this, "sink", @undefined);
- } catch (e) {
- if (this._pendingRead) {
- var read = this._pendingRead;
- this._pendingRead = @undefined;
- @rejectPromise(read, e);
- }
- @readableStreamError(stream, e);
- return;
- }
-
- this.error = this.drain = this.write = this.close = this.end = @onReadableStreamDirectControllerClosed;
-
- var reader = @getByIdDirectPrivate(stream, "reader");
-
- if (reader && @isReadableStreamDefaultReader(reader)) {
- var _pendingRead = this._pendingRead;
- if (_pendingRead && @isPromise(_pendingRead) && drained?.byteLength) {
- this._pendingRead = @undefined;
- @fulfillPromise(_pendingRead, {value: drained, done: false});
- @readableStreamClose(stream);
- return;
- }
- }
-
- if (drained?.byteLength) {
- var requests = @getByIdDirectPrivate(reader, "readRequests");
- if (requests?.isNotEmpty()) {
- @readableStreamFulfillReadRequest(stream, drained, false);
- @readableStreamClose(stream);
- return;
- }
-
- @putByIdDirectPrivate(stream, "state", @streamReadable);
- this.@pull = () => {
- var thisResult = @createFulfilledPromise({value: drained, done: false});
- drained = @undefined;
- @readableStreamClose(stream);
- stream = @undefined;
- return thisResult;
- };
- } else if (this._pendingRead) {
- var read = this._pendingRead;
- this._pendingRead = @undefined;
- @putByIdDirectPrivate(this, "pull", @noopDoneFunction);
- @fulfillPromise(read, {value: @undefined, done: true});
- }
-
- @readableStreamClose(stream);
-}
-
-function onDrainDirectStream()
-{
- "use strict";
-
- var straem = this.@controlledReadableStream;
- var reader = @getByIdDirectPrivate(stream, "reader");
- if (!reader || !@isReadableStreamDefaultReader(reader)) {
- return;
- }
-
- var _pendingRead = this._pendingRead;
- this._pendingRead = @undefined;
- if (_pendingRead && @isPromise(_pendingRead)) {
- var drained = this.@sink.drain();
- if (drained?.byteLength) {
- this._pendingRead = @getByIdDirectPrivate(stream, "readRequests")?.shift();
- @fulfillPromise(_pendingRead, {value: drained, done: false});
- } else {
- this._pendingRead = _pendingRead;
- }
- } else if (@getByIdDirectPrivate(stream, "readRequests")?.isNotEmpty()) {
- var drained = this.@sink.drain();
- if (drained?.byteLength) {
- @readableStreamFulfillReadRequest(stream, drained, false);
- }
- } else if (this._deferDrain === -1) {
- this._deferDrain = 1;
- }
-
-}
-
-function initializeArrayBufferStream(underlyingSource, highWaterMark)
-{
- "use strict";
-
- // This is the fallback implementation for direct streams
- // When we don't know what the destination type is
- // We assume it is a Uint8Array.
- var sink = new globalThis.Bun.ArrayBufferSink(highWaterMark ? {highWaterMark, stream: true, asUint8Array: true} : {stream: true, asUint8Array: true});
- var controller = {
- @underlyingSource: underlyingSource,
- @pull: @onPullDirectStream,
- @controlledReadableStream: this,
- @sink: sink,
- close: @onCloseDirectStream,
- write: sink.write.@bind(sink),
- end: @onCloseDirectStream,
- drain: @onDrainDirectStream,
- _pendingRead: @undefined,
- _deferClose: 0,
- _deferDrain: 0,
- _deferCloseReason: @undefined,
- _handleError: @undefined,
- };
-
-
- @putByIdDirectPrivate(this, "readableStreamController", controller);
-
-}
-
-function createReadableStreamController(underlyingSource, strategy, fromLazy) {
- if (fromLazy) {
- @putByIdDirectPrivate(this, "start", @undefined);
- }
-
- const type = underlyingSource.type;
- const typeString = @toString(type);
-
- if (typeString === "bytes") {
- // if (!@readableByteStreamAPIEnabled())
- // @throwTypeError("ReadableByteStreamController is not implemented");
-
- if (strategy.highWaterMark === @undefined)
- strategy.highWaterMark = 0;
- if (strategy.size !== @undefined)
- @throwRangeError("Strategy for a ReadableByteStreamController cannot have a size");
-
- @putByIdDirectPrivate(this, "readableStreamController", new @ReadableByteStreamController(this, underlyingSource, strategy.highWaterMark, @isReadableStream));
- } else if (typeString === "direct") {
- if (strategy.size !== @undefined)
- @throwRangeError("Strategy for a ReadableDirectStreamController cannot have a size");
-
- var highWaterMark = strategy.highWaterMark;
- @initializeArrayBufferStream.@call(this, underlyingSource, highWaterMark);
- } else if (type === @undefined) {
- if (strategy.highWaterMark === @undefined)
- strategy.highWaterMark = 1;
-
- @setupReadableStreamDefaultController(this, underlyingSource, strategy.size, strategy.highWaterMark, underlyingSource.start, underlyingSource.pull, underlyingSource.cancel);
- } else
- @throwRangeError("Invalid type for underlying source");
-
-}
@globalPrivate
function readableStreamToArray(stream) {
@@ -516,14 +211,14 @@ function readableStreamToArrayPublic(stream) {
@globalPrivate
function consumeReadableStream(nativePtr, nativeType, inputStream) {
"use strict";
- const symbol = Symbol.for("Bun.consumeReadableStreamPrototype");
+ const symbol = globalThis.Symbol.for("Bun.consumeReadableStreamPrototype");
var cached = globalThis[symbol];
if (!cached) {
cached = globalThis[symbol] = [];
}
var Prototype = cached[nativeType];
if (Prototype === @undefined) {
- var [doRead, doError, doReadMany, doClose, onClose, deinit] = globalThis[Symbol.for("Bun.lazy")](nativeType);
+ var [doRead, doError, doReadMany, doClose, onClose, deinit] = globalThis[globalThis.Symbol.for("Bun.lazy")](nativeType);
Prototype = class NativeReadableStreamSink {
constructor(reader, ptr) {
@@ -639,100 +334,6 @@ function createEmptyReadableStream() {
return stream;
}
-function lazyLoadStream(stream, autoAllocateChunkSize) {
- "use strict";
-
- @putByIdDirectPrivate(stream, "start", @undefined);
- var bunNativeType = @getByIdDirectPrivate(stream, "bunNativeType");
- var bunNativePtr = @getByIdDirectPrivate(stream, "bunNativePtr");
-
- var cached = globalThis[Symbol.for("Bun.nativeReadableStreamPrototype")] ||= new @Map;
- var Prototype = cached.@get(nativeType);
- if (Prototype === @undefined) {
- var [pull, start, cancel, setClose, deinit] = globalThis[Symbol.for("Bun.lazy")](nativeType);
- var closer = [false];
- var handleResult;
- function handleNativeReadableStreamPromiseResult(val) {
- "use strict";
- var {c, v} = this;
- this.c = @undefined;
- this.v = @undefined;
- handleResult(val, c, v);
- }
-
- handleResult = function handleResult(result, controller, view) {
- "use strict";
-
- if (result && @isPromise(result)) {
- return result.then(handleNativeReadableStreamPromiseResult.bind({c: controller, v: view}), (err) => controller.error(err));
- } else if (result !== false) {
- if (view && view.byteLength === result) {
- controller.byobRequest.respondWithNewView(view);
- } else {
- controller.byobRequest.respond(result);
- }
- }
-
- if (closer[0] || result === false) {
- @enqueueJob(() => controller.close());
- closer[0] = false;
- }
- };
-
- Prototype = class NativeReadableStreamSource {
- constructor(tag, autoAllocateChunkSize) {
- this.pull = this.pull_.bind(tag);
- this.cancel = this.cancel_.bind(tag);
- this.autoAllocateChunkSize = autoAllocateChunkSize;
- }
-
- pull;
- cancel;
-
- type = "bytes";
- autoAllocateChunkSize = 0;
-
- static startSync = start;
-
- pull_(controller) {
- closer[0] = false;
- var result;
-
- const view = controller.byobRequest.view;
- try {
- result = pull(this, view, closer);
- } catch(err) {
- return controller.error(err);
- }
-
- return handleResult(result, controller, view);
- }
-
- cancel_(reason) {
- cancel(this, reason);
- }
-
- static registry = new FinalizationRegistry(deinit);
- }
- cached.@set(nativeType, Prototype);
- }
-
- // either returns the chunk size
- // or throws an error
- // should never return a Promise
- const chunkSize = Prototype.startSync(nativePtr, autoAllocateChunkSize);
-
- // empty file, no need for native back-and-forth on this
- if (chunkSize === 0) {
- @readableStreamClose(stream);
- return;
- }
-
- var instance = new Prototype(nativePtr, chunkSize);
- Prototype.registry.register(instance, nativePtr);
- @createReadableStreamController.@call(stream, instance, @undefined, true);
-}
-
@globalPrivate
function createNativeReadableStream(nativePtr, nativeType, autoAllocateChunkSize) {
"use strict";
@@ -766,18 +367,15 @@ function getReader(options)
throw @makeThisTypeError("ReadableStream", "getReader");
const mode = @toDictionary(options, { }, "ReadableStream.getReader takes an object as first argument").mode;
- if (mode === @undefined)
+ if (mode === @undefined) {
+ var start_ = @getByIdDirectPrivate(this, "start");
+ if (start_) {
+ start_.@call(this);
+ }
return new @ReadableStreamDefaultReader(this);
-
+ }
// String conversion is required by spec, hence double equals.
if (mode == 'byob') {
- var controller = @getByIdDirectPrivate(this, "controller");
- if (@isReadableStreamDefaultController(controller)) {
- @readableStreamDefaultControllerStart(controller);
- } else {
- @readableStreamByteStreamControllerStart(controller);
- }
-
return new @ReadableStreamBYOBReader(this);
}