diff options
Diffstat (limited to 'src/bun.js/builtins/js/ReadableStreamInternals.js')
-rw-r--r-- | src/bun.js/builtins/js/ReadableStreamInternals.js | 1255 |
1 files changed, 1255 insertions, 0 deletions
diff --git a/src/bun.js/builtins/js/ReadableStreamInternals.js b/src/bun.js/builtins/js/ReadableStreamInternals.js new file mode 100644 index 000000000..3e6590f31 --- /dev/null +++ b/src/bun.js/builtins/js/ReadableStreamInternals.js @@ -0,0 +1,1255 @@ +/* + * Copyright (C) 2015 Canon Inc. All rights reserved. + * Copyright (C) 2015 Igalia. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +// @internal + +function readableStreamReaderGenericInitialize(reader, stream) +{ + "use strict"; + + @putByIdDirectPrivate(reader, "ownerReadableStream", stream); + @putByIdDirectPrivate(stream, "reader", reader); + if (@getByIdDirectPrivate(stream, "state") === @streamReadable) + @putByIdDirectPrivate(reader, "closedPromiseCapability", @newPromiseCapability(@Promise)); + else if (@getByIdDirectPrivate(stream, "state") === @streamClosed) + @putByIdDirectPrivate(reader, "closedPromiseCapability", { @promise: @Promise.@resolve() }); + else { + @assert(@getByIdDirectPrivate(stream, "state") === @streamErrored); + @putByIdDirectPrivate(reader, "closedPromiseCapability", { @promise: @newHandledRejectedPromise(@getByIdDirectPrivate(stream, "storedError")) }); + } +} + +function privateInitializeReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark) +{ + "use strict"; + + if (!@isReadableStream(stream)) + @throwTypeError("ReadableStreamDefaultController needs a ReadableStream"); + + // readableStreamController is initialized with null value. + if (@getByIdDirectPrivate(stream, "readableStreamController") !== null) + @throwTypeError("ReadableStream already has a controller"); + + + + @putByIdDirectPrivate(this, "controlledReadableStream", stream); + @putByIdDirectPrivate(this, "underlyingSource", underlyingSource); + @putByIdDirectPrivate(this, "queue", @newQueue()); + @putByIdDirectPrivate(this, "started", -1); + @putByIdDirectPrivate(this, "closeRequested", false); + @putByIdDirectPrivate(this, "pullAgain", false); + @putByIdDirectPrivate(this, "pulling", false); + @putByIdDirectPrivate(this, "strategy", @validateAndNormalizeQueuingStrategy(size, highWaterMark)); + + return this; +} + +function readableStreamDefaultControllerError(controller, error) +{ + "use strict"; + + const stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); + if (@getByIdDirectPrivate(stream, "state") !== @streamReadable) + return; + @putByIdDirectPrivate(controller, "queue", @newQueue()); + + @readableStreamError(stream, error); +} + +function readableStreamPipeTo(stream, sink) +{ + "use strict"; + @assert(@isReadableStream(stream)); + + const reader = new @ReadableStreamDefaultReader(stream); + + @getByIdDirectPrivate(reader, "closedPromiseCapability").@promise.@then(() => { }, (e) => { sink.error(e); }); + + function doPipe() { + @readableStreamDefaultReaderRead(reader).@then(function(result) { + if (result.done) { + sink.close(); + return; + } + try { + sink.enqueue(result.value); + } catch (e) { + sink.error("ReadableStream chunk enqueueing in the sink failed"); + return; + } + doPipe(); + }, function(e) { + sink.error(e); + }); + } + doPipe(); +} + + + +function acquireReadableStreamDefaultReader(stream) +{ + "use strict"; + var start = @getByIdDirectPrivate(stream, "start"); + if (start) { + start.@call(stream); + } + + return new @ReadableStreamDefaultReader(stream); +} + +// https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller, starting from step 6. +// The other part is implemented in privateInitializeReadableStreamDefaultController. +function setupReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark, startMethod, pullMethod, cancelMethod) +{ + "use strict"; + + const controller = new @ReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark, @isReadableStream); + + const pullAlgorithm = () => @promiseInvokeOrNoopMethod(underlyingSource, pullMethod, [controller]); + const cancelAlgorithm = (reason) => @promiseInvokeOrNoopMethod(underlyingSource, cancelMethod, [reason]); + + @putByIdDirectPrivate(controller, "pullAlgorithm", pullAlgorithm); + @putByIdDirectPrivate(controller, "cancelAlgorithm", cancelAlgorithm); + @putByIdDirectPrivate(controller, "pull", @readableStreamDefaultControllerPull); + @putByIdDirectPrivate(controller, "cancel", @readableStreamDefaultControllerCancel); + @putByIdDirectPrivate(stream, "readableStreamController", controller); + + @readableStreamDefaultControllerStart(controller); + +} + + +function createReadableStreamController(stream, underlyingSource, strategy) { + "use strict"; + + const type = underlyingSource.type; + const typeString = @toString(type); + + if (typeString === "bytes") { + // if (!@readableByteStreamAPIEnabled()) + // @throwTypeError("ReadableByteStreamController is not implemented"); + + if (strategy.highWaterMark === @undefined) + strategy.highWaterMark = 0; + if (strategy.size !== @undefined) + @throwRangeError("Strategy for a ReadableByteStreamController cannot have a size"); + + @putByIdDirectPrivate(stream, "readableStreamController", new @ReadableByteStreamController(stream, underlyingSource, strategy.highWaterMark, @isReadableStream)); + } else if (typeString === "direct") { + var highWaterMark = strategy?.highWaterMark; + @initializeArrayBufferStream.@call(stream, underlyingSource, highWaterMark); + } else if (type === @undefined) { + if (strategy.highWaterMark === @undefined) + strategy.highWaterMark = 1; + + @setupReadableStreamDefaultController(stream, underlyingSource, strategy.size, strategy.highWaterMark, underlyingSource.start, underlyingSource.pull, underlyingSource.cancel); + } else + @throwRangeError("Invalid type for underlying source"); + +} + +function readableStreamDefaultControllerStart(controller) { + "use strict"; + + + + if (@getByIdDirectPrivate(controller, "started") !== -1) + return; + + const underlyingSource = @getByIdDirectPrivate(controller, "underlyingSource"); + const startMethod = underlyingSource.start; + @putByIdDirectPrivate(controller, "started", 0); + + @promiseInvokeOrNoopMethodNoCatch(underlyingSource, startMethod, [controller]).@then(() => { + @putByIdDirectPrivate(controller, "started", 1); + @assert(!@getByIdDirectPrivate(controller, "pulling")); + @assert(!@getByIdDirectPrivate(controller, "pullAgain")); + @readableStreamDefaultControllerCallPullIfNeeded(controller); + }, (error) => { + @readableStreamDefaultControllerError(controller, error); + }); +} + + +// FIXME: Replace readableStreamPipeTo by below function. +// This method implements the latest https://streams.spec.whatwg.org/#readable-stream-pipe-to. +function readableStreamPipeToWritableStream(source, destination, preventClose, preventAbort, preventCancel, signal) +{ + "use strict"; + + @assert(@isReadableStream(source)); + @assert(@isWritableStream(destination)); + @assert(!@isReadableStreamLocked(source)); + @assert(!@isWritableStreamLocked(destination)); + @assert(signal === @undefined || @isAbortSignal(signal)); + + if (@getByIdDirectPrivate(source, "underlyingByteSource") !== @undefined) + return @Promise.@reject("Piping to a readable bytestream is not supported"); + + let pipeState = { source : source, destination : destination, preventAbort : preventAbort, preventCancel : preventCancel, preventClose : preventClose, signal : signal }; + + pipeState.reader = @acquireReadableStreamDefaultReader(source); + pipeState.writer = @acquireWritableStreamDefaultWriter(destination); + + @putByIdDirectPrivate(source, "disturbed", true); + + pipeState.finalized = false; + pipeState.shuttingDown = false; + pipeState.promiseCapability = @newPromiseCapability(@Promise); + pipeState.pendingReadPromiseCapability = @newPromiseCapability(@Promise); + pipeState.pendingReadPromiseCapability.@resolve.@call(); + pipeState.pendingWritePromise = @Promise.@resolve(); + + if (signal !== @undefined) { + const algorithm = () => { + if (pipeState.finalized) + return; + + const error = @makeDOMException("AbortError", "abort pipeTo from signal"); + + @pipeToShutdownWithAction(pipeState, () => { + const shouldAbortDestination = !pipeState.preventAbort && @getByIdDirectPrivate(pipeState.destination, "state") === "writable"; + const promiseDestination = shouldAbortDestination ? @writableStreamAbort(pipeState.destination, error) : @Promise.@resolve(); + + const shouldAbortSource = !pipeState.preventCancel && @getByIdDirectPrivate(pipeState.source, "state") === @streamReadable; + const promiseSource = shouldAbortSource ? @readableStreamCancel(pipeState.source, error) : @Promise.@resolve(); + + let promiseCapability = @newPromiseCapability(@Promise); + let shouldWait = true; + let handleResolvedPromise = () => { + if (shouldWait) { + shouldWait = false; + return; + } + promiseCapability.@resolve.@call(); + } + let handleRejectedPromise = (e) => { + promiseCapability.@reject.@call(@undefined, e); + } + promiseDestination.@then(handleResolvedPromise, handleRejectedPromise); + promiseSource.@then(handleResolvedPromise, handleRejectedPromise); + return promiseCapability.@promise; + }, error); + }; + if (@whenSignalAborted(signal, algorithm)) + return pipeState.promiseCapability.@promise; + } + + @pipeToErrorsMustBePropagatedForward(pipeState); + @pipeToErrorsMustBePropagatedBackward(pipeState); + @pipeToClosingMustBePropagatedForward(pipeState); + @pipeToClosingMustBePropagatedBackward(pipeState); + + @pipeToLoop(pipeState); + + return pipeState.promiseCapability.@promise; +} + +function pipeToLoop(pipeState) +{ + "use strict"; + if (pipeState.shuttingDown) + return; + + @pipeToDoReadWrite(pipeState).@then((result) => { + if (result) + @pipeToLoop(pipeState); + }); +} + +function pipeToDoReadWrite(pipeState) +{ + "use strict"; + @assert(!pipeState.shuttingDown); + + pipeState.pendingReadPromiseCapability = @newPromiseCapability(@Promise); + @getByIdDirectPrivate(pipeState.writer, "readyPromise").@promise.@then(() => { + if (pipeState.shuttingDown) { + pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false); + return; + } + + @readableStreamDefaultReaderRead(pipeState.reader).@then((result) => { + const canWrite = !result.done && @getByIdDirectPrivate(pipeState.writer, "stream") !== @undefined; + pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, canWrite); + if (!canWrite) + return; + + pipeState.pendingWritePromise = @writableStreamDefaultWriterWrite(pipeState.writer, result.value); + }, (e) => { + pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false); + }); + }, (e) => { + pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false); + }); + return pipeState.pendingReadPromiseCapability.@promise; +} + +function pipeToErrorsMustBePropagatedForward(pipeState) +{ + "use strict"; + + const action = () => { + pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false); + const error = @getByIdDirectPrivate(pipeState.source, "storedError"); + if (!pipeState.preventAbort) { + @pipeToShutdownWithAction(pipeState, () => @writableStreamAbort(pipeState.destination, error), error); + return; + } + @pipeToShutdown(pipeState, error); + }; + + if (@getByIdDirectPrivate(pipeState.source, "state") === @streamErrored) { + action(); + return; + } + + @getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").@promise.@then(@undefined, action); +} + +function pipeToErrorsMustBePropagatedBackward(pipeState) +{ + "use strict"; + const action = () => { + const error = @getByIdDirectPrivate(pipeState.destination, "storedError"); + if (!pipeState.preventCancel) { + @pipeToShutdownWithAction(pipeState, () => @readableStreamCancel(pipeState.source, error), error); + return; + } + @pipeToShutdown(pipeState, error); + }; + if (@getByIdDirectPrivate(pipeState.destination, "state") === "errored") { + action(); + return; + } + @getByIdDirectPrivate(pipeState.writer, "closedPromise").@promise.@then(@undefined, action); +} + +function pipeToClosingMustBePropagatedForward(pipeState) +{ + "use strict"; + const action = () => { + pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false); + const error = @getByIdDirectPrivate(pipeState.source, "storedError"); + if (!pipeState.preventClose) { + @pipeToShutdownWithAction(pipeState, () => @writableStreamDefaultWriterCloseWithErrorPropagation(pipeState.writer)); + return; + } + @pipeToShutdown(pipeState); + }; + if (@getByIdDirectPrivate(pipeState.source, "state") === @streamClosed) { + action(); + return; + } + @getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").@promise.@then(action, @undefined); +} + +function pipeToClosingMustBePropagatedBackward(pipeState) +{ + "use strict"; + if (!@writableStreamCloseQueuedOrInFlight(pipeState.destination) && @getByIdDirectPrivate(pipeState.destination, "state") !== "closed") + return; + + // @assert no chunks have been read/written + + const error = @makeTypeError("closing is propagated backward"); + if (!pipeState.preventCancel) { + @pipeToShutdownWithAction(pipeState, () => @readableStreamCancel(pipeState.source, error), error); + return; + } + @pipeToShutdown(pipeState, error); +} + +function pipeToShutdownWithAction(pipeState, action) +{ + "use strict"; + + if (pipeState.shuttingDown) + return; + + pipeState.shuttingDown = true; + + const hasError = arguments.length > 2; + const error = arguments[2]; + const finalize = () => { + const promise = action(); + promise.@then(() => { + if (hasError) + @pipeToFinalize(pipeState, error); + else + @pipeToFinalize(pipeState); + }, (e) => { + @pipeToFinalize(pipeState, e); + }); + }; + + if (@getByIdDirectPrivate(pipeState.destination, "state") === "writable" && !@writableStreamCloseQueuedOrInFlight(pipeState.destination)) { + pipeState.pendingReadPromiseCapability.@promise.@then(() => { + pipeState.pendingWritePromise.@then(finalize, finalize); + }, (e) => @pipeToFinalize(pipeState, e)); + return; + } + + finalize(); +} + +function pipeToShutdown(pipeState) +{ + "use strict"; + + if (pipeState.shuttingDown) + return; + + pipeState.shuttingDown = true; + + const hasError = arguments.length > 1; + const error = arguments[1]; + const finalize = () => { + if (hasError) + @pipeToFinalize(pipeState, error); + else + @pipeToFinalize(pipeState); + }; + + if (@getByIdDirectPrivate(pipeState.destination, "state") === "writable" && !@writableStreamCloseQueuedOrInFlight(pipeState.destination)) { + pipeState.pendingReadPromiseCapability.@promise.@then(() => { + pipeState.pendingWritePromise.@then(finalize, finalize); + }, (e) => @pipeToFinalize(pipeState, e)); + return; + } + finalize(); +} + +function pipeToFinalize(pipeState) +{ + "use strict"; + + @writableStreamDefaultWriterRelease(pipeState.writer); + @readableStreamReaderGenericRelease(pipeState.reader); + + // Instead of removing the abort algorithm as per spec, we make it a no-op which is equivalent. + pipeState.finalized = true; + + if (arguments.length > 1) + pipeState.promiseCapability.@reject.@call(@undefined, arguments[1]); + else + pipeState.promiseCapability.@resolve.@call(); +} + +function readableStreamTee(stream, shouldClone) +{ + "use strict"; + + @assert(@isReadableStream(stream)); + @assert(typeof(shouldClone) === "boolean"); + + const reader = new @ReadableStreamDefaultReader(stream); + + const teeState = { + closedOrErrored: false, + canceled1: false, + canceled2: false, + reason1: @undefined, + reason2: @undefined, + }; + + teeState.cancelPromiseCapability = @newPromiseCapability(@Promise); + + const pullFunction = @readableStreamTeePullFunction(teeState, reader, shouldClone); + + const branch1Source = { }; + @putByIdDirectPrivate(branch1Source, "pull", pullFunction); + @putByIdDirectPrivate(branch1Source, "cancel", @readableStreamTeeBranch1CancelFunction(teeState, stream)); + + const branch2Source = { }; + @putByIdDirectPrivate(branch2Source, "pull", pullFunction); + @putByIdDirectPrivate(branch2Source, "cancel", @readableStreamTeeBranch2CancelFunction(teeState, stream)); + + const branch1 = new @ReadableStream(branch1Source); + const branch2 = new @ReadableStream(branch2Source); + + @getByIdDirectPrivate(reader, "closedPromiseCapability").@promise.@then(@undefined, function(e) { + if (teeState.closedOrErrored) + return; + @readableStreamDefaultControllerError(branch1.@readableStreamController, e); + @readableStreamDefaultControllerError(branch2.@readableStreamController, e); + teeState.closedOrErrored = true; + if (!teeState.canceled1 || !teeState.canceled2) + teeState.cancelPromiseCapability.@resolve.@call(); + }); + + // Additional fields compared to the spec, as they are needed within pull/cancel functions. + teeState.branch1 = branch1; + teeState.branch2 = branch2; + + return [branch1, branch2]; +} + +function readableStreamTeePullFunction(teeState, reader, shouldClone) +{ + "use strict"; + + return function() { + @Promise.prototype.@then.@call(@readableStreamDefaultReaderRead(reader), function(result) { + @assert(@isObject(result)); + @assert(typeof result.done === "boolean"); + if (result.done && !teeState.closedOrErrored) { + if (!teeState.canceled1) + @readableStreamDefaultControllerClose(teeState.branch1.@readableStreamController); + if (!teeState.canceled2) + @readableStreamDefaultControllerClose(teeState.branch2.@readableStreamController); + teeState.closedOrErrored = true; + if (!teeState.canceled1 || !teeState.canceled2) + teeState.cancelPromiseCapability.@resolve.@call(); + } + if (teeState.closedOrErrored) + return; + if (!teeState.canceled1) + @readableStreamDefaultControllerEnqueue(teeState.branch1.@readableStreamController, result.value); + if (!teeState.canceled2) + @readableStreamDefaultControllerEnqueue(teeState.branch2.@readableStreamController, shouldClone ? @structuredCloneForStream(result.value) : result.value); + }); + } +} + +function readableStreamTeeBranch1CancelFunction(teeState, stream) +{ + "use strict"; + + return function(r) { + teeState.canceled1 = true; + teeState.reason1 = r; + if (teeState.canceled2) { + @readableStreamCancel(stream, [teeState.reason1, teeState.reason2]).@then( + teeState.cancelPromiseCapability.@resolve, + teeState.cancelPromiseCapability.@reject); + } + return teeState.cancelPromiseCapability.@promise; + } +} + +function readableStreamTeeBranch2CancelFunction(teeState, stream) +{ + "use strict"; + + return function(r) { + teeState.canceled2 = true; + teeState.reason2 = r; + if (teeState.canceled1) { + @readableStreamCancel(stream, [teeState.reason1, teeState.reason2]).@then( + teeState.cancelPromiseCapability.@resolve, + teeState.cancelPromiseCapability.@reject); + } + return teeState.cancelPromiseCapability.@promise; + } +} + +function isReadableStream(stream) +{ + "use strict"; + + // Spec tells to return true only if stream has a readableStreamController internal slot. + // However, since it is a private slot, it cannot be checked using hasOwnProperty(). + // Therefore, readableStreamController is initialized with null value. + return @isObject(stream) && @getByIdDirectPrivate(stream, "readableStreamController") !== @undefined; +} + +function isReadableStreamDefaultReader(reader) +{ + "use strict"; + + // Spec tells to return true only if reader has a readRequests internal slot. + // However, since it is a private slot, it cannot be checked using hasOwnProperty(). + // Since readRequests is initialized with an empty array, the following test is ok. + return @isObject(reader) && !!@getByIdDirectPrivate(reader, "readRequests"); +} + +function isReadableStreamDefaultController(controller) +{ + "use strict"; + + // Spec tells to return true only if controller has an underlyingSource internal slot. + // However, since it is a private slot, it cannot be checked using hasOwnProperty(). + // underlyingSource is obtained in ReadableStream constructor: if undefined, it is set + // to an empty object. Therefore, following test is ok. + return @isObject(controller) && !!@getByIdDirectPrivate(controller, "underlyingSource"); +} + + +@globalPrivate +function assignDirectStream() { + "use strict"; + + var stream = this; +} + + +function handleDirectStreamError(e) { + "use strict"; + + var controller = this; + var sink = controller.@sink; + if (sink) { + @putByIdDirectPrivate(controller, "sink", @undefined); + try { + sink.close(e); + } catch (f) {} + } + + this.error = this.drain = this.write = this.close = this.end = @onReadableStreamDirectControllerClosed; + + if (typeof this.@underlyingSource.close === 'function') { + try { + this.@underlyingSource.close.@call(this.@underlyingSource, e); + } catch (e) { + } + } + + try { + var pend = controller._pendingRead; + if (pend) { + controller._pendingRead = @undefined; + @rejectPromise(pend, e); + } + } catch (f) {} + var stream = controller.@controlledReadableStream; + if (stream) @readableStreamError(stream, e); +} + +function handleDirectStreamErrorReject(e) { + @handleDirectStreamError.@call(this, e); + return @Promise.@reject(e); +} + +function onPullDirectStream(controller) +{ + + "use strict"; + + var stream = controller.@controlledReadableStream; + if (!stream || @getByIdDirectPrivate(stream, "state") !== @streamReadable) + return; + + // pull is in progress + // this is a recursive call + // ignore it + if (controller._deferClose === -1) { + return; + } + + + controller._deferClose = -1; + controller._deferDrain = -1; + var deferClose; + var deferDrain; + + // Direct streams allow @pull to be called multiple times, unlike the spec. + // Backpressure is handled by the destination, not by the underlying source. + // In this case, we rely on the heuristic that repeatedly draining in the same tick + // is bad for performance + // this code is only run when consuming a direct stream from JS + // without the HTTP server or anything else + try { + var result = controller.@underlyingSource.pull( + controller, + ); + + if (result && @isPromise(result)) { + if (controller._handleError === @undefined) { + controller._handleError = @handleDirectStreamErrorReject.bind(controller); + } + + @Promise.prototype.catch.@call(result, controller._handleError); + } + } catch(e) { + return @handleDirectStreamErrorReject.@call(controller, e); + } finally { + deferClose = controller._deferClose; + deferDrain = controller._deferDrain; + controller._deferDrain = controller._deferClose = 0; + } + + + var promiseToReturn; + + + if (controller._pendingRead === @undefined) { + controller._pendingRead = promiseToReturn = @newPromise(); + } else { + promiseToReturn = @readableStreamAddReadRequest(stream); + } + + + // they called close during @pull() + // we delay that + if (deferClose === 1) { + var reason = controller._deferCloseReason; + controller._deferCloseReason = @undefined; + @onCloseDirectStream.@call(controller, reason); + return promiseToReturn; + } + + // not done, but they called drain() + if (deferDrain === 1) { + @onDrainDirectStream.@call(controller); + } + + + return promiseToReturn; +} + +function noopDoneFunction() { + return @Promise.@resolve({value: @undefined, done: true}); +} + +function onReadableStreamDirectControllerClosed(reason) +{ + "use strict"; + @throwTypeError("ReadableStreamDirectController is now closed"); +} + +function onCloseDirectStream(reason) +{ + "use strict"; + var stream = this.@controlledReadableStream; + if (!stream || @getByIdDirectPrivate(stream, "state") !== @streamReadable) + return; + + if (this._deferClose !== 0) { + this._deferClose = 1; + this._deferCloseReason = reason; + return; + } + + @putByIdDirectPrivate(stream, "state", @streamClosing); + if (typeof this.@underlyingSource.close === 'function') { + try { + this.@underlyingSource.close.@call(this.@underlyingSource, reason); + } catch (e) { + + } + } + + var drained; + try { + drained = this.@sink.end(); + @putByIdDirectPrivate(this, "sink", @undefined); + } catch (e) { + if (this._pendingRead) { + var read = this._pendingRead; + this._pendingRead = @undefined; + @rejectPromise(read, e); + } + @readableStreamError(stream, e); + return; + } + + this.error = this.drain = this.write = this.close = this.end = @onReadableStreamDirectControllerClosed; + + var reader = @getByIdDirectPrivate(stream, "reader"); + + if (reader && @isReadableStreamDefaultReader(reader)) { + var _pendingRead = this._pendingRead; + if (_pendingRead && @isPromise(_pendingRead) && drained?.byteLength) { + this._pendingRead = @undefined; + @fulfillPromise(_pendingRead, {value: drained, done: false}); + @readableStreamClose(stream); + return; + } + } + + if (drained?.byteLength) { + var requests = @getByIdDirectPrivate(reader, "readRequests"); + if (requests?.isNotEmpty()) { + @readableStreamFulfillReadRequest(stream, drained, false); + @readableStreamClose(stream); + return; + } + + @putByIdDirectPrivate(stream, "state", @streamReadable); + this.@pull = () => { + var thisResult = @createFulfilledPromise({value: drained, done: false}); + drained = @undefined; + @readableStreamClose(stream); + stream = @undefined; + return thisResult; + }; + } else if (this._pendingRead) { + var read = this._pendingRead; + this._pendingRead = @undefined; + @putByIdDirectPrivate(this, "pull", @noopDoneFunction); + @fulfillPromise(read, {value: @undefined, done: true}); + } + + @readableStreamClose(stream); +} + +function onDrainDirectStream() +{ + "use strict"; + + var stream = this.@controlledReadableStream; + var reader = @getByIdDirectPrivate(stream, "reader"); + if (!reader || !@isReadableStreamDefaultReader(reader)) { + return; + } + + var _pendingRead = this._pendingRead; + this._pendingRead = @undefined; + if (_pendingRead && @isPromise(_pendingRead)) { + var drained = this.@sink.drain(); + if (drained?.byteLength) { + this._pendingRead = @getByIdDirectPrivate(stream, "readRequests")?.shift(); + @fulfillPromise(_pendingRead, {value: drained, done: false}); + } else { + this._pendingRead = _pendingRead; + } + } else if (@getByIdDirectPrivate(stream, "readRequests")?.isNotEmpty()) { + var drained = this.@sink.drain(); + if (drained?.byteLength) { + @readableStreamFulfillReadRequest(stream, drained, false); + } + } else if (this._deferDrain === -1) { + this._deferDrain = 1; + } + +} + +function initializeArrayBufferStream(underlyingSource, highWaterMark) +{ + "use strict"; + + // This is the fallback implementation for direct streams + // When we don't know what the destination type is + // We assume it is a Uint8Array. + + var opts = highWaterMark ? {highWaterMark, stream: true, asUint8Array: true} : {stream: true, asUint8Array: true}; + var sink = new globalThis.Bun.ArrayBufferSink(); + sink.start(opts); + + var controller = { + @underlyingSource: underlyingSource, + @pull: @onPullDirectStream, + @controlledReadableStream: this, + @sink: sink, + close: @onCloseDirectStream, + write: sink.write.bind(sink), + error: @handleDirectStreamError, + end: @onCloseDirectStream, + @close: @onCloseDirectStream, + drain: @onDrainDirectStream, + _pendingRead: @undefined, + _deferClose: 0, + _deferDrain: 0, + _deferCloseReason: @undefined, + _handleError: @undefined, + }; + + + @putByIdDirectPrivate(this, "readableStreamController", controller); + +} + +function readableStreamError(stream, error) +{ + "use strict"; + + @assert(@isReadableStream(stream)); + @assert(@getByIdDirectPrivate(stream, "state") === @streamReadable); + @putByIdDirectPrivate(stream, "state", @streamErrored); + @putByIdDirectPrivate(stream, "storedError", error); + + const reader = @getByIdDirectPrivate(stream, "reader"); + + if (!reader) + return; + + if (@isReadableStreamDefaultReader(reader)) { + const requests = @getByIdDirectPrivate(reader, "readRequests"); + @putByIdDirectPrivate(reader, "readRequests", @createFIFO()); + for (var request = requests.shift(); request; request = requests.shift()) + @rejectPromise(request, error); + } else { + @assert(@isReadableStreamBYOBReader(reader)); + const requests = @getByIdDirectPrivate(reader, "readIntoRequests"); + @putByIdDirectPrivate(reader, "readIntoRequests", @createFIFO()); + for (var request = requests.shift(); request; request = requests.shift()) + @rejectPromise(request, error); + } + + @getByIdDirectPrivate(reader, "closedPromiseCapability").@reject.@call(@undefined, error); + const promise = @getByIdDirectPrivate(reader, "closedPromiseCapability").@promise; + @markPromiseAsHandled(promise); +} + +function readableStreamDefaultControllerShouldCallPull(controller) +{ + "use strict"; + + const stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); + + if (!@readableStreamDefaultControllerCanCloseOrEnqueue(controller)) + return false; + if (!(@getByIdDirectPrivate(controller, "started") === 1)) + return false; + if ((!@isReadableStreamLocked(stream) || !@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) && @readableStreamDefaultControllerGetDesiredSize(controller) <= 0) + return false; + const desiredSize = @readableStreamDefaultControllerGetDesiredSize(controller); + @assert(desiredSize !== null); + return desiredSize > 0; +} + +function readableStreamDefaultControllerCallPullIfNeeded(controller) +{ + "use strict"; + + // FIXME: use @readableStreamDefaultControllerShouldCallPull + const stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); + + if (!@readableStreamDefaultControllerCanCloseOrEnqueue(controller)) + return; + if (!(@getByIdDirectPrivate(controller, "started") === 1)) + return; + if ((!@isReadableStreamLocked(stream) || !@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) && @readableStreamDefaultControllerGetDesiredSize(controller) <= 0) + return; + + if (@getByIdDirectPrivate(controller, "pulling")) { + @putByIdDirectPrivate(controller, "pullAgain", true); + return; + } + + + @assert(!@getByIdDirectPrivate(controller, "pullAgain")); + @putByIdDirectPrivate(controller, "pulling", true); + + @getByIdDirectPrivate(controller, "pullAlgorithm").@call(@undefined).@then(function() { + @putByIdDirectPrivate(controller, "pulling", false); + if (@getByIdDirectPrivate(controller, "pullAgain")) { + @putByIdDirectPrivate(controller, "pullAgain", false); + + @readableStreamDefaultControllerCallPullIfNeeded(controller); + } + }, function(error) { + @readableStreamDefaultControllerError(controller, error); + }); +} + +function isReadableStreamLocked(stream) +{ + "use strict"; + + @assert(@isReadableStream(stream)); + return !!@getByIdDirectPrivate(stream, "reader"); +} + +function readableStreamDefaultControllerGetDesiredSize(controller) +{ + "use strict"; + + const stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); + const state = @getByIdDirectPrivate(stream, "state"); + + if (state === @streamErrored) + return null; + if (state === @streamClosed) + return 0; + + return @getByIdDirectPrivate(controller, "strategy").highWaterMark - @getByIdDirectPrivate(controller, "queue").size; +} + + +function readableStreamReaderGenericCancel(reader, reason) +{ + "use strict"; + + const stream = @getByIdDirectPrivate(reader, "ownerReadableStream"); + @assert(!!stream); + return @readableStreamCancel(stream, reason); +} + +function readableStreamCancel(stream, reason) +{ + "use strict"; + + @putByIdDirectPrivate(stream, "disturbed", true); + const state = @getByIdDirectPrivate(stream, "state"); + if (state === @streamClosed) + return @Promise.@resolve(); + if (state === @streamErrored) + return @Promise.@reject(@getByIdDirectPrivate(stream, "storedError")); + @readableStreamClose(stream); + + var controller = @getByIdDirectPrivate(stream, "readableStreamController"); + return controller.@cancel(controller, reason).@then(function() { }); +} + +function readableStreamDefaultControllerCancel(controller, reason) +{ + "use strict"; + + @putByIdDirectPrivate(controller, "queue", @newQueue()); + return @getByIdDirectPrivate(controller, "cancelAlgorithm").@call(@undefined, reason); +} + +function readableStreamDefaultControllerPull(controller) +{ + "use strict"; + + var queue = @getByIdDirectPrivate(controller, "queue"); + if (queue.content.isNotEmpty()) { + const chunk = @dequeueValue(queue); + if (@getByIdDirectPrivate(controller, "closeRequested") && queue.content.isEmpty()) + @readableStreamClose(@getByIdDirectPrivate(controller, "controlledReadableStream")); + else + @readableStreamDefaultControllerCallPullIfNeeded(controller); + + return @createFulfilledPromise({ value: chunk, done: false }); + } + const pendingPromise = @readableStreamAddReadRequest(@getByIdDirectPrivate(controller, "controlledReadableStream")); + @readableStreamDefaultControllerCallPullIfNeeded(controller); + return pendingPromise; +} + +function readableStreamDefaultControllerClose(controller) +{ + "use strict"; + + @assert(@readableStreamDefaultControllerCanCloseOrEnqueue(controller)); + @putByIdDirectPrivate(controller, "closeRequested", true); + if (@getByIdDirectPrivate(controller, "queue")?.content?.isEmpty()) + @readableStreamClose(@getByIdDirectPrivate(controller, "controlledReadableStream")); +} + +function readableStreamClose(stream) +{ + "use strict"; + + @assert(@getByIdDirectPrivate(stream, "state") === @streamReadable); + @putByIdDirectPrivate(stream, "state", @streamClosed); + if (!@getByIdDirectPrivate(stream, "reader")) + return; + + if (@isReadableStreamDefaultReader(@getByIdDirectPrivate(stream, "reader"))) { + const requests = @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests"); + if (requests.isNotEmpty()) { + @putByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests", @createFIFO()); + + for (var request = requests.shift(); request; request = requests.shift()) + @fulfillPromise(request, { value: @undefined, done: true }); + } + } + + @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "closedPromiseCapability").@resolve.@call(); +} + +function readableStreamFulfillReadRequest(stream, chunk, done) +{ + "use strict"; + const readRequest = @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").shift(); + @fulfillPromise(readRequest, { value: chunk, done: done }); +} + +function readableStreamDefaultControllerEnqueue(controller, chunk) +{ + "use strict"; + + const stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); + // this is checked by callers + @assert(@readableStreamDefaultControllerCanCloseOrEnqueue(controller)); + + if (@isReadableStreamLocked(stream) && @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) { + @readableStreamFulfillReadRequest(stream, chunk, false); + @readableStreamDefaultControllerCallPullIfNeeded(controller); + return; + } + + try { + let chunkSize = 1; + if (@getByIdDirectPrivate(controller, "strategy").size !== @undefined) + chunkSize = @getByIdDirectPrivate(controller, "strategy").size(chunk); + @enqueueValueWithSize(@getByIdDirectPrivate(controller, "queue"), chunk, chunkSize); + } + catch(error) { + @readableStreamDefaultControllerError(controller, error); + throw error; + } + @readableStreamDefaultControllerCallPullIfNeeded(controller); +} + +function readableStreamDefaultReaderRead(reader) +{ + "use strict"; + + const stream = @getByIdDirectPrivate(reader, "ownerReadableStream"); + @assert(!!stream); + const state = @getByIdDirectPrivate(stream, "state"); + + @putByIdDirectPrivate(stream, "disturbed", true); + if (state === @streamClosed) + return @createFulfilledPromise({ value: @undefined, done: true }); + if (state === @streamErrored) + return @Promise.@reject(@getByIdDirectPrivate(stream, "storedError")); + @assert(state === @streamReadable); + + return @getByIdDirectPrivate(stream, "readableStreamController").@pull(@getByIdDirectPrivate(stream, "readableStreamController")); +} + +function readableStreamAddReadRequest(stream) +{ + "use strict"; + + @assert(@isReadableStreamDefaultReader(@getByIdDirectPrivate(stream, "reader"))); + @assert(@getByIdDirectPrivate(stream, "state") == @streamReadable); + + const readRequest = @newPromise(); + + @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").push(readRequest); + + return readRequest; +} + +function isReadableStreamDisturbed(stream) +{ + "use strict"; + + @assert(@isReadableStream(stream)); + return @getByIdDirectPrivate(stream, "disturbed"); +} + +function readableStreamReaderGenericRelease(reader) +{ + "use strict"; + + @assert(!!@getByIdDirectPrivate(reader, "ownerReadableStream")); + @assert(@getByIdDirectPrivate(@getByIdDirectPrivate(reader, "ownerReadableStream"), "reader") === reader); + + if (@getByIdDirectPrivate(@getByIdDirectPrivate(reader, "ownerReadableStream"), "state") === @streamReadable) + @getByIdDirectPrivate(reader, "closedPromiseCapability").@reject.@call(@undefined, @makeTypeError("releasing lock of reader whose stream is still in readable state")); + else + @putByIdDirectPrivate(reader, "closedPromiseCapability", { @promise: @newHandledRejectedPromise(@makeTypeError("reader released lock")) }); + + const promise = @getByIdDirectPrivate(reader, "closedPromiseCapability").@promise; + @markPromiseAsHandled(promise); + @putByIdDirectPrivate(@getByIdDirectPrivate(reader, "ownerReadableStream"), "reader", @undefined); + @putByIdDirectPrivate(reader, "ownerReadableStream", @undefined); +} + +function readableStreamDefaultControllerCanCloseOrEnqueue(controller) +{ + "use strict"; + + return !@getByIdDirectPrivate(controller, "closeRequested") && @getByIdDirectPrivate(@getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === @streamReadable; +} + + +function lazyLoadStream(stream, autoAllocateChunkSize) { + "use strict"; + + var nativeType = @getByIdDirectPrivate(stream, "bunNativeType"); + var nativePtr = @getByIdDirectPrivate(stream, "bunNativePtr"); + var cached = @lazyStreamPrototypeMap; + var Prototype = cached.@get(nativeType); + if (Prototype === @undefined) { + var [pull, start, cancel, setClose, deinit] = @lazyLoad(nativeType); + var closer = [false]; + var handleResult; + function handleNativeReadableStreamPromiseResult(val) { + "use strict"; + var {c, v} = this; + this.c = @undefined; + this.v = @undefined; + handleResult(val, c, v); + } + + handleResult = function handleResult(result, controller, view) { + "use strict"; + + if (result && @isPromise(result)) { + return result.then(handleNativeReadableStreamPromiseResult.bind({c: controller, v: view}), (err) => controller.error(err)); + } else if (result !== false) { + if (view && view.byteLength === result) { + controller.byobRequest.respondWithNewView(view); + } else { + controller.byobRequest.respond(result); + } + } + + if (closer[0] || result === false) { + @enqueueJob(() => controller.close()); + closer[0] = false; + } + }; + + Prototype = class NativeReadableStreamSource { + constructor(tag, autoAllocateChunkSize) { + this.pull = this.pull_.bind(tag); + this.cancel = this.cancel_.bind(tag); + this.autoAllocateChunkSize = autoAllocateChunkSize; + } + + pull; + cancel; + + type = "bytes"; + autoAllocateChunkSize = 0; + + static startSync = start; + + pull_(controller) { + closer[0] = false; + var result; + + const view = controller.byobRequest.view; + try { + result = pull(this, view, closer); + } catch(err) { + return controller.error(err); + } + + return handleResult(result, controller, view); + } + + cancel_(reason) { + cancel(this, reason); + } + static deinit = deinit; + static registry = new FinalizationRegistry(deinit); + } + cached.@set(nativeType, Prototype); + } + + const chunkSize = Prototype.startSync(nativePtr, autoAllocateChunkSize); + + // empty file, no need for native back-and-forth on this + if (chunkSize === 0) { + @readableStreamClose(stream); + return null; + } + var instance = new Prototype(nativePtr, chunkSize); + Prototype.registry.register(instance, nativePtr); + return instance; +}
\ No newline at end of file |