diff options
Diffstat (limited to 'src/javascript/jsc/bindings/builtins/js')
4 files changed, 485 insertions, 471 deletions
diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js b/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js index 372c980a1..f59a7bdbc 100644 --- a/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js +++ b/src/javascript/jsc/bindings/builtins/js/ReadableByteStreamInternals.js @@ -61,12 +61,20 @@ function privateInitializeReadableByteStreamController(stream, underlyingByteSou @putByIdDirectPrivate(this, "cancel", @readableByteStreamControllerCancel); @putByIdDirectPrivate(this, "pull", @readableByteStreamControllerPull); + if (@getByIdDirectPrivate(underlyingByteSource, "lazy") === true) { + @putByIdDirectPrivate(this, "start", () => @readableStreamByteStreamControllerStart(this)); + } else { + @putByIdDirectPrivate(this, "start", @undefined); + @readableStreamByteStreamControllerStart(this); + } + return this; } function readableStreamByteStreamControllerStart(controller) { "use strict"; - + @putByIdDirectPrivate(controller, "start", @undefined); + if (@getByIdDirectPrivate(controller, "started") !== -1) return; @@ -84,6 +92,7 @@ function readableStreamByteStreamControllerStart(controller) { }); } + function privateInitializeReadableStreamBYOBRequest(controller, view) { "use strict"; diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableStream.js b/src/javascript/jsc/bindings/builtins/js/ReadableStream.js index cf9eba8b6..cfe68d13c 100644 --- a/src/javascript/jsc/bindings/builtins/js/ReadableStream.js +++ b/src/javascript/jsc/bindings/builtins/js/ReadableStream.js @@ -52,38 +52,27 @@ function initializeReadableStream(underlyingSource, strategy) const isDirect = underlyingSource.type === "direct"; // direct streams are always lazy - const isLazy = isDirect || !!underlyingSource.@lazy; + const isUnderlyingSourceLazy = !!underlyingSource.@lazy; + const isLazy = isDirect || isUnderlyingSourceLazy; - - @putByIdDirectPrivate(this, "direct", isDirect); - - // FIXME: We should introduce https://streams.spec.whatwg.org/#create-readable-stream. - // For now, we emulate this with underlyingSource with private properties. - if (@getByIdDirectPrivate(underlyingSource, "pull") !== @undefined) { - @putByIdDirectPrivate(this, "underlyingSource", @undefined); + // // FIXME: We should introduce https://streams.spec.whatwg.org/#create-readable-stream. + // // For now, we emulate this with underlyingSource with private properties. + if (@getByIdDirectPrivate(underlyingSource, "pull") !== @undefined && !isLazy) { const size = @getByIdDirectPrivate(strategy, "size"); const highWaterMark = @getByIdDirectPrivate(strategy, "highWaterMark"); @setupReadableStreamDefaultController(this, underlyingSource, size, highWaterMark !== @undefined ? highWaterMark : 1, @getByIdDirectPrivate(underlyingSource, "start"), @getByIdDirectPrivate(underlyingSource, "pull"), @getByIdDirectPrivate(underlyingSource, "cancel")); return this; } - - if (isLazy) { - - if (isDirect) { - if ("start" in underlyingSource && typeof underlyingSource.start === "function") - @throwTypeError("\"start\" for direct streams are not implemented yet"); - - - @putByIdDirectPrivate(this, "underlyingSource", underlyingSource); - @putByIdDirectPrivate(this, "start", () => @createReadableStreamController(this, underlyingSource, strategy, true)); - } else { - @putByIdDirectPrivate(this, "underlyingSource", @undefined); - const autoAllocateChunkSize = underlyingSource.autoAllocateChunkSize; - @putByIdDirectPrivate(this, "start", () => @lazyLoadStream(this, autoAllocateChunkSize)); - } + if (isDirect) { + if ("start" in underlyingSource && typeof underlyingSource.start === "function") + @throwTypeError("\"start\" for direct streams are not implemented yet"); + + @putByIdDirectPrivate(this, "start", () => @createReadableStreamController.@call(this, underlyingSource, strategy, true)); + } else if (isLazy) { + const autoAllocateChunkSize = underlyingSource.autoAllocateChunkSize; + @putByIdDirectPrivate(this, "start", () => @lazyLoadStream(this, autoAllocateChunkSize)); } else { - @putByIdDirectPrivate(this, "underlyingSource", @undefined); @putByIdDirectPrivate(this, "start", @undefined); @createReadableStreamController.@call(this, underlyingSource, strategy, false); } @@ -92,300 +81,6 @@ function initializeReadableStream(underlyingSource, strategy) return this; } -function handleDirectStreamError(e) { - "use strict"; - - var controller = this; - var sink = controller.@sink; - if (sink) { - @putByIdDirectPrivate(controller "sink", @undefined); - try { - sink.close(e); - } catch (f) {} - } - - this.error = this.drain = this.write = this.close = this.end = @onReadableStreamDirectControllerClosed; - - if (typeof this.@underlyingSource.close === 'function') { - try { - this.@underlyingSource.close.@call(this.@underlyingSource, e); - } catch (e) { - } - } - - try { - var pend = controller._pendingRead; - if (pend) { - controller._pendingRead = @undefined; - @rejectPromise(pend, e); - } - } catch (f) {} - var stream = controller.@controlledReadableStream; - if (stream) @readableStreamError(stream, e); -} - -function handleDirectStreamErrorReject(e) { - @handleDirectStreamError.@call(this, e); - return @Promise.@reject(e); -} - -function onPullDirectStream(controller) -{ - - "use strict"; - - var stream = controller.@controlledReadableStream; - if (!stream || @getByIdDirectPrivate(stream, "state") !== @streamReadable) - return; - - // pull is in progress - // this is a recursive call - // ignore it - if (controller._deferClose === -1) { - return; - } - - - controller._deferClose = -1; - controller._deferDrain = -1; - var deferClose; - var deferDrain; - - // Direct streams allow @pull to be called multiple times, unlike the spec. - // Backpressure is handled by the destination, not by the underlying source. - // In this case, we rely on the heuristic that repeatedly draining in the same tick - // is bad for performance - // this code is only run when consuming a direct stream from JS - // without the HTTP server or anything else - try { - var result = controller.@underlyingSource.@pull( - controller, - ); - - if (result && @isPromise(result)) { - if (controller._handleError === @undefined) { - controller._handleError = @handleDirectStreamErrorReject.@bind(controller); - } - - @Promise.prototype.@catch.@call(result, controller._handleError); - } - } catch(e) { - return @handleDirectStreamErrorReject.@call(controller, e); - } finally { - deferDrain = controller._deferClose; - deferClose = controller._deferDrain; - controller._deferDrain = 0; - controller._deferClose = 0; - - } - - - var promiseToReturn; - - - if (controller._pendingRead === @undefined) { - controller._pendingRead = promiseToReturn = @newPromise(); - } else { - promiseToReturn = @readableStreamAddReadRequest(stream); - } - - // they called close during @pull() - // we delay that - if (deferClose === 1) { - var reason = controller._deferCloseReason; - controller._deferCloseReason = @undefined; - @onCloseDirectStream.@call(controller, reason); - return promiseToReturn; - } - - // not done, but they called drain() - if (deferDrain === 1) { - @onDrainDirectStream.@call(controller); - } - - - return promiseToReturn; -} - -function noopDoneFunction() { - return @Promise.@resolve({value: @undefined, done: true}); -} - -function onReadableStreamDirectControllerClosed(reason) -{ - "use strict"; - @throwTypeError("ReadableStreamDirectController is now closed"); -} - -function onCloseDirectStream(reason) -{ - "use strict"; - var stream = this.@controlledReadableStream; - if (!stream || @getByIdDirectPrivate(stream, "state") !== @streamReadable) - return; - - if (this._deferClose !== 0) { - this._deferClose = 1; - this._deferCloseReason = reason; - return; - } - - @putByIdDirectPrivate(stream, "state", @streamClosing); - if (typeof this.@underlyingSource.close === 'function') { - try { - this.@underlyingSource.close.@call(this.@underlyingSource, reason); - } catch (e) { - - } - } - - var drained; - try { - drained = this.@sink.end(); - @putByIdDirectPrivate(this, "sink", @undefined); - } catch (e) { - if (this._pendingRead) { - var read = this._pendingRead; - this._pendingRead = @undefined; - @rejectPromise(read, e); - } - @readableStreamError(stream, e); - return; - } - - this.error = this.drain = this.write = this.close = this.end = @onReadableStreamDirectControllerClosed; - - var reader = @getByIdDirectPrivate(stream, "reader"); - - if (reader && @isReadableStreamDefaultReader(reader)) { - var _pendingRead = this._pendingRead; - if (_pendingRead && @isPromise(_pendingRead) && drained?.byteLength) { - this._pendingRead = @undefined; - @fulfillPromise(_pendingRead, {value: drained, done: false}); - @readableStreamClose(stream); - return; - } - } - - if (drained?.byteLength) { - var requests = @getByIdDirectPrivate(reader, "readRequests"); - if (requests?.isNotEmpty()) { - @readableStreamFulfillReadRequest(stream, drained, false); - @readableStreamClose(stream); - return; - } - - @putByIdDirectPrivate(stream, "state", @streamReadable); - this.@pull = () => { - var thisResult = @createFulfilledPromise({value: drained, done: false}); - drained = @undefined; - @readableStreamClose(stream); - stream = @undefined; - return thisResult; - }; - } else if (this._pendingRead) { - var read = this._pendingRead; - this._pendingRead = @undefined; - @putByIdDirectPrivate(this, "pull", @noopDoneFunction); - @fulfillPromise(read, {value: @undefined, done: true}); - } - - @readableStreamClose(stream); -} - -function onDrainDirectStream() -{ - "use strict"; - - var straem = this.@controlledReadableStream; - var reader = @getByIdDirectPrivate(stream, "reader"); - if (!reader || !@isReadableStreamDefaultReader(reader)) { - return; - } - - var _pendingRead = this._pendingRead; - this._pendingRead = @undefined; - if (_pendingRead && @isPromise(_pendingRead)) { - var drained = this.@sink.drain(); - if (drained?.byteLength) { - this._pendingRead = @getByIdDirectPrivate(stream, "readRequests")?.shift(); - @fulfillPromise(_pendingRead, {value: drained, done: false}); - } else { - this._pendingRead = _pendingRead; - } - } else if (@getByIdDirectPrivate(stream, "readRequests")?.isNotEmpty()) { - var drained = this.@sink.drain(); - if (drained?.byteLength) { - @readableStreamFulfillReadRequest(stream, drained, false); - } - } else if (this._deferDrain === -1) { - this._deferDrain = 1; - } - -} - -function initializeArrayBufferStream(underlyingSource, highWaterMark) -{ - "use strict"; - - // This is the fallback implementation for direct streams - // When we don't know what the destination type is - // We assume it is a Uint8Array. - var sink = new globalThis.Bun.ArrayBufferSink(highWaterMark ? {highWaterMark, stream: true, asUint8Array: true} : {stream: true, asUint8Array: true}); - var controller = { - @underlyingSource: underlyingSource, - @pull: @onPullDirectStream, - @controlledReadableStream: this, - @sink: sink, - close: @onCloseDirectStream, - write: sink.write.@bind(sink), - end: @onCloseDirectStream, - drain: @onDrainDirectStream, - _pendingRead: @undefined, - _deferClose: 0, - _deferDrain: 0, - _deferCloseReason: @undefined, - _handleError: @undefined, - }; - - - @putByIdDirectPrivate(this, "readableStreamController", controller); - -} - -function createReadableStreamController(underlyingSource, strategy, fromLazy) { - if (fromLazy) { - @putByIdDirectPrivate(this, "start", @undefined); - } - - const type = underlyingSource.type; - const typeString = @toString(type); - - if (typeString === "bytes") { - // if (!@readableByteStreamAPIEnabled()) - // @throwTypeError("ReadableByteStreamController is not implemented"); - - if (strategy.highWaterMark === @undefined) - strategy.highWaterMark = 0; - if (strategy.size !== @undefined) - @throwRangeError("Strategy for a ReadableByteStreamController cannot have a size"); - - @putByIdDirectPrivate(this, "readableStreamController", new @ReadableByteStreamController(this, underlyingSource, strategy.highWaterMark, @isReadableStream)); - } else if (typeString === "direct") { - if (strategy.size !== @undefined) - @throwRangeError("Strategy for a ReadableDirectStreamController cannot have a size"); - - var highWaterMark = strategy.highWaterMark; - @initializeArrayBufferStream.@call(this, underlyingSource, highWaterMark); - } else if (type === @undefined) { - if (strategy.highWaterMark === @undefined) - strategy.highWaterMark = 1; - - @setupReadableStreamDefaultController(this, underlyingSource, strategy.size, strategy.highWaterMark, underlyingSource.start, underlyingSource.pull, underlyingSource.cancel); - } else - @throwRangeError("Invalid type for underlying source"); - -} @globalPrivate function readableStreamToArray(stream) { @@ -516,14 +211,14 @@ function readableStreamToArrayPublic(stream) { @globalPrivate function consumeReadableStream(nativePtr, nativeType, inputStream) { "use strict"; - const symbol = Symbol.for("Bun.consumeReadableStreamPrototype"); + const symbol = globalThis.Symbol.for("Bun.consumeReadableStreamPrototype"); var cached = globalThis[symbol]; if (!cached) { cached = globalThis[symbol] = []; } var Prototype = cached[nativeType]; if (Prototype === @undefined) { - var [doRead, doError, doReadMany, doClose, onClose, deinit] = globalThis[Symbol.for("Bun.lazy")](nativeType); + var [doRead, doError, doReadMany, doClose, onClose, deinit] = globalThis[globalThis.Symbol.for("Bun.lazy")](nativeType); Prototype = class NativeReadableStreamSink { constructor(reader, ptr) { @@ -639,100 +334,6 @@ function createEmptyReadableStream() { return stream; } -function lazyLoadStream(stream, autoAllocateChunkSize) { - "use strict"; - - @putByIdDirectPrivate(stream, "start", @undefined); - var bunNativeType = @getByIdDirectPrivate(stream, "bunNativeType"); - var bunNativePtr = @getByIdDirectPrivate(stream, "bunNativePtr"); - - var cached = globalThis[Symbol.for("Bun.nativeReadableStreamPrototype")] ||= new @Map; - var Prototype = cached.@get(nativeType); - if (Prototype === @undefined) { - var [pull, start, cancel, setClose, deinit] = globalThis[Symbol.for("Bun.lazy")](nativeType); - var closer = [false]; - var handleResult; - function handleNativeReadableStreamPromiseResult(val) { - "use strict"; - var {c, v} = this; - this.c = @undefined; - this.v = @undefined; - handleResult(val, c, v); - } - - handleResult = function handleResult(result, controller, view) { - "use strict"; - - if (result && @isPromise(result)) { - return result.then(handleNativeReadableStreamPromiseResult.bind({c: controller, v: view}), (err) => controller.error(err)); - } else if (result !== false) { - if (view && view.byteLength === result) { - controller.byobRequest.respondWithNewView(view); - } else { - controller.byobRequest.respond(result); - } - } - - if (closer[0] || result === false) { - @enqueueJob(() => controller.close()); - closer[0] = false; - } - }; - - Prototype = class NativeReadableStreamSource { - constructor(tag, autoAllocateChunkSize) { - this.pull = this.pull_.bind(tag); - this.cancel = this.cancel_.bind(tag); - this.autoAllocateChunkSize = autoAllocateChunkSize; - } - - pull; - cancel; - - type = "bytes"; - autoAllocateChunkSize = 0; - - static startSync = start; - - pull_(controller) { - closer[0] = false; - var result; - - const view = controller.byobRequest.view; - try { - result = pull(this, view, closer); - } catch(err) { - return controller.error(err); - } - - return handleResult(result, controller, view); - } - - cancel_(reason) { - cancel(this, reason); - } - - static registry = new FinalizationRegistry(deinit); - } - cached.@set(nativeType, Prototype); - } - - // either returns the chunk size - // or throws an error - // should never return a Promise - const chunkSize = Prototype.startSync(nativePtr, autoAllocateChunkSize); - - // empty file, no need for native back-and-forth on this - if (chunkSize === 0) { - @readableStreamClose(stream); - return; - } - - var instance = new Prototype(nativePtr, chunkSize); - Prototype.registry.register(instance, nativePtr); - @createReadableStreamController.@call(stream, instance, @undefined, true); -} - @globalPrivate function createNativeReadableStream(nativePtr, nativeType, autoAllocateChunkSize) { "use strict"; @@ -766,18 +367,15 @@ function getReader(options) throw @makeThisTypeError("ReadableStream", "getReader"); const mode = @toDictionary(options, { }, "ReadableStream.getReader takes an object as first argument").mode; - if (mode === @undefined) + if (mode === @undefined) { + var start_ = @getByIdDirectPrivate(this, "start"); + if (start_) { + start_.@call(this); + } return new @ReadableStreamDefaultReader(this); - + } // String conversion is required by spec, hence double equals. if (mode == 'byob') { - var controller = @getByIdDirectPrivate(this, "controller"); - if (@isReadableStreamDefaultController(controller)) { - @readableStreamDefaultControllerStart(controller); - } else { - @readableStreamByteStreamControllerStart(controller); - } - return new @ReadableStreamBYOBReader(this); } diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js b/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js index 5d0779745..d2dfd5137 100644 --- a/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js +++ b/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js @@ -67,48 +67,6 @@ function privateInitializeReadableStreamDefaultController(stream, underlyingSour return this; } - -// https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller, starting from step 6. -// The other part is implemented in privateInitializeReadableStreamDefaultController. -function setupReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark, startMethod, pullMethod, cancelMethod) -{ - "use strict"; - - const controller = new @ReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark, @isReadableStream); - - const pullAlgorithm = () => @promiseInvokeOrNoopMethod(underlyingSource, pullMethod, [controller]); - const cancelAlgorithm = (reason) => @promiseInvokeOrNoopMethod(underlyingSource, cancelMethod, [reason]); - - @putByIdDirectPrivate(controller, "pullAlgorithm", pullAlgorithm); - @putByIdDirectPrivate(controller, "cancelAlgorithm", cancelAlgorithm); - @putByIdDirectPrivate(controller, "pull", @readableStreamDefaultControllerPull); - @putByIdDirectPrivate(controller, "cancel", @readableStreamDefaultControllerCancel); - @putByIdDirectPrivate(stream, "readableStreamController", controller); - - if (@getByIdDirectPrivate(controller, "sink") === @undefined) { - @readableStreamDefaultControllerStart(controller); - } - -} - -function readableStreamDefaultControllerStart(controller) { - if (@getByIdDirectPrivate(controller, "started") !== -1) - return; - - const underlyingSource = @getByIdDirectPrivate(controller, "underlyingSource"); - const startMethod = underlyingSource.start; - @putByIdDirectPrivate(controller, "started", 0); - - return @promiseInvokeOrNoopMethodNoCatch(underlyingSource, startMethod, [controller]).@then(() => { - @putByIdDirectPrivate(controller, "started", 1); - @assert(!@getByIdDirectPrivate(controller, "pulling")); - @assert(!@getByIdDirectPrivate(controller, "pullAgain")); - @readableStreamDefaultControllerCallPullIfNeeded(controller); - }, (error) => { - @readableStreamDefaultControllerError(controller, error); - }); -} - function readableStreamDefaultControllerError(controller, error) { "use strict"; @@ -157,12 +115,88 @@ function acquireReadableStreamDefaultReader(stream) "use strict"; var start = @getByIdDirectPrivate(stream, "start"); if (start) { - start(); + start.@call(stream); } return new @ReadableStreamDefaultReader(stream); } +// https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller, starting from step 6. +// The other part is implemented in privateInitializeReadableStreamDefaultController. +function setupReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark, startMethod, pullMethod, cancelMethod) +{ + "use strict"; + + const controller = new @ReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark, @isReadableStream); + + const pullAlgorithm = () => @promiseInvokeOrNoopMethod(underlyingSource, pullMethod, [controller]); + const cancelAlgorithm = (reason) => @promiseInvokeOrNoopMethod(underlyingSource, cancelMethod, [reason]); + + @putByIdDirectPrivate(controller, "pullAlgorithm", pullAlgorithm); + @putByIdDirectPrivate(controller, "cancelAlgorithm", cancelAlgorithm); + @putByIdDirectPrivate(controller, "pull", @readableStreamDefaultControllerPull); + @putByIdDirectPrivate(controller, "cancel", @readableStreamDefaultControllerCancel); + @putByIdDirectPrivate(stream, "readableStreamController", controller); + + @readableStreamDefaultControllerStart(controller); + +} + + +function createReadableStreamController(underlyingSource, strategy, fromLazy) { + if (fromLazy) { + @putByIdDirectPrivate(this, "start", @undefined); + } + + const type = underlyingSource.type; + const typeString = @toString(type); + + if (typeString === "bytes") { + // if (!@readableByteStreamAPIEnabled()) + // @throwTypeError("ReadableByteStreamController is not implemented"); + + if (strategy.highWaterMark === @undefined) + strategy.highWaterMark = 0; + if (strategy.size !== @undefined) + @throwRangeError("Strategy for a ReadableByteStreamController cannot have a size"); + + @putByIdDirectPrivate(this, "readableStreamController", new @ReadableByteStreamController(this, underlyingSource, strategy.highWaterMark, @isReadableStream)); + } else if (typeString === "direct") { + var highWaterMark = strategy?.highWaterMark; + @initializeArrayBufferStream.@call(this, underlyingSource, highWaterMark); + } else if (type === @undefined) { + if (strategy.highWaterMark === @undefined) + strategy.highWaterMark = 1; + + @setupReadableStreamDefaultController(this, underlyingSource, strategy.size, strategy.highWaterMark, underlyingSource.start, underlyingSource.pull, underlyingSource.cancel); + } else + @throwRangeError("Invalid type for underlying source"); + +} + +function readableStreamDefaultControllerStart(controller) { + "use strict"; + + + + if (@getByIdDirectPrivate(controller, "started") !== -1) + return; + + const underlyingSource = @getByIdDirectPrivate(controller, "underlyingSource"); + const startMethod = underlyingSource.start; + @putByIdDirectPrivate(controller, "started", 0); + + @promiseInvokeOrNoopMethodNoCatch(underlyingSource, startMethod, [controller]).@then(() => { + @putByIdDirectPrivate(controller, "started", 1); + @assert(!@getByIdDirectPrivate(controller, "pulling")); + @assert(!@getByIdDirectPrivate(controller, "pullAgain")); + @readableStreamDefaultControllerCallPullIfNeeded(controller); + }, (error) => { + @readableStreamDefaultControllerError(controller, error); + }); +} + + // FIXME: Replace readableStreamPipeTo by below function. // This method implements the latest https://streams.spec.whatwg.org/#readable-stream-pipe-to. function readableStreamPipeToWritableStream(source, destination, preventClose, preventAbort, preventCancel, signal) @@ -563,6 +597,281 @@ function isReadableStreamDefaultController(controller) return @isObject(controller) && !!@getByIdDirectPrivate(controller, "underlyingSource"); } + +@globalPrivate +function assignDirectStream() { + "use strict"; + + var stream = this; +} + + +function handleDirectStreamError(e) { + "use strict"; + + var controller = this; + var sink = controller.@sink; + if (sink) { + @putByIdDirectPrivate(controller, "sink", @undefined); + try { + sink.close(e); + } catch (f) {} + } + + this.error = this.drain = this.write = this.close = this.end = @onReadableStreamDirectControllerClosed; + + if (typeof this.@underlyingSource.close === 'function') { + try { + this.@underlyingSource.close.@call(this.@underlyingSource, e); + } catch (e) { + } + } + + try { + var pend = controller._pendingRead; + if (pend) { + controller._pendingRead = @undefined; + @rejectPromise(pend, e); + } + } catch (f) {} + var stream = controller.@controlledReadableStream; + if (stream) @readableStreamError(stream, e); +} + +function handleDirectStreamErrorReject(e) { + @handleDirectStreamError.@call(this, e); + return @Promise.@reject(e); +} + +function onPullDirectStream(controller) +{ + + "use strict"; + + var stream = controller.@controlledReadableStream; + if (!stream || @getByIdDirectPrivate(stream, "state") !== @streamReadable) + return; + + // pull is in progress + // this is a recursive call + // ignore it + if (controller._deferClose === -1) { + return; + } + + + controller._deferClose = -1; + controller._deferDrain = -1; + var deferClose; + var deferDrain; + + // Direct streams allow @pull to be called multiple times, unlike the spec. + // Backpressure is handled by the destination, not by the underlying source. + // In this case, we rely on the heuristic that repeatedly draining in the same tick + // is bad for performance + // this code is only run when consuming a direct stream from JS + // without the HTTP server or anything else + try { + var result = controller.@underlyingSource.pull( + controller, + ); + + if (result && @isPromise(result)) { + if (controller._handleError === @undefined) { + controller._handleError = @handleDirectStreamErrorReject.bind(controller); + } + + @Promise.prototype.catch.@call(result, controller._handleError); + } + } catch(e) { + return @handleDirectStreamErrorReject.@call(controller, e); + } finally { + deferClose = controller._deferClose; + deferDrain = controller._deferDrain; + controller._deferDrain = controller._deferClose = 0; + } + + + var promiseToReturn; + + + if (controller._pendingRead === @undefined) { + controller._pendingRead = promiseToReturn = @newPromise(); + } else { + promiseToReturn = @readableStreamAddReadRequest(stream); + } + + + // they called close during @pull() + // we delay that + if (deferClose === 1) { + var reason = controller._deferCloseReason; + controller._deferCloseReason = @undefined; + @onCloseDirectStream.@call(controller, reason); + return promiseToReturn; + } + + // not done, but they called drain() + if (deferDrain === 1) { + @onDrainDirectStream.@call(controller); + } + + + return promiseToReturn; +} + +function noopDoneFunction() { + return @Promise.@resolve({value: @undefined, done: true}); +} + +function onReadableStreamDirectControllerClosed(reason) +{ + "use strict"; + @throwTypeError("ReadableStreamDirectController is now closed"); +} + +function onCloseDirectStream(reason) +{ + "use strict"; + var stream = this.@controlledReadableStream; + if (!stream || @getByIdDirectPrivate(stream, "state") !== @streamReadable) + return; + + if (this._deferClose !== 0) { + this._deferClose = 1; + this._deferCloseReason = reason; + return; + } + + @putByIdDirectPrivate(stream, "state", @streamClosing); + if (typeof this.@underlyingSource.close === 'function') { + try { + this.@underlyingSource.close.@call(this.@underlyingSource, reason); + } catch (e) { + + } + } + + var drained; + try { + drained = this.@sink.end(); + @putByIdDirectPrivate(this, "sink", @undefined); + } catch (e) { + if (this._pendingRead) { + var read = this._pendingRead; + this._pendingRead = @undefined; + @rejectPromise(read, e); + } + @readableStreamError(stream, e); + return; + } + + this.error = this.drain = this.write = this.close = this.end = @onReadableStreamDirectControllerClosed; + + var reader = @getByIdDirectPrivate(stream, "reader"); + + if (reader && @isReadableStreamDefaultReader(reader)) { + var _pendingRead = this._pendingRead; + if (_pendingRead && @isPromise(_pendingRead) && drained?.byteLength) { + this._pendingRead = @undefined; + @fulfillPromise(_pendingRead, {value: drained, done: false}); + @readableStreamClose(stream); + return; + } + } + + if (drained?.byteLength) { + var requests = @getByIdDirectPrivate(reader, "readRequests"); + if (requests?.isNotEmpty()) { + @readableStreamFulfillReadRequest(stream, drained, false); + @readableStreamClose(stream); + return; + } + + @putByIdDirectPrivate(stream, "state", @streamReadable); + this.@pull = () => { + var thisResult = @createFulfilledPromise({value: drained, done: false}); + drained = @undefined; + @readableStreamClose(stream); + stream = @undefined; + return thisResult; + }; + } else if (this._pendingRead) { + var read = this._pendingRead; + this._pendingRead = @undefined; + @putByIdDirectPrivate(this, "pull", @noopDoneFunction); + @fulfillPromise(read, {value: @undefined, done: true}); + } + + @readableStreamClose(stream); +} + +function onDrainDirectStream() +{ + "use strict"; + + var stream = this.@controlledReadableStream; + var reader = @getByIdDirectPrivate(stream, "reader"); + if (!reader || !@isReadableStreamDefaultReader(reader)) { + return; + } + + var _pendingRead = this._pendingRead; + this._pendingRead = @undefined; + if (_pendingRead && @isPromise(_pendingRead)) { + var drained = this.@sink.drain(); + if (drained?.byteLength) { + this._pendingRead = @getByIdDirectPrivate(stream, "readRequests")?.shift(); + @fulfillPromise(_pendingRead, {value: drained, done: false}); + } else { + this._pendingRead = _pendingRead; + } + } else if (@getByIdDirectPrivate(stream, "readRequests")?.isNotEmpty()) { + var drained = this.@sink.drain(); + if (drained?.byteLength) { + @readableStreamFulfillReadRequest(stream, drained, false); + } + } else if (this._deferDrain === -1) { + this._deferDrain = 1; + } + +} + +function initializeArrayBufferStream(underlyingSource, highWaterMark) +{ + "use strict"; + + // This is the fallback implementation for direct streams + // When we don't know what the destination type is + // We assume it is a Uint8Array. + + var opts = highWaterMark ? {highWaterMark, stream: true, asUint8Array: true} : {stream: true, asUint8Array: true}; + var sink = new globalThis.Bun.ArrayBufferSink(); + sink.start(opts); + + var controller = { + @underlyingSource: underlyingSource, + @pull: @onPullDirectStream, + @controlledReadableStream: this, + @sink: sink, + close: @onCloseDirectStream, + write: sink.write.bind(sink), + error: @handleDirectStreamError, + end: @onCloseDirectStream, + @close: @onCloseDirectStream, + drain: @onDrainDirectStream, + _pendingRead: @undefined, + _deferClose: 0, + _deferDrain: 0, + _deferCloseReason: @undefined, + _handleError: @undefined, + }; + + + @putByIdDirectPrivate(this, "readableStreamController", controller); + +} + function readableStreamError(stream, error) { "use strict"; @@ -597,11 +906,13 @@ function readableStreamError(stream, error) function readableStreamDefaultControllerShouldCallPull(controller) { - const stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); + "use strict"; + const stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); + if (!@readableStreamDefaultControllerCanCloseOrEnqueue(controller)) return false; - if (!(@getByIdDirectPrivate(controller, "started") > 0)) + if (!(@getByIdDirectPrivate(controller, "started") === 1)) return false; if ((!@isReadableStreamLocked(stream) || !@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) && @readableStreamDefaultControllerGetDesiredSize(controller) <= 0) return false; @@ -619,7 +930,7 @@ function readableStreamDefaultControllerCallPullIfNeeded(controller) if (!@readableStreamDefaultControllerCanCloseOrEnqueue(controller)) return; - if (!(@getByIdDirectPrivate(controller, "started") > 0)) + if (!(@getByIdDirectPrivate(controller, "started") === 1)) return; if ((!@isReadableStreamLocked(stream) || !@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) && @readableStreamDefaultControllerGetDesiredSize(controller) <= 0) return; @@ -629,6 +940,7 @@ function readableStreamDefaultControllerCallPullIfNeeded(controller) return; } + @assert(!@getByIdDirectPrivate(controller, "pullAgain")); @putByIdDirectPrivate(controller, "pulling", true); @@ -636,6 +948,7 @@ function readableStreamDefaultControllerCallPullIfNeeded(controller) @putByIdDirectPrivate(controller, "pulling", false); if (@getByIdDirectPrivate(controller, "pullAgain")) { @putByIdDirectPrivate(controller, "pullAgain", false); + @readableStreamDefaultControllerCallPullIfNeeded(controller); } }, function(error) { @@ -849,3 +1162,98 @@ function readableStreamDefaultControllerCanCloseOrEnqueue(controller) return !@getByIdDirectPrivate(controller, "closeRequested") && @getByIdDirectPrivate(@getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === @streamReadable; } + + +function lazyLoadStream(stream, autoAllocateChunkSize) { + "use strict"; + + @putByIdDirectPrivate(stream, "start", @undefined); + var bunNativeType = @getByIdDirectPrivate(stream, "bunNativeType"); + var bunNativePtr = @getByIdDirectPrivate(stream, "bunNativePtr"); + + var cached = globalThis[globalThis.Symbol.for("Bun.nativeReadableStreamPrototype")] ||= new @Map; + var Prototype = cached.@get(nativeType); + if (Prototype === @undefined) { + var [pull, start, cancel, setClose, deinit] = globalThis[globalThis.Symbol.for("Bun.lazy")](nativeType); + var closer = [false]; + var handleResult; + function handleNativeReadableStreamPromiseResult(val) { + "use strict"; + var {c, v} = this; + this.c = @undefined; + this.v = @undefined; + handleResult(val, c, v); + } + + handleResult = function handleResult(result, controller, view) { + "use strict"; + + if (result && @isPromise(result)) { + return result.then(handleNativeReadableStreamPromiseResult.bind({c: controller, v: view}), (err) => controller.error(err)); + } else if (result !== false) { + if (view && view.byteLength === result) { + controller.byobRequest.respondWithNewView(view); + } else { + controller.byobRequest.respond(result); + } + } + + if (closer[0] || result === false) { + @enqueueJob(() => controller.close()); + closer[0] = false; + } + }; + + Prototype = class NativeReadableStreamSource { + constructor(tag, autoAllocateChunkSize) { + this.pull = this.pull_.bind(tag); + this.cancel = this.cancel_.bind(tag); + this.autoAllocateChunkSize = autoAllocateChunkSize; + } + + pull; + cancel; + + type = "bytes"; + autoAllocateChunkSize = 0; + + static startSync = start; + + pull_(controller) { + closer[0] = false; + var result; + + const view = controller.byobRequest.view; + try { + result = pull(this, view, closer); + } catch(err) { + return controller.error(err); + } + + return handleResult(result, controller, view); + } + + cancel_(reason) { + cancel(this, reason); + } + + static registry = new FinalizationRegistry(deinit); + } + cached.@set(nativeType, Prototype); + } + + // either returns the chunk size + // or throws an error + // should never return a Promise + const chunkSize = Prototype.startSync(nativePtr, autoAllocateChunkSize); + + // empty file, no need for native back-and-forth on this + if (chunkSize === 0) { + @readableStreamClose(stream); + return; + } + + var instance = new Prototype(nativePtr, chunkSize); + Prototype.registry.register(instance, nativePtr); + @createReadableStreamController.@call(stream, instance, @undefined, true); +}
\ No newline at end of file diff --git a/src/javascript/jsc/bindings/builtins/js/WritableStreamInternals.js b/src/javascript/jsc/bindings/builtins/js/WritableStreamInternals.js index 5870cd05d..5a97155f2 100644 --- a/src/javascript/jsc/bindings/builtins/js/WritableStreamInternals.js +++ b/src/javascript/jsc/bindings/builtins/js/WritableStreamInternals.js @@ -42,7 +42,6 @@ function isWritableStreamDefaultWriter(writer) function acquireWritableStreamDefaultWriter(stream) { - @writableStreamDefaultControllerStart(@getByIdDirectPrivate(stream, "controller")); return new @WritableStreamDefaultWriter(stream); } |