diff options
Diffstat (limited to 'src/javascript/jsc/builtins/js/ReadableStreamInternals.js')
-rw-r--r-- | src/javascript/jsc/builtins/js/ReadableStreamInternals.js | 1255 |
1 files changed, 0 insertions, 1255 deletions
diff --git a/src/javascript/jsc/builtins/js/ReadableStreamInternals.js b/src/javascript/jsc/builtins/js/ReadableStreamInternals.js deleted file mode 100644 index 3e6590f31..000000000 --- a/src/javascript/jsc/builtins/js/ReadableStreamInternals.js +++ /dev/null @@ -1,1255 +0,0 @@ -/* - * Copyright (C) 2015 Canon Inc. All rights reserved. - * Copyright (C) 2015 Igalia. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions - * are met: - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY - * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR - * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR - * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, - * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, - * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR - * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY - * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -// @internal - -function readableStreamReaderGenericInitialize(reader, stream) -{ - "use strict"; - - @putByIdDirectPrivate(reader, "ownerReadableStream", stream); - @putByIdDirectPrivate(stream, "reader", reader); - if (@getByIdDirectPrivate(stream, "state") === @streamReadable) - @putByIdDirectPrivate(reader, "closedPromiseCapability", @newPromiseCapability(@Promise)); - else if (@getByIdDirectPrivate(stream, "state") === @streamClosed) - @putByIdDirectPrivate(reader, "closedPromiseCapability", { @promise: @Promise.@resolve() }); - else { - @assert(@getByIdDirectPrivate(stream, "state") === @streamErrored); - @putByIdDirectPrivate(reader, "closedPromiseCapability", { @promise: @newHandledRejectedPromise(@getByIdDirectPrivate(stream, "storedError")) }); - } -} - -function privateInitializeReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark) -{ - "use strict"; - - if (!@isReadableStream(stream)) - @throwTypeError("ReadableStreamDefaultController needs a ReadableStream"); - - // readableStreamController is initialized with null value. - if (@getByIdDirectPrivate(stream, "readableStreamController") !== null) - @throwTypeError("ReadableStream already has a controller"); - - - - @putByIdDirectPrivate(this, "controlledReadableStream", stream); - @putByIdDirectPrivate(this, "underlyingSource", underlyingSource); - @putByIdDirectPrivate(this, "queue", @newQueue()); - @putByIdDirectPrivate(this, "started", -1); - @putByIdDirectPrivate(this, "closeRequested", false); - @putByIdDirectPrivate(this, "pullAgain", false); - @putByIdDirectPrivate(this, "pulling", false); - @putByIdDirectPrivate(this, "strategy", @validateAndNormalizeQueuingStrategy(size, highWaterMark)); - - return this; -} - -function readableStreamDefaultControllerError(controller, error) -{ - "use strict"; - - const stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); - if (@getByIdDirectPrivate(stream, "state") !== @streamReadable) - return; - @putByIdDirectPrivate(controller, "queue", @newQueue()); - - @readableStreamError(stream, error); -} - -function readableStreamPipeTo(stream, sink) -{ - "use strict"; - @assert(@isReadableStream(stream)); - - const reader = new @ReadableStreamDefaultReader(stream); - - @getByIdDirectPrivate(reader, "closedPromiseCapability").@promise.@then(() => { }, (e) => { sink.error(e); }); - - function doPipe() { - @readableStreamDefaultReaderRead(reader).@then(function(result) { - if (result.done) { - sink.close(); - return; - } - try { - sink.enqueue(result.value); - } catch (e) { - sink.error("ReadableStream chunk enqueueing in the sink failed"); - return; - } - doPipe(); - }, function(e) { - sink.error(e); - }); - } - doPipe(); -} - - - -function acquireReadableStreamDefaultReader(stream) -{ - "use strict"; - var start = @getByIdDirectPrivate(stream, "start"); - if (start) { - start.@call(stream); - } - - return new @ReadableStreamDefaultReader(stream); -} - -// https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller, starting from step 6. -// The other part is implemented in privateInitializeReadableStreamDefaultController. -function setupReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark, startMethod, pullMethod, cancelMethod) -{ - "use strict"; - - const controller = new @ReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark, @isReadableStream); - - const pullAlgorithm = () => @promiseInvokeOrNoopMethod(underlyingSource, pullMethod, [controller]); - const cancelAlgorithm = (reason) => @promiseInvokeOrNoopMethod(underlyingSource, cancelMethod, [reason]); - - @putByIdDirectPrivate(controller, "pullAlgorithm", pullAlgorithm); - @putByIdDirectPrivate(controller, "cancelAlgorithm", cancelAlgorithm); - @putByIdDirectPrivate(controller, "pull", @readableStreamDefaultControllerPull); - @putByIdDirectPrivate(controller, "cancel", @readableStreamDefaultControllerCancel); - @putByIdDirectPrivate(stream, "readableStreamController", controller); - - @readableStreamDefaultControllerStart(controller); - -} - - -function createReadableStreamController(stream, underlyingSource, strategy) { - "use strict"; - - const type = underlyingSource.type; - const typeString = @toString(type); - - if (typeString === "bytes") { - // if (!@readableByteStreamAPIEnabled()) - // @throwTypeError("ReadableByteStreamController is not implemented"); - - if (strategy.highWaterMark === @undefined) - strategy.highWaterMark = 0; - if (strategy.size !== @undefined) - @throwRangeError("Strategy for a ReadableByteStreamController cannot have a size"); - - @putByIdDirectPrivate(stream, "readableStreamController", new @ReadableByteStreamController(stream, underlyingSource, strategy.highWaterMark, @isReadableStream)); - } else if (typeString === "direct") { - var highWaterMark = strategy?.highWaterMark; - @initializeArrayBufferStream.@call(stream, underlyingSource, highWaterMark); - } else if (type === @undefined) { - if (strategy.highWaterMark === @undefined) - strategy.highWaterMark = 1; - - @setupReadableStreamDefaultController(stream, underlyingSource, strategy.size, strategy.highWaterMark, underlyingSource.start, underlyingSource.pull, underlyingSource.cancel); - } else - @throwRangeError("Invalid type for underlying source"); - -} - -function readableStreamDefaultControllerStart(controller) { - "use strict"; - - - - if (@getByIdDirectPrivate(controller, "started") !== -1) - return; - - const underlyingSource = @getByIdDirectPrivate(controller, "underlyingSource"); - const startMethod = underlyingSource.start; - @putByIdDirectPrivate(controller, "started", 0); - - @promiseInvokeOrNoopMethodNoCatch(underlyingSource, startMethod, [controller]).@then(() => { - @putByIdDirectPrivate(controller, "started", 1); - @assert(!@getByIdDirectPrivate(controller, "pulling")); - @assert(!@getByIdDirectPrivate(controller, "pullAgain")); - @readableStreamDefaultControllerCallPullIfNeeded(controller); - }, (error) => { - @readableStreamDefaultControllerError(controller, error); - }); -} - - -// FIXME: Replace readableStreamPipeTo by below function. -// This method implements the latest https://streams.spec.whatwg.org/#readable-stream-pipe-to. -function readableStreamPipeToWritableStream(source, destination, preventClose, preventAbort, preventCancel, signal) -{ - "use strict"; - - @assert(@isReadableStream(source)); - @assert(@isWritableStream(destination)); - @assert(!@isReadableStreamLocked(source)); - @assert(!@isWritableStreamLocked(destination)); - @assert(signal === @undefined || @isAbortSignal(signal)); - - if (@getByIdDirectPrivate(source, "underlyingByteSource") !== @undefined) - return @Promise.@reject("Piping to a readable bytestream is not supported"); - - let pipeState = { source : source, destination : destination, preventAbort : preventAbort, preventCancel : preventCancel, preventClose : preventClose, signal : signal }; - - pipeState.reader = @acquireReadableStreamDefaultReader(source); - pipeState.writer = @acquireWritableStreamDefaultWriter(destination); - - @putByIdDirectPrivate(source, "disturbed", true); - - pipeState.finalized = false; - pipeState.shuttingDown = false; - pipeState.promiseCapability = @newPromiseCapability(@Promise); - pipeState.pendingReadPromiseCapability = @newPromiseCapability(@Promise); - pipeState.pendingReadPromiseCapability.@resolve.@call(); - pipeState.pendingWritePromise = @Promise.@resolve(); - - if (signal !== @undefined) { - const algorithm = () => { - if (pipeState.finalized) - return; - - const error = @makeDOMException("AbortError", "abort pipeTo from signal"); - - @pipeToShutdownWithAction(pipeState, () => { - const shouldAbortDestination = !pipeState.preventAbort && @getByIdDirectPrivate(pipeState.destination, "state") === "writable"; - const promiseDestination = shouldAbortDestination ? @writableStreamAbort(pipeState.destination, error) : @Promise.@resolve(); - - const shouldAbortSource = !pipeState.preventCancel && @getByIdDirectPrivate(pipeState.source, "state") === @streamReadable; - const promiseSource = shouldAbortSource ? @readableStreamCancel(pipeState.source, error) : @Promise.@resolve(); - - let promiseCapability = @newPromiseCapability(@Promise); - let shouldWait = true; - let handleResolvedPromise = () => { - if (shouldWait) { - shouldWait = false; - return; - } - promiseCapability.@resolve.@call(); - } - let handleRejectedPromise = (e) => { - promiseCapability.@reject.@call(@undefined, e); - } - promiseDestination.@then(handleResolvedPromise, handleRejectedPromise); - promiseSource.@then(handleResolvedPromise, handleRejectedPromise); - return promiseCapability.@promise; - }, error); - }; - if (@whenSignalAborted(signal, algorithm)) - return pipeState.promiseCapability.@promise; - } - - @pipeToErrorsMustBePropagatedForward(pipeState); - @pipeToErrorsMustBePropagatedBackward(pipeState); - @pipeToClosingMustBePropagatedForward(pipeState); - @pipeToClosingMustBePropagatedBackward(pipeState); - - @pipeToLoop(pipeState); - - return pipeState.promiseCapability.@promise; -} - -function pipeToLoop(pipeState) -{ - "use strict"; - if (pipeState.shuttingDown) - return; - - @pipeToDoReadWrite(pipeState).@then((result) => { - if (result) - @pipeToLoop(pipeState); - }); -} - -function pipeToDoReadWrite(pipeState) -{ - "use strict"; - @assert(!pipeState.shuttingDown); - - pipeState.pendingReadPromiseCapability = @newPromiseCapability(@Promise); - @getByIdDirectPrivate(pipeState.writer, "readyPromise").@promise.@then(() => { - if (pipeState.shuttingDown) { - pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false); - return; - } - - @readableStreamDefaultReaderRead(pipeState.reader).@then((result) => { - const canWrite = !result.done && @getByIdDirectPrivate(pipeState.writer, "stream") !== @undefined; - pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, canWrite); - if (!canWrite) - return; - - pipeState.pendingWritePromise = @writableStreamDefaultWriterWrite(pipeState.writer, result.value); - }, (e) => { - pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false); - }); - }, (e) => { - pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false); - }); - return pipeState.pendingReadPromiseCapability.@promise; -} - -function pipeToErrorsMustBePropagatedForward(pipeState) -{ - "use strict"; - - const action = () => { - pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false); - const error = @getByIdDirectPrivate(pipeState.source, "storedError"); - if (!pipeState.preventAbort) { - @pipeToShutdownWithAction(pipeState, () => @writableStreamAbort(pipeState.destination, error), error); - return; - } - @pipeToShutdown(pipeState, error); - }; - - if (@getByIdDirectPrivate(pipeState.source, "state") === @streamErrored) { - action(); - return; - } - - @getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").@promise.@then(@undefined, action); -} - -function pipeToErrorsMustBePropagatedBackward(pipeState) -{ - "use strict"; - const action = () => { - const error = @getByIdDirectPrivate(pipeState.destination, "storedError"); - if (!pipeState.preventCancel) { - @pipeToShutdownWithAction(pipeState, () => @readableStreamCancel(pipeState.source, error), error); - return; - } - @pipeToShutdown(pipeState, error); - }; - if (@getByIdDirectPrivate(pipeState.destination, "state") === "errored") { - action(); - return; - } - @getByIdDirectPrivate(pipeState.writer, "closedPromise").@promise.@then(@undefined, action); -} - -function pipeToClosingMustBePropagatedForward(pipeState) -{ - "use strict"; - const action = () => { - pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false); - const error = @getByIdDirectPrivate(pipeState.source, "storedError"); - if (!pipeState.preventClose) { - @pipeToShutdownWithAction(pipeState, () => @writableStreamDefaultWriterCloseWithErrorPropagation(pipeState.writer)); - return; - } - @pipeToShutdown(pipeState); - }; - if (@getByIdDirectPrivate(pipeState.source, "state") === @streamClosed) { - action(); - return; - } - @getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").@promise.@then(action, @undefined); -} - -function pipeToClosingMustBePropagatedBackward(pipeState) -{ - "use strict"; - if (!@writableStreamCloseQueuedOrInFlight(pipeState.destination) && @getByIdDirectPrivate(pipeState.destination, "state") !== "closed") - return; - - // @assert no chunks have been read/written - - const error = @makeTypeError("closing is propagated backward"); - if (!pipeState.preventCancel) { - @pipeToShutdownWithAction(pipeState, () => @readableStreamCancel(pipeState.source, error), error); - return; - } - @pipeToShutdown(pipeState, error); -} - -function pipeToShutdownWithAction(pipeState, action) -{ - "use strict"; - - if (pipeState.shuttingDown) - return; - - pipeState.shuttingDown = true; - - const hasError = arguments.length > 2; - const error = arguments[2]; - const finalize = () => { - const promise = action(); - promise.@then(() => { - if (hasError) - @pipeToFinalize(pipeState, error); - else - @pipeToFinalize(pipeState); - }, (e) => { - @pipeToFinalize(pipeState, e); - }); - }; - - if (@getByIdDirectPrivate(pipeState.destination, "state") === "writable" && !@writableStreamCloseQueuedOrInFlight(pipeState.destination)) { - pipeState.pendingReadPromiseCapability.@promise.@then(() => { - pipeState.pendingWritePromise.@then(finalize, finalize); - }, (e) => @pipeToFinalize(pipeState, e)); - return; - } - - finalize(); -} - -function pipeToShutdown(pipeState) -{ - "use strict"; - - if (pipeState.shuttingDown) - return; - - pipeState.shuttingDown = true; - - const hasError = arguments.length > 1; - const error = arguments[1]; - const finalize = () => { - if (hasError) - @pipeToFinalize(pipeState, error); - else - @pipeToFinalize(pipeState); - }; - - if (@getByIdDirectPrivate(pipeState.destination, "state") === "writable" && !@writableStreamCloseQueuedOrInFlight(pipeState.destination)) { - pipeState.pendingReadPromiseCapability.@promise.@then(() => { - pipeState.pendingWritePromise.@then(finalize, finalize); - }, (e) => @pipeToFinalize(pipeState, e)); - return; - } - finalize(); -} - -function pipeToFinalize(pipeState) -{ - "use strict"; - - @writableStreamDefaultWriterRelease(pipeState.writer); - @readableStreamReaderGenericRelease(pipeState.reader); - - // Instead of removing the abort algorithm as per spec, we make it a no-op which is equivalent. - pipeState.finalized = true; - - if (arguments.length > 1) - pipeState.promiseCapability.@reject.@call(@undefined, arguments[1]); - else - pipeState.promiseCapability.@resolve.@call(); -} - -function readableStreamTee(stream, shouldClone) -{ - "use strict"; - - @assert(@isReadableStream(stream)); - @assert(typeof(shouldClone) === "boolean"); - - const reader = new @ReadableStreamDefaultReader(stream); - - const teeState = { - closedOrErrored: false, - canceled1: false, - canceled2: false, - reason1: @undefined, - reason2: @undefined, - }; - - teeState.cancelPromiseCapability = @newPromiseCapability(@Promise); - - const pullFunction = @readableStreamTeePullFunction(teeState, reader, shouldClone); - - const branch1Source = { }; - @putByIdDirectPrivate(branch1Source, "pull", pullFunction); - @putByIdDirectPrivate(branch1Source, "cancel", @readableStreamTeeBranch1CancelFunction(teeState, stream)); - - const branch2Source = { }; - @putByIdDirectPrivate(branch2Source, "pull", pullFunction); - @putByIdDirectPrivate(branch2Source, "cancel", @readableStreamTeeBranch2CancelFunction(teeState, stream)); - - const branch1 = new @ReadableStream(branch1Source); - const branch2 = new @ReadableStream(branch2Source); - - @getByIdDirectPrivate(reader, "closedPromiseCapability").@promise.@then(@undefined, function(e) { - if (teeState.closedOrErrored) - return; - @readableStreamDefaultControllerError(branch1.@readableStreamController, e); - @readableStreamDefaultControllerError(branch2.@readableStreamController, e); - teeState.closedOrErrored = true; - if (!teeState.canceled1 || !teeState.canceled2) - teeState.cancelPromiseCapability.@resolve.@call(); - }); - - // Additional fields compared to the spec, as they are needed within pull/cancel functions. - teeState.branch1 = branch1; - teeState.branch2 = branch2; - - return [branch1, branch2]; -} - -function readableStreamTeePullFunction(teeState, reader, shouldClone) -{ - "use strict"; - - return function() { - @Promise.prototype.@then.@call(@readableStreamDefaultReaderRead(reader), function(result) { - @assert(@isObject(result)); - @assert(typeof result.done === "boolean"); - if (result.done && !teeState.closedOrErrored) { - if (!teeState.canceled1) - @readableStreamDefaultControllerClose(teeState.branch1.@readableStreamController); - if (!teeState.canceled2) - @readableStreamDefaultControllerClose(teeState.branch2.@readableStreamController); - teeState.closedOrErrored = true; - if (!teeState.canceled1 || !teeState.canceled2) - teeState.cancelPromiseCapability.@resolve.@call(); - } - if (teeState.closedOrErrored) - return; - if (!teeState.canceled1) - @readableStreamDefaultControllerEnqueue(teeState.branch1.@readableStreamController, result.value); - if (!teeState.canceled2) - @readableStreamDefaultControllerEnqueue(teeState.branch2.@readableStreamController, shouldClone ? @structuredCloneForStream(result.value) : result.value); - }); - } -} - -function readableStreamTeeBranch1CancelFunction(teeState, stream) -{ - "use strict"; - - return function(r) { - teeState.canceled1 = true; - teeState.reason1 = r; - if (teeState.canceled2) { - @readableStreamCancel(stream, [teeState.reason1, teeState.reason2]).@then( - teeState.cancelPromiseCapability.@resolve, - teeState.cancelPromiseCapability.@reject); - } - return teeState.cancelPromiseCapability.@promise; - } -} - -function readableStreamTeeBranch2CancelFunction(teeState, stream) -{ - "use strict"; - - return function(r) { - teeState.canceled2 = true; - teeState.reason2 = r; - if (teeState.canceled1) { - @readableStreamCancel(stream, [teeState.reason1, teeState.reason2]).@then( - teeState.cancelPromiseCapability.@resolve, - teeState.cancelPromiseCapability.@reject); - } - return teeState.cancelPromiseCapability.@promise; - } -} - -function isReadableStream(stream) -{ - "use strict"; - - // Spec tells to return true only if stream has a readableStreamController internal slot. - // However, since it is a private slot, it cannot be checked using hasOwnProperty(). - // Therefore, readableStreamController is initialized with null value. - return @isObject(stream) && @getByIdDirectPrivate(stream, "readableStreamController") !== @undefined; -} - -function isReadableStreamDefaultReader(reader) -{ - "use strict"; - - // Spec tells to return true only if reader has a readRequests internal slot. - // However, since it is a private slot, it cannot be checked using hasOwnProperty(). - // Since readRequests is initialized with an empty array, the following test is ok. - return @isObject(reader) && !!@getByIdDirectPrivate(reader, "readRequests"); -} - -function isReadableStreamDefaultController(controller) -{ - "use strict"; - - // Spec tells to return true only if controller has an underlyingSource internal slot. - // However, since it is a private slot, it cannot be checked using hasOwnProperty(). - // underlyingSource is obtained in ReadableStream constructor: if undefined, it is set - // to an empty object. Therefore, following test is ok. - return @isObject(controller) && !!@getByIdDirectPrivate(controller, "underlyingSource"); -} - - -@globalPrivate -function assignDirectStream() { - "use strict"; - - var stream = this; -} - - -function handleDirectStreamError(e) { - "use strict"; - - var controller = this; - var sink = controller.@sink; - if (sink) { - @putByIdDirectPrivate(controller, "sink", @undefined); - try { - sink.close(e); - } catch (f) {} - } - - this.error = this.drain = this.write = this.close = this.end = @onReadableStreamDirectControllerClosed; - - if (typeof this.@underlyingSource.close === 'function') { - try { - this.@underlyingSource.close.@call(this.@underlyingSource, e); - } catch (e) { - } - } - - try { - var pend = controller._pendingRead; - if (pend) { - controller._pendingRead = @undefined; - @rejectPromise(pend, e); - } - } catch (f) {} - var stream = controller.@controlledReadableStream; - if (stream) @readableStreamError(stream, e); -} - -function handleDirectStreamErrorReject(e) { - @handleDirectStreamError.@call(this, e); - return @Promise.@reject(e); -} - -function onPullDirectStream(controller) -{ - - "use strict"; - - var stream = controller.@controlledReadableStream; - if (!stream || @getByIdDirectPrivate(stream, "state") !== @streamReadable) - return; - - // pull is in progress - // this is a recursive call - // ignore it - if (controller._deferClose === -1) { - return; - } - - - controller._deferClose = -1; - controller._deferDrain = -1; - var deferClose; - var deferDrain; - - // Direct streams allow @pull to be called multiple times, unlike the spec. - // Backpressure is handled by the destination, not by the underlying source. - // In this case, we rely on the heuristic that repeatedly draining in the same tick - // is bad for performance - // this code is only run when consuming a direct stream from JS - // without the HTTP server or anything else - try { - var result = controller.@underlyingSource.pull( - controller, - ); - - if (result && @isPromise(result)) { - if (controller._handleError === @undefined) { - controller._handleError = @handleDirectStreamErrorReject.bind(controller); - } - - @Promise.prototype.catch.@call(result, controller._handleError); - } - } catch(e) { - return @handleDirectStreamErrorReject.@call(controller, e); - } finally { - deferClose = controller._deferClose; - deferDrain = controller._deferDrain; - controller._deferDrain = controller._deferClose = 0; - } - - - var promiseToReturn; - - - if (controller._pendingRead === @undefined) { - controller._pendingRead = promiseToReturn = @newPromise(); - } else { - promiseToReturn = @readableStreamAddReadRequest(stream); - } - - - // they called close during @pull() - // we delay that - if (deferClose === 1) { - var reason = controller._deferCloseReason; - controller._deferCloseReason = @undefined; - @onCloseDirectStream.@call(controller, reason); - return promiseToReturn; - } - - // not done, but they called drain() - if (deferDrain === 1) { - @onDrainDirectStream.@call(controller); - } - - - return promiseToReturn; -} - -function noopDoneFunction() { - return @Promise.@resolve({value: @undefined, done: true}); -} - -function onReadableStreamDirectControllerClosed(reason) -{ - "use strict"; - @throwTypeError("ReadableStreamDirectController is now closed"); -} - -function onCloseDirectStream(reason) -{ - "use strict"; - var stream = this.@controlledReadableStream; - if (!stream || @getByIdDirectPrivate(stream, "state") !== @streamReadable) - return; - - if (this._deferClose !== 0) { - this._deferClose = 1; - this._deferCloseReason = reason; - return; - } - - @putByIdDirectPrivate(stream, "state", @streamClosing); - if (typeof this.@underlyingSource.close === 'function') { - try { - this.@underlyingSource.close.@call(this.@underlyingSource, reason); - } catch (e) { - - } - } - - var drained; - try { - drained = this.@sink.end(); - @putByIdDirectPrivate(this, "sink", @undefined); - } catch (e) { - if (this._pendingRead) { - var read = this._pendingRead; - this._pendingRead = @undefined; - @rejectPromise(read, e); - } - @readableStreamError(stream, e); - return; - } - - this.error = this.drain = this.write = this.close = this.end = @onReadableStreamDirectControllerClosed; - - var reader = @getByIdDirectPrivate(stream, "reader"); - - if (reader && @isReadableStreamDefaultReader(reader)) { - var _pendingRead = this._pendingRead; - if (_pendingRead && @isPromise(_pendingRead) && drained?.byteLength) { - this._pendingRead = @undefined; - @fulfillPromise(_pendingRead, {value: drained, done: false}); - @readableStreamClose(stream); - return; - } - } - - if (drained?.byteLength) { - var requests = @getByIdDirectPrivate(reader, "readRequests"); - if (requests?.isNotEmpty()) { - @readableStreamFulfillReadRequest(stream, drained, false); - @readableStreamClose(stream); - return; - } - - @putByIdDirectPrivate(stream, "state", @streamReadable); - this.@pull = () => { - var thisResult = @createFulfilledPromise({value: drained, done: false}); - drained = @undefined; - @readableStreamClose(stream); - stream = @undefined; - return thisResult; - }; - } else if (this._pendingRead) { - var read = this._pendingRead; - this._pendingRead = @undefined; - @putByIdDirectPrivate(this, "pull", @noopDoneFunction); - @fulfillPromise(read, {value: @undefined, done: true}); - } - - @readableStreamClose(stream); -} - -function onDrainDirectStream() -{ - "use strict"; - - var stream = this.@controlledReadableStream; - var reader = @getByIdDirectPrivate(stream, "reader"); - if (!reader || !@isReadableStreamDefaultReader(reader)) { - return; - } - - var _pendingRead = this._pendingRead; - this._pendingRead = @undefined; - if (_pendingRead && @isPromise(_pendingRead)) { - var drained = this.@sink.drain(); - if (drained?.byteLength) { - this._pendingRead = @getByIdDirectPrivate(stream, "readRequests")?.shift(); - @fulfillPromise(_pendingRead, {value: drained, done: false}); - } else { - this._pendingRead = _pendingRead; - } - } else if (@getByIdDirectPrivate(stream, "readRequests")?.isNotEmpty()) { - var drained = this.@sink.drain(); - if (drained?.byteLength) { - @readableStreamFulfillReadRequest(stream, drained, false); - } - } else if (this._deferDrain === -1) { - this._deferDrain = 1; - } - -} - -function initializeArrayBufferStream(underlyingSource, highWaterMark) -{ - "use strict"; - - // This is the fallback implementation for direct streams - // When we don't know what the destination type is - // We assume it is a Uint8Array. - - var opts = highWaterMark ? {highWaterMark, stream: true, asUint8Array: true} : {stream: true, asUint8Array: true}; - var sink = new globalThis.Bun.ArrayBufferSink(); - sink.start(opts); - - var controller = { - @underlyingSource: underlyingSource, - @pull: @onPullDirectStream, - @controlledReadableStream: this, - @sink: sink, - close: @onCloseDirectStream, - write: sink.write.bind(sink), - error: @handleDirectStreamError, - end: @onCloseDirectStream, - @close: @onCloseDirectStream, - drain: @onDrainDirectStream, - _pendingRead: @undefined, - _deferClose: 0, - _deferDrain: 0, - _deferCloseReason: @undefined, - _handleError: @undefined, - }; - - - @putByIdDirectPrivate(this, "readableStreamController", controller); - -} - -function readableStreamError(stream, error) -{ - "use strict"; - - @assert(@isReadableStream(stream)); - @assert(@getByIdDirectPrivate(stream, "state") === @streamReadable); - @putByIdDirectPrivate(stream, "state", @streamErrored); - @putByIdDirectPrivate(stream, "storedError", error); - - const reader = @getByIdDirectPrivate(stream, "reader"); - - if (!reader) - return; - - if (@isReadableStreamDefaultReader(reader)) { - const requests = @getByIdDirectPrivate(reader, "readRequests"); - @putByIdDirectPrivate(reader, "readRequests", @createFIFO()); - for (var request = requests.shift(); request; request = requests.shift()) - @rejectPromise(request, error); - } else { - @assert(@isReadableStreamBYOBReader(reader)); - const requests = @getByIdDirectPrivate(reader, "readIntoRequests"); - @putByIdDirectPrivate(reader, "readIntoRequests", @createFIFO()); - for (var request = requests.shift(); request; request = requests.shift()) - @rejectPromise(request, error); - } - - @getByIdDirectPrivate(reader, "closedPromiseCapability").@reject.@call(@undefined, error); - const promise = @getByIdDirectPrivate(reader, "closedPromiseCapability").@promise; - @markPromiseAsHandled(promise); -} - -function readableStreamDefaultControllerShouldCallPull(controller) -{ - "use strict"; - - const stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); - - if (!@readableStreamDefaultControllerCanCloseOrEnqueue(controller)) - return false; - if (!(@getByIdDirectPrivate(controller, "started") === 1)) - return false; - if ((!@isReadableStreamLocked(stream) || !@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) && @readableStreamDefaultControllerGetDesiredSize(controller) <= 0) - return false; - const desiredSize = @readableStreamDefaultControllerGetDesiredSize(controller); - @assert(desiredSize !== null); - return desiredSize > 0; -} - -function readableStreamDefaultControllerCallPullIfNeeded(controller) -{ - "use strict"; - - // FIXME: use @readableStreamDefaultControllerShouldCallPull - const stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); - - if (!@readableStreamDefaultControllerCanCloseOrEnqueue(controller)) - return; - if (!(@getByIdDirectPrivate(controller, "started") === 1)) - return; - if ((!@isReadableStreamLocked(stream) || !@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) && @readableStreamDefaultControllerGetDesiredSize(controller) <= 0) - return; - - if (@getByIdDirectPrivate(controller, "pulling")) { - @putByIdDirectPrivate(controller, "pullAgain", true); - return; - } - - - @assert(!@getByIdDirectPrivate(controller, "pullAgain")); - @putByIdDirectPrivate(controller, "pulling", true); - - @getByIdDirectPrivate(controller, "pullAlgorithm").@call(@undefined).@then(function() { - @putByIdDirectPrivate(controller, "pulling", false); - if (@getByIdDirectPrivate(controller, "pullAgain")) { - @putByIdDirectPrivate(controller, "pullAgain", false); - - @readableStreamDefaultControllerCallPullIfNeeded(controller); - } - }, function(error) { - @readableStreamDefaultControllerError(controller, error); - }); -} - -function isReadableStreamLocked(stream) -{ - "use strict"; - - @assert(@isReadableStream(stream)); - return !!@getByIdDirectPrivate(stream, "reader"); -} - -function readableStreamDefaultControllerGetDesiredSize(controller) -{ - "use strict"; - - const stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); - const state = @getByIdDirectPrivate(stream, "state"); - - if (state === @streamErrored) - return null; - if (state === @streamClosed) - return 0; - - return @getByIdDirectPrivate(controller, "strategy").highWaterMark - @getByIdDirectPrivate(controller, "queue").size; -} - - -function readableStreamReaderGenericCancel(reader, reason) -{ - "use strict"; - - const stream = @getByIdDirectPrivate(reader, "ownerReadableStream"); - @assert(!!stream); - return @readableStreamCancel(stream, reason); -} - -function readableStreamCancel(stream, reason) -{ - "use strict"; - - @putByIdDirectPrivate(stream, "disturbed", true); - const state = @getByIdDirectPrivate(stream, "state"); - if (state === @streamClosed) - return @Promise.@resolve(); - if (state === @streamErrored) - return @Promise.@reject(@getByIdDirectPrivate(stream, "storedError")); - @readableStreamClose(stream); - - var controller = @getByIdDirectPrivate(stream, "readableStreamController"); - return controller.@cancel(controller, reason).@then(function() { }); -} - -function readableStreamDefaultControllerCancel(controller, reason) -{ - "use strict"; - - @putByIdDirectPrivate(controller, "queue", @newQueue()); - return @getByIdDirectPrivate(controller, "cancelAlgorithm").@call(@undefined, reason); -} - -function readableStreamDefaultControllerPull(controller) -{ - "use strict"; - - var queue = @getByIdDirectPrivate(controller, "queue"); - if (queue.content.isNotEmpty()) { - const chunk = @dequeueValue(queue); - if (@getByIdDirectPrivate(controller, "closeRequested") && queue.content.isEmpty()) - @readableStreamClose(@getByIdDirectPrivate(controller, "controlledReadableStream")); - else - @readableStreamDefaultControllerCallPullIfNeeded(controller); - - return @createFulfilledPromise({ value: chunk, done: false }); - } - const pendingPromise = @readableStreamAddReadRequest(@getByIdDirectPrivate(controller, "controlledReadableStream")); - @readableStreamDefaultControllerCallPullIfNeeded(controller); - return pendingPromise; -} - -function readableStreamDefaultControllerClose(controller) -{ - "use strict"; - - @assert(@readableStreamDefaultControllerCanCloseOrEnqueue(controller)); - @putByIdDirectPrivate(controller, "closeRequested", true); - if (@getByIdDirectPrivate(controller, "queue")?.content?.isEmpty()) - @readableStreamClose(@getByIdDirectPrivate(controller, "controlledReadableStream")); -} - -function readableStreamClose(stream) -{ - "use strict"; - - @assert(@getByIdDirectPrivate(stream, "state") === @streamReadable); - @putByIdDirectPrivate(stream, "state", @streamClosed); - if (!@getByIdDirectPrivate(stream, "reader")) - return; - - if (@isReadableStreamDefaultReader(@getByIdDirectPrivate(stream, "reader"))) { - const requests = @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests"); - if (requests.isNotEmpty()) { - @putByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests", @createFIFO()); - - for (var request = requests.shift(); request; request = requests.shift()) - @fulfillPromise(request, { value: @undefined, done: true }); - } - } - - @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "closedPromiseCapability").@resolve.@call(); -} - -function readableStreamFulfillReadRequest(stream, chunk, done) -{ - "use strict"; - const readRequest = @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").shift(); - @fulfillPromise(readRequest, { value: chunk, done: done }); -} - -function readableStreamDefaultControllerEnqueue(controller, chunk) -{ - "use strict"; - - const stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); - // this is checked by callers - @assert(@readableStreamDefaultControllerCanCloseOrEnqueue(controller)); - - if (@isReadableStreamLocked(stream) && @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) { - @readableStreamFulfillReadRequest(stream, chunk, false); - @readableStreamDefaultControllerCallPullIfNeeded(controller); - return; - } - - try { - let chunkSize = 1; - if (@getByIdDirectPrivate(controller, "strategy").size !== @undefined) - chunkSize = @getByIdDirectPrivate(controller, "strategy").size(chunk); - @enqueueValueWithSize(@getByIdDirectPrivate(controller, "queue"), chunk, chunkSize); - } - catch(error) { - @readableStreamDefaultControllerError(controller, error); - throw error; - } - @readableStreamDefaultControllerCallPullIfNeeded(controller); -} - -function readableStreamDefaultReaderRead(reader) -{ - "use strict"; - - const stream = @getByIdDirectPrivate(reader, "ownerReadableStream"); - @assert(!!stream); - const state = @getByIdDirectPrivate(stream, "state"); - - @putByIdDirectPrivate(stream, "disturbed", true); - if (state === @streamClosed) - return @createFulfilledPromise({ value: @undefined, done: true }); - if (state === @streamErrored) - return @Promise.@reject(@getByIdDirectPrivate(stream, "storedError")); - @assert(state === @streamReadable); - - return @getByIdDirectPrivate(stream, "readableStreamController").@pull(@getByIdDirectPrivate(stream, "readableStreamController")); -} - -function readableStreamAddReadRequest(stream) -{ - "use strict"; - - @assert(@isReadableStreamDefaultReader(@getByIdDirectPrivate(stream, "reader"))); - @assert(@getByIdDirectPrivate(stream, "state") == @streamReadable); - - const readRequest = @newPromise(); - - @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").push(readRequest); - - return readRequest; -} - -function isReadableStreamDisturbed(stream) -{ - "use strict"; - - @assert(@isReadableStream(stream)); - return @getByIdDirectPrivate(stream, "disturbed"); -} - -function readableStreamReaderGenericRelease(reader) -{ - "use strict"; - - @assert(!!@getByIdDirectPrivate(reader, "ownerReadableStream")); - @assert(@getByIdDirectPrivate(@getByIdDirectPrivate(reader, "ownerReadableStream"), "reader") === reader); - - if (@getByIdDirectPrivate(@getByIdDirectPrivate(reader, "ownerReadableStream"), "state") === @streamReadable) - @getByIdDirectPrivate(reader, "closedPromiseCapability").@reject.@call(@undefined, @makeTypeError("releasing lock of reader whose stream is still in readable state")); - else - @putByIdDirectPrivate(reader, "closedPromiseCapability", { @promise: @newHandledRejectedPromise(@makeTypeError("reader released lock")) }); - - const promise = @getByIdDirectPrivate(reader, "closedPromiseCapability").@promise; - @markPromiseAsHandled(promise); - @putByIdDirectPrivate(@getByIdDirectPrivate(reader, "ownerReadableStream"), "reader", @undefined); - @putByIdDirectPrivate(reader, "ownerReadableStream", @undefined); -} - -function readableStreamDefaultControllerCanCloseOrEnqueue(controller) -{ - "use strict"; - - return !@getByIdDirectPrivate(controller, "closeRequested") && @getByIdDirectPrivate(@getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === @streamReadable; -} - - -function lazyLoadStream(stream, autoAllocateChunkSize) { - "use strict"; - - var nativeType = @getByIdDirectPrivate(stream, "bunNativeType"); - var nativePtr = @getByIdDirectPrivate(stream, "bunNativePtr"); - var cached = @lazyStreamPrototypeMap; - var Prototype = cached.@get(nativeType); - if (Prototype === @undefined) { - var [pull, start, cancel, setClose, deinit] = @lazyLoad(nativeType); - var closer = [false]; - var handleResult; - function handleNativeReadableStreamPromiseResult(val) { - "use strict"; - var {c, v} = this; - this.c = @undefined; - this.v = @undefined; - handleResult(val, c, v); - } - - handleResult = function handleResult(result, controller, view) { - "use strict"; - - if (result && @isPromise(result)) { - return result.then(handleNativeReadableStreamPromiseResult.bind({c: controller, v: view}), (err) => controller.error(err)); - } else if (result !== false) { - if (view && view.byteLength === result) { - controller.byobRequest.respondWithNewView(view); - } else { - controller.byobRequest.respond(result); - } - } - - if (closer[0] || result === false) { - @enqueueJob(() => controller.close()); - closer[0] = false; - } - }; - - Prototype = class NativeReadableStreamSource { - constructor(tag, autoAllocateChunkSize) { - this.pull = this.pull_.bind(tag); - this.cancel = this.cancel_.bind(tag); - this.autoAllocateChunkSize = autoAllocateChunkSize; - } - - pull; - cancel; - - type = "bytes"; - autoAllocateChunkSize = 0; - - static startSync = start; - - pull_(controller) { - closer[0] = false; - var result; - - const view = controller.byobRequest.view; - try { - result = pull(this, view, closer); - } catch(err) { - return controller.error(err); - } - - return handleResult(result, controller, view); - } - - cancel_(reason) { - cancel(this, reason); - } - static deinit = deinit; - static registry = new FinalizationRegistry(deinit); - } - cached.@set(nativeType, Prototype); - } - - const chunkSize = Prototype.startSync(nativePtr, autoAllocateChunkSize); - - // empty file, no need for native back-and-forth on this - if (chunkSize === 0) { - @readableStreamClose(stream); - return null; - } - var instance = new Prototype(nativePtr, chunkSize); - Prototype.registry.register(instance, nativePtr); - return instance; -}
\ No newline at end of file |