diff options
Diffstat (limited to 'src/js/builtins/WritableStreamInternals.ts')
-rw-r--r-- | src/js/builtins/WritableStreamInternals.ts | 790 |
1 files changed, 790 insertions, 0 deletions
diff --git a/src/js/builtins/WritableStreamInternals.ts b/src/js/builtins/WritableStreamInternals.ts new file mode 100644 index 000000000..f436a285e --- /dev/null +++ b/src/js/builtins/WritableStreamInternals.ts @@ -0,0 +1,790 @@ +/* + * Copyright (C) 2015 Canon Inc. + * Copyright (C) 2015 Igalia + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +// @internal + +export function isWritableStream(stream) { + return $isObject(stream) && !!$getByIdDirectPrivate(stream, "underlyingSink"); +} + +export function isWritableStreamDefaultWriter(writer) { + return $isObject(writer) && !!$getByIdDirectPrivate(writer, "closedPromise"); +} + +export function acquireWritableStreamDefaultWriter(stream) { + return new WritableStreamDefaultWriter(stream); +} + +// https://streams.spec.whatwg.org/#create-writable-stream +export function createWritableStream( + startAlgorithm, + writeAlgorithm, + closeAlgorithm, + abortAlgorithm, + highWaterMark, + sizeAlgorithm, +) { + $assert(typeof highWaterMark === "number" && !isNaN(highWaterMark) && highWaterMark >= 0); + + const internalStream = {}; + $initializeWritableStreamSlots(internalStream, {}); + const controller = new WritableStreamDefaultController(); + + $setUpWritableStreamDefaultController( + internalStream, + controller, + startAlgorithm, + writeAlgorithm, + closeAlgorithm, + abortAlgorithm, + highWaterMark, + sizeAlgorithm, + ); + + return $createWritableStreamFromInternal(internalStream); +} + +export function createInternalWritableStreamFromUnderlyingSink(underlyingSink, strategy) { + const stream = {}; + + if (underlyingSink === undefined) underlyingSink = {}; + + if (strategy === undefined) strategy = {}; + + if (!$isObject(underlyingSink)) $throwTypeError("WritableStream constructor takes an object as first argument"); + + if ("type" in underlyingSink) $throwRangeError("Invalid type is specified"); + + const sizeAlgorithm = $extractSizeAlgorithm(strategy); + const highWaterMark = $extractHighWaterMark(strategy, 1); + + const underlyingSinkDict = {}; + if ("start" in underlyingSink) { + underlyingSinkDict["start"] = underlyingSink["start"]; + if (typeof underlyingSinkDict["start"] !== "function") $throwTypeError("underlyingSink.start should be a function"); + } + if ("write" in underlyingSink) { + underlyingSinkDict["write"] = underlyingSink["write"]; + if (typeof underlyingSinkDict["write"] !== "function") $throwTypeError("underlyingSink.write should be a function"); + } + if ("close" in underlyingSink) { + underlyingSinkDict["close"] = underlyingSink["close"]; + if (typeof underlyingSinkDict["close"] !== "function") $throwTypeError("underlyingSink.close should be a function"); + } + if ("abort" in underlyingSink) { + underlyingSinkDict["abort"] = underlyingSink["abort"]; + if (typeof underlyingSinkDict["abort"] !== "function") $throwTypeError("underlyingSink.abort should be a function"); + } + + $initializeWritableStreamSlots(stream, underlyingSink); + $setUpWritableStreamDefaultControllerFromUnderlyingSink( + stream, + underlyingSink, + underlyingSinkDict, + highWaterMark, + sizeAlgorithm, + ); + + return stream; +} + +export function initializeWritableStreamSlots(stream, underlyingSink) { + $putByIdDirectPrivate(stream, "state", "writable"); + $putByIdDirectPrivate(stream, "storedError", undefined); + $putByIdDirectPrivate(stream, "writer", undefined); + $putByIdDirectPrivate(stream, "controller", undefined); + $putByIdDirectPrivate(stream, "inFlightWriteRequest", undefined); + $putByIdDirectPrivate(stream, "closeRequest", undefined); + $putByIdDirectPrivate(stream, "inFlightCloseRequest", undefined); + $putByIdDirectPrivate(stream, "pendingAbortRequest", undefined); + $putByIdDirectPrivate(stream, "writeRequests", $createFIFO()); + $putByIdDirectPrivate(stream, "backpressure", false); + $putByIdDirectPrivate(stream, "underlyingSink", underlyingSink); +} + +export function writableStreamCloseForBindings(stream) { + if ($isWritableStreamLocked(stream)) + return Promise.$reject($makeTypeError("WritableStream.close method can only be used on non locked WritableStream")); + + if ($writableStreamCloseQueuedOrInFlight(stream)) + return Promise.$reject( + $makeTypeError("WritableStream.close method can only be used on a being close WritableStream"), + ); + + return $writableStreamClose(stream); +} + +export function writableStreamAbortForBindings(stream, reason) { + if ($isWritableStreamLocked(stream)) + return Promise.$reject($makeTypeError("WritableStream.abort method can only be used on non locked WritableStream")); + + return $writableStreamAbort(stream, reason); +} + +export function isWritableStreamLocked(stream) { + return $getByIdDirectPrivate(stream, "writer") !== undefined; +} + +export function setUpWritableStreamDefaultWriter(writer, stream) { + if ($isWritableStreamLocked(stream)) $throwTypeError("WritableStream is locked"); + + $putByIdDirectPrivate(writer, "stream", stream); + $putByIdDirectPrivate(stream, "writer", writer); + + const readyPromiseCapability = $newPromiseCapability(Promise); + const closedPromiseCapability = $newPromiseCapability(Promise); + $putByIdDirectPrivate(writer, "readyPromise", readyPromiseCapability); + $putByIdDirectPrivate(writer, "closedPromise", closedPromiseCapability); + + const state = $getByIdDirectPrivate(stream, "state"); + if (state === "writable") { + if ($writableStreamCloseQueuedOrInFlight(stream) || !$getByIdDirectPrivate(stream, "backpressure")) + readyPromiseCapability.$resolve.$call(); + } else if (state === "erroring") { + readyPromiseCapability.$reject.$call(undefined, $getByIdDirectPrivate(stream, "storedError")); + $markPromiseAsHandled(readyPromiseCapability.$promise); + } else if (state === "closed") { + readyPromiseCapability.$resolve.$call(); + closedPromiseCapability.$resolve.$call(); + } else { + $assert(state === "errored"); + const storedError = $getByIdDirectPrivate(stream, "storedError"); + readyPromiseCapability.$reject.$call(undefined, storedError); + $markPromiseAsHandled(readyPromiseCapability.$promise); + closedPromiseCapability.$reject.$call(undefined, storedError); + $markPromiseAsHandled(closedPromiseCapability.$promise); + } +} + +export function writableStreamAbort(stream, reason) { + const state = $getByIdDirectPrivate(stream, "state"); + if (state === "closed" || state === "errored") return Promise.$resolve(); + + const pendingAbortRequest = $getByIdDirectPrivate(stream, "pendingAbortRequest"); + if (pendingAbortRequest !== undefined) return pendingAbortRequest.promise.$promise; + + $assert(state === "writable" || state === "erroring"); + let wasAlreadyErroring = false; + if (state === "erroring") { + wasAlreadyErroring = true; + reason = undefined; + } + + const abortPromiseCapability = $newPromiseCapability(Promise); + $putByIdDirectPrivate(stream, "pendingAbortRequest", { + promise: abortPromiseCapability, + reason: reason, + wasAlreadyErroring: wasAlreadyErroring, + }); + + if (!wasAlreadyErroring) $writableStreamStartErroring(stream, reason); + return abortPromiseCapability.$promise; +} + +export function writableStreamClose(stream) { + const state = $getByIdDirectPrivate(stream, "state"); + if (state === "closed" || state === "errored") + return Promise.$reject($makeTypeError("Cannot close a writable stream that is closed or errored")); + + $assert(state === "writable" || state === "erroring"); + $assert(!$writableStreamCloseQueuedOrInFlight(stream)); + + const closePromiseCapability = $newPromiseCapability(Promise); + $putByIdDirectPrivate(stream, "closeRequest", closePromiseCapability); + + const writer = $getByIdDirectPrivate(stream, "writer"); + if (writer !== undefined && $getByIdDirectPrivate(stream, "backpressure") && state === "writable") + $getByIdDirectPrivate(writer, "readyPromise").$resolve.$call(); + + $writableStreamDefaultControllerClose($getByIdDirectPrivate(stream, "controller")); + + return closePromiseCapability.$promise; +} + +export function writableStreamAddWriteRequest(stream) { + $assert($isWritableStreamLocked(stream)); + $assert($getByIdDirectPrivate(stream, "state") === "writable"); + + const writePromiseCapability = $newPromiseCapability(Promise); + const writeRequests = $getByIdDirectPrivate(stream, "writeRequests"); + writeRequests.push(writePromiseCapability); + return writePromiseCapability.$promise; +} + +export function writableStreamCloseQueuedOrInFlight(stream) { + return ( + $getByIdDirectPrivate(stream, "closeRequest") !== undefined || + $getByIdDirectPrivate(stream, "inFlightCloseRequest") !== undefined + ); +} + +export function writableStreamDealWithRejection(stream, error) { + const state = $getByIdDirectPrivate(stream, "state"); + if (state === "writable") { + $writableStreamStartErroring(stream, error); + return; + } + + $assert(state === "erroring"); + $writableStreamFinishErroring(stream); +} + +export function writableStreamFinishErroring(stream) { + $assert($getByIdDirectPrivate(stream, "state") === "erroring"); + $assert(!$writableStreamHasOperationMarkedInFlight(stream)); + + $putByIdDirectPrivate(stream, "state", "errored"); + + const controller = $getByIdDirectPrivate(stream, "controller"); + $getByIdDirectPrivate(controller, "errorSteps").$call(); + + const storedError = $getByIdDirectPrivate(stream, "storedError"); + const requests = $getByIdDirectPrivate(stream, "writeRequests"); + for (var request = requests.shift(); request; request = requests.shift()) + request.$reject.$call(undefined, storedError); + + // TODO: is this still necessary? + $putByIdDirectPrivate(stream, "writeRequests", $createFIFO()); + + const abortRequest = $getByIdDirectPrivate(stream, "pendingAbortRequest"); + if (abortRequest === undefined) { + $writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); + return; + } + + $putByIdDirectPrivate(stream, "pendingAbortRequest", undefined); + if (abortRequest.wasAlreadyErroring) { + abortRequest.promise.$reject.$call(undefined, storedError); + $writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); + return; + } + + $getByIdDirectPrivate(controller, "abortSteps") + .$call(undefined, abortRequest.reason) + .$then( + () => { + abortRequest.promise.$resolve.$call(); + $writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); + }, + reason => { + abortRequest.promise.$reject.$call(undefined, reason); + $writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); + }, + ); +} + +export function writableStreamFinishInFlightClose(stream) { + const inFlightCloseRequest = $getByIdDirectPrivate(stream, "inFlightCloseRequest"); + inFlightCloseRequest.$resolve.$call(); + + $putByIdDirectPrivate(stream, "inFlightCloseRequest", undefined); + + const state = $getByIdDirectPrivate(stream, "state"); + $assert(state === "writable" || state === "erroring"); + + if (state === "erroring") { + $putByIdDirectPrivate(stream, "storedError", undefined); + const abortRequest = $getByIdDirectPrivate(stream, "pendingAbortRequest"); + if (abortRequest !== undefined) { + abortRequest.promise.$resolve.$call(); + $putByIdDirectPrivate(stream, "pendingAbortRequest", undefined); + } + } + + $putByIdDirectPrivate(stream, "state", "closed"); + + const writer = $getByIdDirectPrivate(stream, "writer"); + if (writer !== undefined) $getByIdDirectPrivate(writer, "closedPromise").$resolve.$call(); + + $assert($getByIdDirectPrivate(stream, "pendingAbortRequest") === undefined); + $assert($getByIdDirectPrivate(stream, "storedError") === undefined); +} + +export function writableStreamFinishInFlightCloseWithError(stream, error) { + const inFlightCloseRequest = $getByIdDirectPrivate(stream, "inFlightCloseRequest"); + $assert(inFlightCloseRequest !== undefined); + inFlightCloseRequest.$reject.$call(undefined, error); + + $putByIdDirectPrivate(stream, "inFlightCloseRequest", undefined); + + const state = $getByIdDirectPrivate(stream, "state"); + $assert(state === "writable" || state === "erroring"); + + const abortRequest = $getByIdDirectPrivate(stream, "pendingAbortRequest"); + if (abortRequest !== undefined) { + abortRequest.promise.$reject.$call(undefined, error); + $putByIdDirectPrivate(stream, "pendingAbortRequest", undefined); + } + + $writableStreamDealWithRejection(stream, error); +} + +export function writableStreamFinishInFlightWrite(stream) { + const inFlightWriteRequest = $getByIdDirectPrivate(stream, "inFlightWriteRequest"); + $assert(inFlightWriteRequest !== undefined); + inFlightWriteRequest.$resolve.$call(); + + $putByIdDirectPrivate(stream, "inFlightWriteRequest", undefined); +} + +export function writableStreamFinishInFlightWriteWithError(stream, error) { + const inFlightWriteRequest = $getByIdDirectPrivate(stream, "inFlightWriteRequest"); + $assert(inFlightWriteRequest !== undefined); + inFlightWriteRequest.$reject.$call(undefined, error); + + $putByIdDirectPrivate(stream, "inFlightWriteRequest", undefined); + + const state = $getByIdDirectPrivate(stream, "state"); + $assert(state === "writable" || state === "erroring"); + + $writableStreamDealWithRejection(stream, error); +} + +export function writableStreamHasOperationMarkedInFlight(stream) { + return ( + $getByIdDirectPrivate(stream, "inFlightWriteRequest") !== undefined || + $getByIdDirectPrivate(stream, "inFlightCloseRequest") !== undefined + ); +} + +export function writableStreamMarkCloseRequestInFlight(stream) { + const closeRequest = $getByIdDirectPrivate(stream, "closeRequest"); + $assert($getByIdDirectPrivate(stream, "inFlightCloseRequest") === undefined); + $assert(closeRequest !== undefined); + + $putByIdDirectPrivate(stream, "inFlightCloseRequest", closeRequest); + $putByIdDirectPrivate(stream, "closeRequest", undefined); +} + +export function writableStreamMarkFirstWriteRequestInFlight(stream) { + const writeRequests = $getByIdDirectPrivate(stream, "writeRequests"); + $assert($getByIdDirectPrivate(stream, "inFlightWriteRequest") === undefined); + $assert(writeRequests.isNotEmpty()); + + const writeRequest = writeRequests.shift(); + $putByIdDirectPrivate(stream, "inFlightWriteRequest", writeRequest); +} + +export function writableStreamRejectCloseAndClosedPromiseIfNeeded(stream) { + $assert($getByIdDirectPrivate(stream, "state") === "errored"); + + const storedError = $getByIdDirectPrivate(stream, "storedError"); + + const closeRequest = $getByIdDirectPrivate(stream, "closeRequest"); + if (closeRequest !== undefined) { + $assert($getByIdDirectPrivate(stream, "inFlightCloseRequest") === undefined); + closeRequest.$reject.$call(undefined, storedError); + $putByIdDirectPrivate(stream, "closeRequest", undefined); + } + + const writer = $getByIdDirectPrivate(stream, "writer"); + if (writer !== undefined) { + const closedPromise = $getByIdDirectPrivate(writer, "closedPromise"); + closedPromise.$reject.$call(undefined, storedError); + $markPromiseAsHandled(closedPromise.$promise); + } +} + +export function writableStreamStartErroring(stream, reason) { + $assert($getByIdDirectPrivate(stream, "storedError") === undefined); + $assert($getByIdDirectPrivate(stream, "state") === "writable"); + + const controller = $getByIdDirectPrivate(stream, "controller"); + $assert(controller !== undefined); + + $putByIdDirectPrivate(stream, "state", "erroring"); + $putByIdDirectPrivate(stream, "storedError", reason); + + const writer = $getByIdDirectPrivate(stream, "writer"); + if (writer !== undefined) $writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason); + + if (!$writableStreamHasOperationMarkedInFlight(stream) && $getByIdDirectPrivate(controller, "started") === 1) + $writableStreamFinishErroring(stream); +} + +export function writableStreamUpdateBackpressure(stream, backpressure) { + $assert($getByIdDirectPrivate(stream, "state") === "writable"); + $assert(!$writableStreamCloseQueuedOrInFlight(stream)); + + const writer = $getByIdDirectPrivate(stream, "writer"); + if (writer !== undefined && backpressure !== $getByIdDirectPrivate(stream, "backpressure")) { + if (backpressure) $putByIdDirectPrivate(writer, "readyPromise", $newPromiseCapability(Promise)); + else $getByIdDirectPrivate(writer, "readyPromise").$resolve.$call(); + } + $putByIdDirectPrivate(stream, "backpressure", backpressure); +} + +export function writableStreamDefaultWriterAbort(writer, reason) { + const stream = $getByIdDirectPrivate(writer, "stream"); + $assert(stream !== undefined); + return $writableStreamAbort(stream, reason); +} + +export function writableStreamDefaultWriterClose(writer) { + const stream = $getByIdDirectPrivate(writer, "stream"); + $assert(stream !== undefined); + return $writableStreamClose(stream); +} + +export function writableStreamDefaultWriterCloseWithErrorPropagation(writer) { + const stream = $getByIdDirectPrivate(writer, "stream"); + $assert(stream !== undefined); + + const state = $getByIdDirectPrivate(stream, "state"); + + if ($writableStreamCloseQueuedOrInFlight(stream) || state === "closed") return Promise.$resolve(); + + if (state === "errored") return Promise.$reject($getByIdDirectPrivate(stream, "storedError")); + + $assert(state === "writable" || state === "erroring"); + return $writableStreamDefaultWriterClose(writer); +} + +export function writableStreamDefaultWriterEnsureClosedPromiseRejected(writer, error) { + let closedPromiseCapability = $getByIdDirectPrivate(writer, "closedPromise"); + let closedPromise = closedPromiseCapability.$promise; + + if (($getPromiseInternalField(closedPromise, $promiseFieldFlags) & $promiseStateMask) !== $promiseStatePending) { + closedPromiseCapability = $newPromiseCapability(Promise); + closedPromise = closedPromiseCapability.$promise; + $putByIdDirectPrivate(writer, "closedPromise", closedPromiseCapability); + } + + closedPromiseCapability.$reject.$call(undefined, error); + $markPromiseAsHandled(closedPromise); +} + +export function writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, error) { + let readyPromiseCapability = $getByIdDirectPrivate(writer, "readyPromise"); + let readyPromise = readyPromiseCapability.$promise; + + if (($getPromiseInternalField(readyPromise, $promiseFieldFlags) & $promiseStateMask) !== $promiseStatePending) { + readyPromiseCapability = $newPromiseCapability(Promise); + readyPromise = readyPromiseCapability.$promise; + $putByIdDirectPrivate(writer, "readyPromise", readyPromiseCapability); + } + + readyPromiseCapability.$reject.$call(undefined, error); + $markPromiseAsHandled(readyPromise); +} + +export function writableStreamDefaultWriterGetDesiredSize(writer) { + const stream = $getByIdDirectPrivate(writer, "stream"); + $assert(stream !== undefined); + + const state = $getByIdDirectPrivate(stream, "state"); + + if (state === "errored" || state === "erroring") return null; + + if (state === "closed") return 0; + + return $writableStreamDefaultControllerGetDesiredSize($getByIdDirectPrivate(stream, "controller")); +} + +export function writableStreamDefaultWriterRelease(writer) { + const stream = $getByIdDirectPrivate(writer, "stream"); + $assert(stream !== undefined); + $assert($getByIdDirectPrivate(stream, "writer") === writer); + + const releasedError = $makeTypeError("writableStreamDefaultWriterRelease"); + + $writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError); + $writableStreamDefaultWriterEnsureClosedPromiseRejected(writer, releasedError); + + $putByIdDirectPrivate(stream, "writer", undefined); + $putByIdDirectPrivate(writer, "stream", undefined); +} + +export function writableStreamDefaultWriterWrite(writer, chunk) { + const stream = $getByIdDirectPrivate(writer, "stream"); + $assert(stream !== undefined); + + const controller = $getByIdDirectPrivate(stream, "controller"); + $assert(controller !== undefined); + const chunkSize = $writableStreamDefaultControllerGetChunkSize(controller, chunk); + + if (stream !== $getByIdDirectPrivate(writer, "stream")) + return Promise.$reject($makeTypeError("writer is not stream's writer")); + + const state = $getByIdDirectPrivate(stream, "state"); + if (state === "errored") return Promise.$reject($getByIdDirectPrivate(stream, "storedError")); + + if ($writableStreamCloseQueuedOrInFlight(stream) || state === "closed") + return Promise.$reject($makeTypeError("stream is closing or closed")); + + if ($writableStreamCloseQueuedOrInFlight(stream) || state === "closed") + return Promise.$reject($makeTypeError("stream is closing or closed")); + + if (state === "erroring") return Promise.$reject($getByIdDirectPrivate(stream, "storedError")); + + $assert(state === "writable"); + + const promise = $writableStreamAddWriteRequest(stream); + $writableStreamDefaultControllerWrite(controller, chunk, chunkSize); + return promise; +} + +export function setUpWritableStreamDefaultController( + stream, + controller, + startAlgorithm, + writeAlgorithm, + closeAlgorithm, + abortAlgorithm, + highWaterMark, + sizeAlgorithm, +) { + $assert($isWritableStream(stream)); + $assert($getByIdDirectPrivate(stream, "controller") === undefined); + + $putByIdDirectPrivate(controller, "stream", stream); + $putByIdDirectPrivate(stream, "controller", controller); + + $resetQueue($getByIdDirectPrivate(controller, "queue")); + + $putByIdDirectPrivate(controller, "started", -1); + $putByIdDirectPrivate(controller, "startAlgorithm", startAlgorithm); + $putByIdDirectPrivate(controller, "strategySizeAlgorithm", sizeAlgorithm); + $putByIdDirectPrivate(controller, "strategyHWM", highWaterMark); + $putByIdDirectPrivate(controller, "writeAlgorithm", writeAlgorithm); + $putByIdDirectPrivate(controller, "closeAlgorithm", closeAlgorithm); + $putByIdDirectPrivate(controller, "abortAlgorithm", abortAlgorithm); + + const backpressure = $writableStreamDefaultControllerGetBackpressure(controller); + $writableStreamUpdateBackpressure(stream, backpressure); + + $writableStreamDefaultControllerStart(controller); +} + +export function writableStreamDefaultControllerStart(controller) { + if ($getByIdDirectPrivate(controller, "started") !== -1) return; + + $putByIdDirectPrivate(controller, "started", 0); + + const startAlgorithm = $getByIdDirectPrivate(controller, "startAlgorithm"); + $putByIdDirectPrivate(controller, "startAlgorithm", undefined); + const stream = $getByIdDirectPrivate(controller, "stream"); + return Promise.$resolve(startAlgorithm.$call()).$then( + () => { + const state = $getByIdDirectPrivate(stream, "state"); + $assert(state === "writable" || state === "erroring"); + $putByIdDirectPrivate(controller, "started", 1); + $writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); + }, + error => { + const state = $getByIdDirectPrivate(stream, "state"); + $assert(state === "writable" || state === "erroring"); + $putByIdDirectPrivate(controller, "started", 1); + $writableStreamDealWithRejection(stream, error); + }, + ); +} + +export function setUpWritableStreamDefaultControllerFromUnderlyingSink( + stream, + underlyingSink, + underlyingSinkDict, + highWaterMark, + sizeAlgorithm, +) { + const controller = new $WritableStreamDefaultController(); + + let startAlgorithm = () => {}; + let writeAlgorithm = () => { + return Promise.$resolve(); + }; + let closeAlgorithm = () => { + return Promise.$resolve(); + }; + let abortAlgorithm = () => { + return Promise.$resolve(); + }; + + if ("start" in underlyingSinkDict) { + const startMethod = underlyingSinkDict["start"]; + startAlgorithm = () => $promiseInvokeOrNoopMethodNoCatch(underlyingSink, startMethod, [controller]); + } + if ("write" in underlyingSinkDict) { + const writeMethod = underlyingSinkDict["write"]; + writeAlgorithm = chunk => $promiseInvokeOrNoopMethod(underlyingSink, writeMethod, [chunk, controller]); + } + if ("close" in underlyingSinkDict) { + const closeMethod = underlyingSinkDict["close"]; + closeAlgorithm = () => $promiseInvokeOrNoopMethod(underlyingSink, closeMethod, []); + } + if ("abort" in underlyingSinkDict) { + const abortMethod = underlyingSinkDict["abort"]; + abortAlgorithm = reason => $promiseInvokeOrNoopMethod(underlyingSink, abortMethod, [reason]); + } + + $setUpWritableStreamDefaultController( + stream, + controller, + startAlgorithm, + writeAlgorithm, + closeAlgorithm, + abortAlgorithm, + highWaterMark, + sizeAlgorithm, + ); +} + +export function writableStreamDefaultControllerAdvanceQueueIfNeeded(controller) { + const stream = $getByIdDirectPrivate(controller, "stream"); + + if ($getByIdDirectPrivate(controller, "started") !== 1) return; + + $assert(stream !== undefined); + if ($getByIdDirectPrivate(stream, "inFlightWriteRequest") !== undefined) return; + + const state = $getByIdDirectPrivate(stream, "state"); + $assert(state !== "closed" || state !== "errored"); + if (state === "erroring") { + $writableStreamFinishErroring(stream); + return; + } + + const queue = $getByIdDirectPrivate(controller, "queue"); + + if (queue.content?.isEmpty() ?? false) return; + + const value = $peekQueueValue(queue); + if (value === $isCloseSentinel) $writableStreamDefaultControllerProcessClose(controller); + else $writableStreamDefaultControllerProcessWrite(controller, value); +} + +export function isCloseSentinel() {} + +export function writableStreamDefaultControllerClearAlgorithms(controller) { + $putByIdDirectPrivate(controller, "writeAlgorithm", undefined); + $putByIdDirectPrivate(controller, "closeAlgorithm", undefined); + $putByIdDirectPrivate(controller, "abortAlgorithm", undefined); + $putByIdDirectPrivate(controller, "strategySizeAlgorithm", undefined); +} + +export function writableStreamDefaultControllerClose(controller) { + $enqueueValueWithSize($getByIdDirectPrivate(controller, "queue"), $isCloseSentinel, 0); + $writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); +} + +export function writableStreamDefaultControllerError(controller, error) { + const stream = $getByIdDirectPrivate(controller, "stream"); + $assert(stream !== undefined); + $assert($getByIdDirectPrivate(stream, "state") === "writable"); + + $writableStreamDefaultControllerClearAlgorithms(controller); + $writableStreamStartErroring(stream, error); +} + +export function writableStreamDefaultControllerErrorIfNeeded(controller, error) { + const stream = $getByIdDirectPrivate(controller, "stream"); + if ($getByIdDirectPrivate(stream, "state") === "writable") $writableStreamDefaultControllerError(controller, error); +} + +export function writableStreamDefaultControllerGetBackpressure(controller) { + const desiredSize = $writableStreamDefaultControllerGetDesiredSize(controller); + return desiredSize <= 0; +} + +export function writableStreamDefaultControllerGetChunkSize(controller, chunk) { + try { + return $getByIdDirectPrivate(controller, "strategySizeAlgorithm").$call(undefined, chunk); + } catch (e) { + $writableStreamDefaultControllerErrorIfNeeded(controller, e); + return 1; + } +} + +export function writableStreamDefaultControllerGetDesiredSize(controller) { + return $getByIdDirectPrivate(controller, "strategyHWM") - $getByIdDirectPrivate(controller, "queue").size; +} + +export function writableStreamDefaultControllerProcessClose(controller) { + const stream = $getByIdDirectPrivate(controller, "stream"); + + $writableStreamMarkCloseRequestInFlight(stream); + $dequeueValue($getByIdDirectPrivate(controller, "queue")); + + $assert($getByIdDirectPrivate(controller, "queue").content?.isEmpty()); + + const sinkClosePromise = $getByIdDirectPrivate(controller, "closeAlgorithm").$call(); + $writableStreamDefaultControllerClearAlgorithms(controller); + + sinkClosePromise.$then( + () => { + $writableStreamFinishInFlightClose(stream); + }, + reason => { + $writableStreamFinishInFlightCloseWithError(stream, reason); + }, + ); +} + +export function writableStreamDefaultControllerProcessWrite(controller, chunk) { + const stream = $getByIdDirectPrivate(controller, "stream"); + + $writableStreamMarkFirstWriteRequestInFlight(stream); + + const sinkWritePromise = $getByIdDirectPrivate(controller, "writeAlgorithm").$call(undefined, chunk); + + sinkWritePromise.$then( + () => { + $writableStreamFinishInFlightWrite(stream); + const state = $getByIdDirectPrivate(stream, "state"); + $assert(state === "writable" || state === "erroring"); + + $dequeueValue($getByIdDirectPrivate(controller, "queue")); + if (!$writableStreamCloseQueuedOrInFlight(stream) && state === "writable") { + const backpressure = $writableStreamDefaultControllerGetBackpressure(controller); + $writableStreamUpdateBackpressure(stream, backpressure); + } + $writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); + }, + reason => { + const state = $getByIdDirectPrivate(stream, "state"); + if (state === "writable") $writableStreamDefaultControllerClearAlgorithms(controller); + + $writableStreamFinishInFlightWriteWithError(stream, reason); + }, + ); +} + +export function writableStreamDefaultControllerWrite(controller, chunk, chunkSize) { + try { + $enqueueValueWithSize($getByIdDirectPrivate(controller, "queue"), chunk, chunkSize); + + const stream = $getByIdDirectPrivate(controller, "stream"); + + const state = $getByIdDirectPrivate(stream, "state"); + if (!$writableStreamCloseQueuedOrInFlight(stream) && state === "writable") { + const backpressure = $writableStreamDefaultControllerGetBackpressure(controller); + $writableStreamUpdateBackpressure(stream, backpressure); + } + $writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); + } catch (e) { + $writableStreamDefaultControllerErrorIfNeeded(controller, e); + } +} |