diff options
Diffstat (limited to 'src/bun.js/builtins/ts/WritableStreamInternals.ts')
-rw-r--r-- | src/bun.js/builtins/ts/WritableStreamInternals.ts | 790 |
1 files changed, 0 insertions, 790 deletions
diff --git a/src/bun.js/builtins/ts/WritableStreamInternals.ts b/src/bun.js/builtins/ts/WritableStreamInternals.ts deleted file mode 100644 index f436a285e..000000000 --- a/src/bun.js/builtins/ts/WritableStreamInternals.ts +++ /dev/null @@ -1,790 +0,0 @@ -/* - * Copyright (C) 2015 Canon Inc. - * Copyright (C) 2015 Igalia - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions - * are met: - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY - * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR - * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR - * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, - * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, - * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR - * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY - * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -// @internal - -export function isWritableStream(stream) { - return $isObject(stream) && !!$getByIdDirectPrivate(stream, "underlyingSink"); -} - -export function isWritableStreamDefaultWriter(writer) { - return $isObject(writer) && !!$getByIdDirectPrivate(writer, "closedPromise"); -} - -export function acquireWritableStreamDefaultWriter(stream) { - return new WritableStreamDefaultWriter(stream); -} - -// https://streams.spec.whatwg.org/#create-writable-stream -export function createWritableStream( - startAlgorithm, - writeAlgorithm, - closeAlgorithm, - abortAlgorithm, - highWaterMark, - sizeAlgorithm, -) { - $assert(typeof highWaterMark === "number" && !isNaN(highWaterMark) && highWaterMark >= 0); - - const internalStream = {}; - $initializeWritableStreamSlots(internalStream, {}); - const controller = new WritableStreamDefaultController(); - - $setUpWritableStreamDefaultController( - internalStream, - controller, - startAlgorithm, - writeAlgorithm, - closeAlgorithm, - abortAlgorithm, - highWaterMark, - sizeAlgorithm, - ); - - return $createWritableStreamFromInternal(internalStream); -} - -export function createInternalWritableStreamFromUnderlyingSink(underlyingSink, strategy) { - const stream = {}; - - if (underlyingSink === undefined) underlyingSink = {}; - - if (strategy === undefined) strategy = {}; - - if (!$isObject(underlyingSink)) $throwTypeError("WritableStream constructor takes an object as first argument"); - - if ("type" in underlyingSink) $throwRangeError("Invalid type is specified"); - - const sizeAlgorithm = $extractSizeAlgorithm(strategy); - const highWaterMark = $extractHighWaterMark(strategy, 1); - - const underlyingSinkDict = {}; - if ("start" in underlyingSink) { - underlyingSinkDict["start"] = underlyingSink["start"]; - if (typeof underlyingSinkDict["start"] !== "function") $throwTypeError("underlyingSink.start should be a function"); - } - if ("write" in underlyingSink) { - underlyingSinkDict["write"] = underlyingSink["write"]; - if (typeof underlyingSinkDict["write"] !== "function") $throwTypeError("underlyingSink.write should be a function"); - } - if ("close" in underlyingSink) { - underlyingSinkDict["close"] = underlyingSink["close"]; - if (typeof underlyingSinkDict["close"] !== "function") $throwTypeError("underlyingSink.close should be a function"); - } - if ("abort" in underlyingSink) { - underlyingSinkDict["abort"] = underlyingSink["abort"]; - if (typeof underlyingSinkDict["abort"] !== "function") $throwTypeError("underlyingSink.abort should be a function"); - } - - $initializeWritableStreamSlots(stream, underlyingSink); - $setUpWritableStreamDefaultControllerFromUnderlyingSink( - stream, - underlyingSink, - underlyingSinkDict, - highWaterMark, - sizeAlgorithm, - ); - - return stream; -} - -export function initializeWritableStreamSlots(stream, underlyingSink) { - $putByIdDirectPrivate(stream, "state", "writable"); - $putByIdDirectPrivate(stream, "storedError", undefined); - $putByIdDirectPrivate(stream, "writer", undefined); - $putByIdDirectPrivate(stream, "controller", undefined); - $putByIdDirectPrivate(stream, "inFlightWriteRequest", undefined); - $putByIdDirectPrivate(stream, "closeRequest", undefined); - $putByIdDirectPrivate(stream, "inFlightCloseRequest", undefined); - $putByIdDirectPrivate(stream, "pendingAbortRequest", undefined); - $putByIdDirectPrivate(stream, "writeRequests", $createFIFO()); - $putByIdDirectPrivate(stream, "backpressure", false); - $putByIdDirectPrivate(stream, "underlyingSink", underlyingSink); -} - -export function writableStreamCloseForBindings(stream) { - if ($isWritableStreamLocked(stream)) - return Promise.$reject($makeTypeError("WritableStream.close method can only be used on non locked WritableStream")); - - if ($writableStreamCloseQueuedOrInFlight(stream)) - return Promise.$reject( - $makeTypeError("WritableStream.close method can only be used on a being close WritableStream"), - ); - - return $writableStreamClose(stream); -} - -export function writableStreamAbortForBindings(stream, reason) { - if ($isWritableStreamLocked(stream)) - return Promise.$reject($makeTypeError("WritableStream.abort method can only be used on non locked WritableStream")); - - return $writableStreamAbort(stream, reason); -} - -export function isWritableStreamLocked(stream) { - return $getByIdDirectPrivate(stream, "writer") !== undefined; -} - -export function setUpWritableStreamDefaultWriter(writer, stream) { - if ($isWritableStreamLocked(stream)) $throwTypeError("WritableStream is locked"); - - $putByIdDirectPrivate(writer, "stream", stream); - $putByIdDirectPrivate(stream, "writer", writer); - - const readyPromiseCapability = $newPromiseCapability(Promise); - const closedPromiseCapability = $newPromiseCapability(Promise); - $putByIdDirectPrivate(writer, "readyPromise", readyPromiseCapability); - $putByIdDirectPrivate(writer, "closedPromise", closedPromiseCapability); - - const state = $getByIdDirectPrivate(stream, "state"); - if (state === "writable") { - if ($writableStreamCloseQueuedOrInFlight(stream) || !$getByIdDirectPrivate(stream, "backpressure")) - readyPromiseCapability.$resolve.$call(); - } else if (state === "erroring") { - readyPromiseCapability.$reject.$call(undefined, $getByIdDirectPrivate(stream, "storedError")); - $markPromiseAsHandled(readyPromiseCapability.$promise); - } else if (state === "closed") { - readyPromiseCapability.$resolve.$call(); - closedPromiseCapability.$resolve.$call(); - } else { - $assert(state === "errored"); - const storedError = $getByIdDirectPrivate(stream, "storedError"); - readyPromiseCapability.$reject.$call(undefined, storedError); - $markPromiseAsHandled(readyPromiseCapability.$promise); - closedPromiseCapability.$reject.$call(undefined, storedError); - $markPromiseAsHandled(closedPromiseCapability.$promise); - } -} - -export function writableStreamAbort(stream, reason) { - const state = $getByIdDirectPrivate(stream, "state"); - if (state === "closed" || state === "errored") return Promise.$resolve(); - - const pendingAbortRequest = $getByIdDirectPrivate(stream, "pendingAbortRequest"); - if (pendingAbortRequest !== undefined) return pendingAbortRequest.promise.$promise; - - $assert(state === "writable" || state === "erroring"); - let wasAlreadyErroring = false; - if (state === "erroring") { - wasAlreadyErroring = true; - reason = undefined; - } - - const abortPromiseCapability = $newPromiseCapability(Promise); - $putByIdDirectPrivate(stream, "pendingAbortRequest", { - promise: abortPromiseCapability, - reason: reason, - wasAlreadyErroring: wasAlreadyErroring, - }); - - if (!wasAlreadyErroring) $writableStreamStartErroring(stream, reason); - return abortPromiseCapability.$promise; -} - -export function writableStreamClose(stream) { - const state = $getByIdDirectPrivate(stream, "state"); - if (state === "closed" || state === "errored") - return Promise.$reject($makeTypeError("Cannot close a writable stream that is closed or errored")); - - $assert(state === "writable" || state === "erroring"); - $assert(!$writableStreamCloseQueuedOrInFlight(stream)); - - const closePromiseCapability = $newPromiseCapability(Promise); - $putByIdDirectPrivate(stream, "closeRequest", closePromiseCapability); - - const writer = $getByIdDirectPrivate(stream, "writer"); - if (writer !== undefined && $getByIdDirectPrivate(stream, "backpressure") && state === "writable") - $getByIdDirectPrivate(writer, "readyPromise").$resolve.$call(); - - $writableStreamDefaultControllerClose($getByIdDirectPrivate(stream, "controller")); - - return closePromiseCapability.$promise; -} - -export function writableStreamAddWriteRequest(stream) { - $assert($isWritableStreamLocked(stream)); - $assert($getByIdDirectPrivate(stream, "state") === "writable"); - - const writePromiseCapability = $newPromiseCapability(Promise); - const writeRequests = $getByIdDirectPrivate(stream, "writeRequests"); - writeRequests.push(writePromiseCapability); - return writePromiseCapability.$promise; -} - -export function writableStreamCloseQueuedOrInFlight(stream) { - return ( - $getByIdDirectPrivate(stream, "closeRequest") !== undefined || - $getByIdDirectPrivate(stream, "inFlightCloseRequest") !== undefined - ); -} - -export function writableStreamDealWithRejection(stream, error) { - const state = $getByIdDirectPrivate(stream, "state"); - if (state === "writable") { - $writableStreamStartErroring(stream, error); - return; - } - - $assert(state === "erroring"); - $writableStreamFinishErroring(stream); -} - -export function writableStreamFinishErroring(stream) { - $assert($getByIdDirectPrivate(stream, "state") === "erroring"); - $assert(!$writableStreamHasOperationMarkedInFlight(stream)); - - $putByIdDirectPrivate(stream, "state", "errored"); - - const controller = $getByIdDirectPrivate(stream, "controller"); - $getByIdDirectPrivate(controller, "errorSteps").$call(); - - const storedError = $getByIdDirectPrivate(stream, "storedError"); - const requests = $getByIdDirectPrivate(stream, "writeRequests"); - for (var request = requests.shift(); request; request = requests.shift()) - request.$reject.$call(undefined, storedError); - - // TODO: is this still necessary? - $putByIdDirectPrivate(stream, "writeRequests", $createFIFO()); - - const abortRequest = $getByIdDirectPrivate(stream, "pendingAbortRequest"); - if (abortRequest === undefined) { - $writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); - return; - } - - $putByIdDirectPrivate(stream, "pendingAbortRequest", undefined); - if (abortRequest.wasAlreadyErroring) { - abortRequest.promise.$reject.$call(undefined, storedError); - $writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); - return; - } - - $getByIdDirectPrivate(controller, "abortSteps") - .$call(undefined, abortRequest.reason) - .$then( - () => { - abortRequest.promise.$resolve.$call(); - $writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); - }, - reason => { - abortRequest.promise.$reject.$call(undefined, reason); - $writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); - }, - ); -} - -export function writableStreamFinishInFlightClose(stream) { - const inFlightCloseRequest = $getByIdDirectPrivate(stream, "inFlightCloseRequest"); - inFlightCloseRequest.$resolve.$call(); - - $putByIdDirectPrivate(stream, "inFlightCloseRequest", undefined); - - const state = $getByIdDirectPrivate(stream, "state"); - $assert(state === "writable" || state === "erroring"); - - if (state === "erroring") { - $putByIdDirectPrivate(stream, "storedError", undefined); - const abortRequest = $getByIdDirectPrivate(stream, "pendingAbortRequest"); - if (abortRequest !== undefined) { - abortRequest.promise.$resolve.$call(); - $putByIdDirectPrivate(stream, "pendingAbortRequest", undefined); - } - } - - $putByIdDirectPrivate(stream, "state", "closed"); - - const writer = $getByIdDirectPrivate(stream, "writer"); - if (writer !== undefined) $getByIdDirectPrivate(writer, "closedPromise").$resolve.$call(); - - $assert($getByIdDirectPrivate(stream, "pendingAbortRequest") === undefined); - $assert($getByIdDirectPrivate(stream, "storedError") === undefined); -} - -export function writableStreamFinishInFlightCloseWithError(stream, error) { - const inFlightCloseRequest = $getByIdDirectPrivate(stream, "inFlightCloseRequest"); - $assert(inFlightCloseRequest !== undefined); - inFlightCloseRequest.$reject.$call(undefined, error); - - $putByIdDirectPrivate(stream, "inFlightCloseRequest", undefined); - - const state = $getByIdDirectPrivate(stream, "state"); - $assert(state === "writable" || state === "erroring"); - - const abortRequest = $getByIdDirectPrivate(stream, "pendingAbortRequest"); - if (abortRequest !== undefined) { - abortRequest.promise.$reject.$call(undefined, error); - $putByIdDirectPrivate(stream, "pendingAbortRequest", undefined); - } - - $writableStreamDealWithRejection(stream, error); -} - -export function writableStreamFinishInFlightWrite(stream) { - const inFlightWriteRequest = $getByIdDirectPrivate(stream, "inFlightWriteRequest"); - $assert(inFlightWriteRequest !== undefined); - inFlightWriteRequest.$resolve.$call(); - - $putByIdDirectPrivate(stream, "inFlightWriteRequest", undefined); -} - -export function writableStreamFinishInFlightWriteWithError(stream, error) { - const inFlightWriteRequest = $getByIdDirectPrivate(stream, "inFlightWriteRequest"); - $assert(inFlightWriteRequest !== undefined); - inFlightWriteRequest.$reject.$call(undefined, error); - - $putByIdDirectPrivate(stream, "inFlightWriteRequest", undefined); - - const state = $getByIdDirectPrivate(stream, "state"); - $assert(state === "writable" || state === "erroring"); - - $writableStreamDealWithRejection(stream, error); -} - -export function writableStreamHasOperationMarkedInFlight(stream) { - return ( - $getByIdDirectPrivate(stream, "inFlightWriteRequest") !== undefined || - $getByIdDirectPrivate(stream, "inFlightCloseRequest") !== undefined - ); -} - -export function writableStreamMarkCloseRequestInFlight(stream) { - const closeRequest = $getByIdDirectPrivate(stream, "closeRequest"); - $assert($getByIdDirectPrivate(stream, "inFlightCloseRequest") === undefined); - $assert(closeRequest !== undefined); - - $putByIdDirectPrivate(stream, "inFlightCloseRequest", closeRequest); - $putByIdDirectPrivate(stream, "closeRequest", undefined); -} - -export function writableStreamMarkFirstWriteRequestInFlight(stream) { - const writeRequests = $getByIdDirectPrivate(stream, "writeRequests"); - $assert($getByIdDirectPrivate(stream, "inFlightWriteRequest") === undefined); - $assert(writeRequests.isNotEmpty()); - - const writeRequest = writeRequests.shift(); - $putByIdDirectPrivate(stream, "inFlightWriteRequest", writeRequest); -} - -export function writableStreamRejectCloseAndClosedPromiseIfNeeded(stream) { - $assert($getByIdDirectPrivate(stream, "state") === "errored"); - - const storedError = $getByIdDirectPrivate(stream, "storedError"); - - const closeRequest = $getByIdDirectPrivate(stream, "closeRequest"); - if (closeRequest !== undefined) { - $assert($getByIdDirectPrivate(stream, "inFlightCloseRequest") === undefined); - closeRequest.$reject.$call(undefined, storedError); - $putByIdDirectPrivate(stream, "closeRequest", undefined); - } - - const writer = $getByIdDirectPrivate(stream, "writer"); - if (writer !== undefined) { - const closedPromise = $getByIdDirectPrivate(writer, "closedPromise"); - closedPromise.$reject.$call(undefined, storedError); - $markPromiseAsHandled(closedPromise.$promise); - } -} - -export function writableStreamStartErroring(stream, reason) { - $assert($getByIdDirectPrivate(stream, "storedError") === undefined); - $assert($getByIdDirectPrivate(stream, "state") === "writable"); - - const controller = $getByIdDirectPrivate(stream, "controller"); - $assert(controller !== undefined); - - $putByIdDirectPrivate(stream, "state", "erroring"); - $putByIdDirectPrivate(stream, "storedError", reason); - - const writer = $getByIdDirectPrivate(stream, "writer"); - if (writer !== undefined) $writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason); - - if (!$writableStreamHasOperationMarkedInFlight(stream) && $getByIdDirectPrivate(controller, "started") === 1) - $writableStreamFinishErroring(stream); -} - -export function writableStreamUpdateBackpressure(stream, backpressure) { - $assert($getByIdDirectPrivate(stream, "state") === "writable"); - $assert(!$writableStreamCloseQueuedOrInFlight(stream)); - - const writer = $getByIdDirectPrivate(stream, "writer"); - if (writer !== undefined && backpressure !== $getByIdDirectPrivate(stream, "backpressure")) { - if (backpressure) $putByIdDirectPrivate(writer, "readyPromise", $newPromiseCapability(Promise)); - else $getByIdDirectPrivate(writer, "readyPromise").$resolve.$call(); - } - $putByIdDirectPrivate(stream, "backpressure", backpressure); -} - -export function writableStreamDefaultWriterAbort(writer, reason) { - const stream = $getByIdDirectPrivate(writer, "stream"); - $assert(stream !== undefined); - return $writableStreamAbort(stream, reason); -} - -export function writableStreamDefaultWriterClose(writer) { - const stream = $getByIdDirectPrivate(writer, "stream"); - $assert(stream !== undefined); - return $writableStreamClose(stream); -} - -export function writableStreamDefaultWriterCloseWithErrorPropagation(writer) { - const stream = $getByIdDirectPrivate(writer, "stream"); - $assert(stream !== undefined); - - const state = $getByIdDirectPrivate(stream, "state"); - - if ($writableStreamCloseQueuedOrInFlight(stream) || state === "closed") return Promise.$resolve(); - - if (state === "errored") return Promise.$reject($getByIdDirectPrivate(stream, "storedError")); - - $assert(state === "writable" || state === "erroring"); - return $writableStreamDefaultWriterClose(writer); -} - -export function writableStreamDefaultWriterEnsureClosedPromiseRejected(writer, error) { - let closedPromiseCapability = $getByIdDirectPrivate(writer, "closedPromise"); - let closedPromise = closedPromiseCapability.$promise; - - if (($getPromiseInternalField(closedPromise, $promiseFieldFlags) & $promiseStateMask) !== $promiseStatePending) { - closedPromiseCapability = $newPromiseCapability(Promise); - closedPromise = closedPromiseCapability.$promise; - $putByIdDirectPrivate(writer, "closedPromise", closedPromiseCapability); - } - - closedPromiseCapability.$reject.$call(undefined, error); - $markPromiseAsHandled(closedPromise); -} - -export function writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, error) { - let readyPromiseCapability = $getByIdDirectPrivate(writer, "readyPromise"); - let readyPromise = readyPromiseCapability.$promise; - - if (($getPromiseInternalField(readyPromise, $promiseFieldFlags) & $promiseStateMask) !== $promiseStatePending) { - readyPromiseCapability = $newPromiseCapability(Promise); - readyPromise = readyPromiseCapability.$promise; - $putByIdDirectPrivate(writer, "readyPromise", readyPromiseCapability); - } - - readyPromiseCapability.$reject.$call(undefined, error); - $markPromiseAsHandled(readyPromise); -} - -export function writableStreamDefaultWriterGetDesiredSize(writer) { - const stream = $getByIdDirectPrivate(writer, "stream"); - $assert(stream !== undefined); - - const state = $getByIdDirectPrivate(stream, "state"); - - if (state === "errored" || state === "erroring") return null; - - if (state === "closed") return 0; - - return $writableStreamDefaultControllerGetDesiredSize($getByIdDirectPrivate(stream, "controller")); -} - -export function writableStreamDefaultWriterRelease(writer) { - const stream = $getByIdDirectPrivate(writer, "stream"); - $assert(stream !== undefined); - $assert($getByIdDirectPrivate(stream, "writer") === writer); - - const releasedError = $makeTypeError("writableStreamDefaultWriterRelease"); - - $writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError); - $writableStreamDefaultWriterEnsureClosedPromiseRejected(writer, releasedError); - - $putByIdDirectPrivate(stream, "writer", undefined); - $putByIdDirectPrivate(writer, "stream", undefined); -} - -export function writableStreamDefaultWriterWrite(writer, chunk) { - const stream = $getByIdDirectPrivate(writer, "stream"); - $assert(stream !== undefined); - - const controller = $getByIdDirectPrivate(stream, "controller"); - $assert(controller !== undefined); - const chunkSize = $writableStreamDefaultControllerGetChunkSize(controller, chunk); - - if (stream !== $getByIdDirectPrivate(writer, "stream")) - return Promise.$reject($makeTypeError("writer is not stream's writer")); - - const state = $getByIdDirectPrivate(stream, "state"); - if (state === "errored") return Promise.$reject($getByIdDirectPrivate(stream, "storedError")); - - if ($writableStreamCloseQueuedOrInFlight(stream) || state === "closed") - return Promise.$reject($makeTypeError("stream is closing or closed")); - - if ($writableStreamCloseQueuedOrInFlight(stream) || state === "closed") - return Promise.$reject($makeTypeError("stream is closing or closed")); - - if (state === "erroring") return Promise.$reject($getByIdDirectPrivate(stream, "storedError")); - - $assert(state === "writable"); - - const promise = $writableStreamAddWriteRequest(stream); - $writableStreamDefaultControllerWrite(controller, chunk, chunkSize); - return promise; -} - -export function setUpWritableStreamDefaultController( - stream, - controller, - startAlgorithm, - writeAlgorithm, - closeAlgorithm, - abortAlgorithm, - highWaterMark, - sizeAlgorithm, -) { - $assert($isWritableStream(stream)); - $assert($getByIdDirectPrivate(stream, "controller") === undefined); - - $putByIdDirectPrivate(controller, "stream", stream); - $putByIdDirectPrivate(stream, "controller", controller); - - $resetQueue($getByIdDirectPrivate(controller, "queue")); - - $putByIdDirectPrivate(controller, "started", -1); - $putByIdDirectPrivate(controller, "startAlgorithm", startAlgorithm); - $putByIdDirectPrivate(controller, "strategySizeAlgorithm", sizeAlgorithm); - $putByIdDirectPrivate(controller, "strategyHWM", highWaterMark); - $putByIdDirectPrivate(controller, "writeAlgorithm", writeAlgorithm); - $putByIdDirectPrivate(controller, "closeAlgorithm", closeAlgorithm); - $putByIdDirectPrivate(controller, "abortAlgorithm", abortAlgorithm); - - const backpressure = $writableStreamDefaultControllerGetBackpressure(controller); - $writableStreamUpdateBackpressure(stream, backpressure); - - $writableStreamDefaultControllerStart(controller); -} - -export function writableStreamDefaultControllerStart(controller) { - if ($getByIdDirectPrivate(controller, "started") !== -1) return; - - $putByIdDirectPrivate(controller, "started", 0); - - const startAlgorithm = $getByIdDirectPrivate(controller, "startAlgorithm"); - $putByIdDirectPrivate(controller, "startAlgorithm", undefined); - const stream = $getByIdDirectPrivate(controller, "stream"); - return Promise.$resolve(startAlgorithm.$call()).$then( - () => { - const state = $getByIdDirectPrivate(stream, "state"); - $assert(state === "writable" || state === "erroring"); - $putByIdDirectPrivate(controller, "started", 1); - $writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); - }, - error => { - const state = $getByIdDirectPrivate(stream, "state"); - $assert(state === "writable" || state === "erroring"); - $putByIdDirectPrivate(controller, "started", 1); - $writableStreamDealWithRejection(stream, error); - }, - ); -} - -export function setUpWritableStreamDefaultControllerFromUnderlyingSink( - stream, - underlyingSink, - underlyingSinkDict, - highWaterMark, - sizeAlgorithm, -) { - const controller = new $WritableStreamDefaultController(); - - let startAlgorithm = () => {}; - let writeAlgorithm = () => { - return Promise.$resolve(); - }; - let closeAlgorithm = () => { - return Promise.$resolve(); - }; - let abortAlgorithm = () => { - return Promise.$resolve(); - }; - - if ("start" in underlyingSinkDict) { - const startMethod = underlyingSinkDict["start"]; - startAlgorithm = () => $promiseInvokeOrNoopMethodNoCatch(underlyingSink, startMethod, [controller]); - } - if ("write" in underlyingSinkDict) { - const writeMethod = underlyingSinkDict["write"]; - writeAlgorithm = chunk => $promiseInvokeOrNoopMethod(underlyingSink, writeMethod, [chunk, controller]); - } - if ("close" in underlyingSinkDict) { - const closeMethod = underlyingSinkDict["close"]; - closeAlgorithm = () => $promiseInvokeOrNoopMethod(underlyingSink, closeMethod, []); - } - if ("abort" in underlyingSinkDict) { - const abortMethod = underlyingSinkDict["abort"]; - abortAlgorithm = reason => $promiseInvokeOrNoopMethod(underlyingSink, abortMethod, [reason]); - } - - $setUpWritableStreamDefaultController( - stream, - controller, - startAlgorithm, - writeAlgorithm, - closeAlgorithm, - abortAlgorithm, - highWaterMark, - sizeAlgorithm, - ); -} - -export function writableStreamDefaultControllerAdvanceQueueIfNeeded(controller) { - const stream = $getByIdDirectPrivate(controller, "stream"); - - if ($getByIdDirectPrivate(controller, "started") !== 1) return; - - $assert(stream !== undefined); - if ($getByIdDirectPrivate(stream, "inFlightWriteRequest") !== undefined) return; - - const state = $getByIdDirectPrivate(stream, "state"); - $assert(state !== "closed" || state !== "errored"); - if (state === "erroring") { - $writableStreamFinishErroring(stream); - return; - } - - const queue = $getByIdDirectPrivate(controller, "queue"); - - if (queue.content?.isEmpty() ?? false) return; - - const value = $peekQueueValue(queue); - if (value === $isCloseSentinel) $writableStreamDefaultControllerProcessClose(controller); - else $writableStreamDefaultControllerProcessWrite(controller, value); -} - -export function isCloseSentinel() {} - -export function writableStreamDefaultControllerClearAlgorithms(controller) { - $putByIdDirectPrivate(controller, "writeAlgorithm", undefined); - $putByIdDirectPrivate(controller, "closeAlgorithm", undefined); - $putByIdDirectPrivate(controller, "abortAlgorithm", undefined); - $putByIdDirectPrivate(controller, "strategySizeAlgorithm", undefined); -} - -export function writableStreamDefaultControllerClose(controller) { - $enqueueValueWithSize($getByIdDirectPrivate(controller, "queue"), $isCloseSentinel, 0); - $writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); -} - -export function writableStreamDefaultControllerError(controller, error) { - const stream = $getByIdDirectPrivate(controller, "stream"); - $assert(stream !== undefined); - $assert($getByIdDirectPrivate(stream, "state") === "writable"); - - $writableStreamDefaultControllerClearAlgorithms(controller); - $writableStreamStartErroring(stream, error); -} - -export function writableStreamDefaultControllerErrorIfNeeded(controller, error) { - const stream = $getByIdDirectPrivate(controller, "stream"); - if ($getByIdDirectPrivate(stream, "state") === "writable") $writableStreamDefaultControllerError(controller, error); -} - -export function writableStreamDefaultControllerGetBackpressure(controller) { - const desiredSize = $writableStreamDefaultControllerGetDesiredSize(controller); - return desiredSize <= 0; -} - -export function writableStreamDefaultControllerGetChunkSize(controller, chunk) { - try { - return $getByIdDirectPrivate(controller, "strategySizeAlgorithm").$call(undefined, chunk); - } catch (e) { - $writableStreamDefaultControllerErrorIfNeeded(controller, e); - return 1; - } -} - -export function writableStreamDefaultControllerGetDesiredSize(controller) { - return $getByIdDirectPrivate(controller, "strategyHWM") - $getByIdDirectPrivate(controller, "queue").size; -} - -export function writableStreamDefaultControllerProcessClose(controller) { - const stream = $getByIdDirectPrivate(controller, "stream"); - - $writableStreamMarkCloseRequestInFlight(stream); - $dequeueValue($getByIdDirectPrivate(controller, "queue")); - - $assert($getByIdDirectPrivate(controller, "queue").content?.isEmpty()); - - const sinkClosePromise = $getByIdDirectPrivate(controller, "closeAlgorithm").$call(); - $writableStreamDefaultControllerClearAlgorithms(controller); - - sinkClosePromise.$then( - () => { - $writableStreamFinishInFlightClose(stream); - }, - reason => { - $writableStreamFinishInFlightCloseWithError(stream, reason); - }, - ); -} - -export function writableStreamDefaultControllerProcessWrite(controller, chunk) { - const stream = $getByIdDirectPrivate(controller, "stream"); - - $writableStreamMarkFirstWriteRequestInFlight(stream); - - const sinkWritePromise = $getByIdDirectPrivate(controller, "writeAlgorithm").$call(undefined, chunk); - - sinkWritePromise.$then( - () => { - $writableStreamFinishInFlightWrite(stream); - const state = $getByIdDirectPrivate(stream, "state"); - $assert(state === "writable" || state === "erroring"); - - $dequeueValue($getByIdDirectPrivate(controller, "queue")); - if (!$writableStreamCloseQueuedOrInFlight(stream) && state === "writable") { - const backpressure = $writableStreamDefaultControllerGetBackpressure(controller); - $writableStreamUpdateBackpressure(stream, backpressure); - } - $writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); - }, - reason => { - const state = $getByIdDirectPrivate(stream, "state"); - if (state === "writable") $writableStreamDefaultControllerClearAlgorithms(controller); - - $writableStreamFinishInFlightWriteWithError(stream, reason); - }, - ); -} - -export function writableStreamDefaultControllerWrite(controller, chunk, chunkSize) { - try { - $enqueueValueWithSize($getByIdDirectPrivate(controller, "queue"), chunk, chunkSize); - - const stream = $getByIdDirectPrivate(controller, "stream"); - - const state = $getByIdDirectPrivate(stream, "state"); - if (!$writableStreamCloseQueuedOrInFlight(stream) && state === "writable") { - const backpressure = $writableStreamDefaultControllerGetBackpressure(controller); - $writableStreamUpdateBackpressure(stream, backpressure); - } - $writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); - } catch (e) { - $writableStreamDefaultControllerErrorIfNeeded(controller, e); - } -} |