aboutsummaryrefslogtreecommitdiff
path: root/src/js/builtins/WritableStreamInternals.ts
diff options
context:
space:
mode:
Diffstat (limited to 'src/js/builtins/WritableStreamInternals.ts')
-rw-r--r--src/js/builtins/WritableStreamInternals.ts790
1 files changed, 790 insertions, 0 deletions
diff --git a/src/js/builtins/WritableStreamInternals.ts b/src/js/builtins/WritableStreamInternals.ts
new file mode 100644
index 000000000..f436a285e
--- /dev/null
+++ b/src/js/builtins/WritableStreamInternals.ts
@@ -0,0 +1,790 @@
+/*
+ * Copyright (C) 2015 Canon Inc.
+ * Copyright (C) 2015 Igalia
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+ * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+// @internal
+
+export function isWritableStream(stream) {
+ return $isObject(stream) && !!$getByIdDirectPrivate(stream, "underlyingSink");
+}
+
+export function isWritableStreamDefaultWriter(writer) {
+ return $isObject(writer) && !!$getByIdDirectPrivate(writer, "closedPromise");
+}
+
+export function acquireWritableStreamDefaultWriter(stream) {
+ return new WritableStreamDefaultWriter(stream);
+}
+
+// https://streams.spec.whatwg.org/#create-writable-stream
+export function createWritableStream(
+ startAlgorithm,
+ writeAlgorithm,
+ closeAlgorithm,
+ abortAlgorithm,
+ highWaterMark,
+ sizeAlgorithm,
+) {
+ $assert(typeof highWaterMark === "number" && !isNaN(highWaterMark) && highWaterMark >= 0);
+
+ const internalStream = {};
+ $initializeWritableStreamSlots(internalStream, {});
+ const controller = new WritableStreamDefaultController();
+
+ $setUpWritableStreamDefaultController(
+ internalStream,
+ controller,
+ startAlgorithm,
+ writeAlgorithm,
+ closeAlgorithm,
+ abortAlgorithm,
+ highWaterMark,
+ sizeAlgorithm,
+ );
+
+ return $createWritableStreamFromInternal(internalStream);
+}
+
+export function createInternalWritableStreamFromUnderlyingSink(underlyingSink, strategy) {
+ const stream = {};
+
+ if (underlyingSink === undefined) underlyingSink = {};
+
+ if (strategy === undefined) strategy = {};
+
+ if (!$isObject(underlyingSink)) $throwTypeError("WritableStream constructor takes an object as first argument");
+
+ if ("type" in underlyingSink) $throwRangeError("Invalid type is specified");
+
+ const sizeAlgorithm = $extractSizeAlgorithm(strategy);
+ const highWaterMark = $extractHighWaterMark(strategy, 1);
+
+ const underlyingSinkDict = {};
+ if ("start" in underlyingSink) {
+ underlyingSinkDict["start"] = underlyingSink["start"];
+ if (typeof underlyingSinkDict["start"] !== "function") $throwTypeError("underlyingSink.start should be a function");
+ }
+ if ("write" in underlyingSink) {
+ underlyingSinkDict["write"] = underlyingSink["write"];
+ if (typeof underlyingSinkDict["write"] !== "function") $throwTypeError("underlyingSink.write should be a function");
+ }
+ if ("close" in underlyingSink) {
+ underlyingSinkDict["close"] = underlyingSink["close"];
+ if (typeof underlyingSinkDict["close"] !== "function") $throwTypeError("underlyingSink.close should be a function");
+ }
+ if ("abort" in underlyingSink) {
+ underlyingSinkDict["abort"] = underlyingSink["abort"];
+ if (typeof underlyingSinkDict["abort"] !== "function") $throwTypeError("underlyingSink.abort should be a function");
+ }
+
+ $initializeWritableStreamSlots(stream, underlyingSink);
+ $setUpWritableStreamDefaultControllerFromUnderlyingSink(
+ stream,
+ underlyingSink,
+ underlyingSinkDict,
+ highWaterMark,
+ sizeAlgorithm,
+ );
+
+ return stream;
+}
+
+export function initializeWritableStreamSlots(stream, underlyingSink) {
+ $putByIdDirectPrivate(stream, "state", "writable");
+ $putByIdDirectPrivate(stream, "storedError", undefined);
+ $putByIdDirectPrivate(stream, "writer", undefined);
+ $putByIdDirectPrivate(stream, "controller", undefined);
+ $putByIdDirectPrivate(stream, "inFlightWriteRequest", undefined);
+ $putByIdDirectPrivate(stream, "closeRequest", undefined);
+ $putByIdDirectPrivate(stream, "inFlightCloseRequest", undefined);
+ $putByIdDirectPrivate(stream, "pendingAbortRequest", undefined);
+ $putByIdDirectPrivate(stream, "writeRequests", $createFIFO());
+ $putByIdDirectPrivate(stream, "backpressure", false);
+ $putByIdDirectPrivate(stream, "underlyingSink", underlyingSink);
+}
+
+export function writableStreamCloseForBindings(stream) {
+ if ($isWritableStreamLocked(stream))
+ return Promise.$reject($makeTypeError("WritableStream.close method can only be used on non locked WritableStream"));
+
+ if ($writableStreamCloseQueuedOrInFlight(stream))
+ return Promise.$reject(
+ $makeTypeError("WritableStream.close method can only be used on a being close WritableStream"),
+ );
+
+ return $writableStreamClose(stream);
+}
+
+export function writableStreamAbortForBindings(stream, reason) {
+ if ($isWritableStreamLocked(stream))
+ return Promise.$reject($makeTypeError("WritableStream.abort method can only be used on non locked WritableStream"));
+
+ return $writableStreamAbort(stream, reason);
+}
+
+export function isWritableStreamLocked(stream) {
+ return $getByIdDirectPrivate(stream, "writer") !== undefined;
+}
+
+export function setUpWritableStreamDefaultWriter(writer, stream) {
+ if ($isWritableStreamLocked(stream)) $throwTypeError("WritableStream is locked");
+
+ $putByIdDirectPrivate(writer, "stream", stream);
+ $putByIdDirectPrivate(stream, "writer", writer);
+
+ const readyPromiseCapability = $newPromiseCapability(Promise);
+ const closedPromiseCapability = $newPromiseCapability(Promise);
+ $putByIdDirectPrivate(writer, "readyPromise", readyPromiseCapability);
+ $putByIdDirectPrivate(writer, "closedPromise", closedPromiseCapability);
+
+ const state = $getByIdDirectPrivate(stream, "state");
+ if (state === "writable") {
+ if ($writableStreamCloseQueuedOrInFlight(stream) || !$getByIdDirectPrivate(stream, "backpressure"))
+ readyPromiseCapability.$resolve.$call();
+ } else if (state === "erroring") {
+ readyPromiseCapability.$reject.$call(undefined, $getByIdDirectPrivate(stream, "storedError"));
+ $markPromiseAsHandled(readyPromiseCapability.$promise);
+ } else if (state === "closed") {
+ readyPromiseCapability.$resolve.$call();
+ closedPromiseCapability.$resolve.$call();
+ } else {
+ $assert(state === "errored");
+ const storedError = $getByIdDirectPrivate(stream, "storedError");
+ readyPromiseCapability.$reject.$call(undefined, storedError);
+ $markPromiseAsHandled(readyPromiseCapability.$promise);
+ closedPromiseCapability.$reject.$call(undefined, storedError);
+ $markPromiseAsHandled(closedPromiseCapability.$promise);
+ }
+}
+
+export function writableStreamAbort(stream, reason) {
+ const state = $getByIdDirectPrivate(stream, "state");
+ if (state === "closed" || state === "errored") return Promise.$resolve();
+
+ const pendingAbortRequest = $getByIdDirectPrivate(stream, "pendingAbortRequest");
+ if (pendingAbortRequest !== undefined) return pendingAbortRequest.promise.$promise;
+
+ $assert(state === "writable" || state === "erroring");
+ let wasAlreadyErroring = false;
+ if (state === "erroring") {
+ wasAlreadyErroring = true;
+ reason = undefined;
+ }
+
+ const abortPromiseCapability = $newPromiseCapability(Promise);
+ $putByIdDirectPrivate(stream, "pendingAbortRequest", {
+ promise: abortPromiseCapability,
+ reason: reason,
+ wasAlreadyErroring: wasAlreadyErroring,
+ });
+
+ if (!wasAlreadyErroring) $writableStreamStartErroring(stream, reason);
+ return abortPromiseCapability.$promise;
+}
+
+export function writableStreamClose(stream) {
+ const state = $getByIdDirectPrivate(stream, "state");
+ if (state === "closed" || state === "errored")
+ return Promise.$reject($makeTypeError("Cannot close a writable stream that is closed or errored"));
+
+ $assert(state === "writable" || state === "erroring");
+ $assert(!$writableStreamCloseQueuedOrInFlight(stream));
+
+ const closePromiseCapability = $newPromiseCapability(Promise);
+ $putByIdDirectPrivate(stream, "closeRequest", closePromiseCapability);
+
+ const writer = $getByIdDirectPrivate(stream, "writer");
+ if (writer !== undefined && $getByIdDirectPrivate(stream, "backpressure") && state === "writable")
+ $getByIdDirectPrivate(writer, "readyPromise").$resolve.$call();
+
+ $writableStreamDefaultControllerClose($getByIdDirectPrivate(stream, "controller"));
+
+ return closePromiseCapability.$promise;
+}
+
+export function writableStreamAddWriteRequest(stream) {
+ $assert($isWritableStreamLocked(stream));
+ $assert($getByIdDirectPrivate(stream, "state") === "writable");
+
+ const writePromiseCapability = $newPromiseCapability(Promise);
+ const writeRequests = $getByIdDirectPrivate(stream, "writeRequests");
+ writeRequests.push(writePromiseCapability);
+ return writePromiseCapability.$promise;
+}
+
+export function writableStreamCloseQueuedOrInFlight(stream) {
+ return (
+ $getByIdDirectPrivate(stream, "closeRequest") !== undefined ||
+ $getByIdDirectPrivate(stream, "inFlightCloseRequest") !== undefined
+ );
+}
+
+export function writableStreamDealWithRejection(stream, error) {
+ const state = $getByIdDirectPrivate(stream, "state");
+ if (state === "writable") {
+ $writableStreamStartErroring(stream, error);
+ return;
+ }
+
+ $assert(state === "erroring");
+ $writableStreamFinishErroring(stream);
+}
+
+export function writableStreamFinishErroring(stream) {
+ $assert($getByIdDirectPrivate(stream, "state") === "erroring");
+ $assert(!$writableStreamHasOperationMarkedInFlight(stream));
+
+ $putByIdDirectPrivate(stream, "state", "errored");
+
+ const controller = $getByIdDirectPrivate(stream, "controller");
+ $getByIdDirectPrivate(controller, "errorSteps").$call();
+
+ const storedError = $getByIdDirectPrivate(stream, "storedError");
+ const requests = $getByIdDirectPrivate(stream, "writeRequests");
+ for (var request = requests.shift(); request; request = requests.shift())
+ request.$reject.$call(undefined, storedError);
+
+ // TODO: is this still necessary?
+ $putByIdDirectPrivate(stream, "writeRequests", $createFIFO());
+
+ const abortRequest = $getByIdDirectPrivate(stream, "pendingAbortRequest");
+ if (abortRequest === undefined) {
+ $writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
+ return;
+ }
+
+ $putByIdDirectPrivate(stream, "pendingAbortRequest", undefined);
+ if (abortRequest.wasAlreadyErroring) {
+ abortRequest.promise.$reject.$call(undefined, storedError);
+ $writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
+ return;
+ }
+
+ $getByIdDirectPrivate(controller, "abortSteps")
+ .$call(undefined, abortRequest.reason)
+ .$then(
+ () => {
+ abortRequest.promise.$resolve.$call();
+ $writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
+ },
+ reason => {
+ abortRequest.promise.$reject.$call(undefined, reason);
+ $writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
+ },
+ );
+}
+
+export function writableStreamFinishInFlightClose(stream) {
+ const inFlightCloseRequest = $getByIdDirectPrivate(stream, "inFlightCloseRequest");
+ inFlightCloseRequest.$resolve.$call();
+
+ $putByIdDirectPrivate(stream, "inFlightCloseRequest", undefined);
+
+ const state = $getByIdDirectPrivate(stream, "state");
+ $assert(state === "writable" || state === "erroring");
+
+ if (state === "erroring") {
+ $putByIdDirectPrivate(stream, "storedError", undefined);
+ const abortRequest = $getByIdDirectPrivate(stream, "pendingAbortRequest");
+ if (abortRequest !== undefined) {
+ abortRequest.promise.$resolve.$call();
+ $putByIdDirectPrivate(stream, "pendingAbortRequest", undefined);
+ }
+ }
+
+ $putByIdDirectPrivate(stream, "state", "closed");
+
+ const writer = $getByIdDirectPrivate(stream, "writer");
+ if (writer !== undefined) $getByIdDirectPrivate(writer, "closedPromise").$resolve.$call();
+
+ $assert($getByIdDirectPrivate(stream, "pendingAbortRequest") === undefined);
+ $assert($getByIdDirectPrivate(stream, "storedError") === undefined);
+}
+
+export function writableStreamFinishInFlightCloseWithError(stream, error) {
+ const inFlightCloseRequest = $getByIdDirectPrivate(stream, "inFlightCloseRequest");
+ $assert(inFlightCloseRequest !== undefined);
+ inFlightCloseRequest.$reject.$call(undefined, error);
+
+ $putByIdDirectPrivate(stream, "inFlightCloseRequest", undefined);
+
+ const state = $getByIdDirectPrivate(stream, "state");
+ $assert(state === "writable" || state === "erroring");
+
+ const abortRequest = $getByIdDirectPrivate(stream, "pendingAbortRequest");
+ if (abortRequest !== undefined) {
+ abortRequest.promise.$reject.$call(undefined, error);
+ $putByIdDirectPrivate(stream, "pendingAbortRequest", undefined);
+ }
+
+ $writableStreamDealWithRejection(stream, error);
+}
+
+export function writableStreamFinishInFlightWrite(stream) {
+ const inFlightWriteRequest = $getByIdDirectPrivate(stream, "inFlightWriteRequest");
+ $assert(inFlightWriteRequest !== undefined);
+ inFlightWriteRequest.$resolve.$call();
+
+ $putByIdDirectPrivate(stream, "inFlightWriteRequest", undefined);
+}
+
+export function writableStreamFinishInFlightWriteWithError(stream, error) {
+ const inFlightWriteRequest = $getByIdDirectPrivate(stream, "inFlightWriteRequest");
+ $assert(inFlightWriteRequest !== undefined);
+ inFlightWriteRequest.$reject.$call(undefined, error);
+
+ $putByIdDirectPrivate(stream, "inFlightWriteRequest", undefined);
+
+ const state = $getByIdDirectPrivate(stream, "state");
+ $assert(state === "writable" || state === "erroring");
+
+ $writableStreamDealWithRejection(stream, error);
+}
+
+export function writableStreamHasOperationMarkedInFlight(stream) {
+ return (
+ $getByIdDirectPrivate(stream, "inFlightWriteRequest") !== undefined ||
+ $getByIdDirectPrivate(stream, "inFlightCloseRequest") !== undefined
+ );
+}
+
+export function writableStreamMarkCloseRequestInFlight(stream) {
+ const closeRequest = $getByIdDirectPrivate(stream, "closeRequest");
+ $assert($getByIdDirectPrivate(stream, "inFlightCloseRequest") === undefined);
+ $assert(closeRequest !== undefined);
+
+ $putByIdDirectPrivate(stream, "inFlightCloseRequest", closeRequest);
+ $putByIdDirectPrivate(stream, "closeRequest", undefined);
+}
+
+export function writableStreamMarkFirstWriteRequestInFlight(stream) {
+ const writeRequests = $getByIdDirectPrivate(stream, "writeRequests");
+ $assert($getByIdDirectPrivate(stream, "inFlightWriteRequest") === undefined);
+ $assert(writeRequests.isNotEmpty());
+
+ const writeRequest = writeRequests.shift();
+ $putByIdDirectPrivate(stream, "inFlightWriteRequest", writeRequest);
+}
+
+export function writableStreamRejectCloseAndClosedPromiseIfNeeded(stream) {
+ $assert($getByIdDirectPrivate(stream, "state") === "errored");
+
+ const storedError = $getByIdDirectPrivate(stream, "storedError");
+
+ const closeRequest = $getByIdDirectPrivate(stream, "closeRequest");
+ if (closeRequest !== undefined) {
+ $assert($getByIdDirectPrivate(stream, "inFlightCloseRequest") === undefined);
+ closeRequest.$reject.$call(undefined, storedError);
+ $putByIdDirectPrivate(stream, "closeRequest", undefined);
+ }
+
+ const writer = $getByIdDirectPrivate(stream, "writer");
+ if (writer !== undefined) {
+ const closedPromise = $getByIdDirectPrivate(writer, "closedPromise");
+ closedPromise.$reject.$call(undefined, storedError);
+ $markPromiseAsHandled(closedPromise.$promise);
+ }
+}
+
+export function writableStreamStartErroring(stream, reason) {
+ $assert($getByIdDirectPrivate(stream, "storedError") === undefined);
+ $assert($getByIdDirectPrivate(stream, "state") === "writable");
+
+ const controller = $getByIdDirectPrivate(stream, "controller");
+ $assert(controller !== undefined);
+
+ $putByIdDirectPrivate(stream, "state", "erroring");
+ $putByIdDirectPrivate(stream, "storedError", reason);
+
+ const writer = $getByIdDirectPrivate(stream, "writer");
+ if (writer !== undefined) $writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason);
+
+ if (!$writableStreamHasOperationMarkedInFlight(stream) && $getByIdDirectPrivate(controller, "started") === 1)
+ $writableStreamFinishErroring(stream);
+}
+
+export function writableStreamUpdateBackpressure(stream, backpressure) {
+ $assert($getByIdDirectPrivate(stream, "state") === "writable");
+ $assert(!$writableStreamCloseQueuedOrInFlight(stream));
+
+ const writer = $getByIdDirectPrivate(stream, "writer");
+ if (writer !== undefined && backpressure !== $getByIdDirectPrivate(stream, "backpressure")) {
+ if (backpressure) $putByIdDirectPrivate(writer, "readyPromise", $newPromiseCapability(Promise));
+ else $getByIdDirectPrivate(writer, "readyPromise").$resolve.$call();
+ }
+ $putByIdDirectPrivate(stream, "backpressure", backpressure);
+}
+
+export function writableStreamDefaultWriterAbort(writer, reason) {
+ const stream = $getByIdDirectPrivate(writer, "stream");
+ $assert(stream !== undefined);
+ return $writableStreamAbort(stream, reason);
+}
+
+export function writableStreamDefaultWriterClose(writer) {
+ const stream = $getByIdDirectPrivate(writer, "stream");
+ $assert(stream !== undefined);
+ return $writableStreamClose(stream);
+}
+
+export function writableStreamDefaultWriterCloseWithErrorPropagation(writer) {
+ const stream = $getByIdDirectPrivate(writer, "stream");
+ $assert(stream !== undefined);
+
+ const state = $getByIdDirectPrivate(stream, "state");
+
+ if ($writableStreamCloseQueuedOrInFlight(stream) || state === "closed") return Promise.$resolve();
+
+ if (state === "errored") return Promise.$reject($getByIdDirectPrivate(stream, "storedError"));
+
+ $assert(state === "writable" || state === "erroring");
+ return $writableStreamDefaultWriterClose(writer);
+}
+
+export function writableStreamDefaultWriterEnsureClosedPromiseRejected(writer, error) {
+ let closedPromiseCapability = $getByIdDirectPrivate(writer, "closedPromise");
+ let closedPromise = closedPromiseCapability.$promise;
+
+ if (($getPromiseInternalField(closedPromise, $promiseFieldFlags) & $promiseStateMask) !== $promiseStatePending) {
+ closedPromiseCapability = $newPromiseCapability(Promise);
+ closedPromise = closedPromiseCapability.$promise;
+ $putByIdDirectPrivate(writer, "closedPromise", closedPromiseCapability);
+ }
+
+ closedPromiseCapability.$reject.$call(undefined, error);
+ $markPromiseAsHandled(closedPromise);
+}
+
+export function writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, error) {
+ let readyPromiseCapability = $getByIdDirectPrivate(writer, "readyPromise");
+ let readyPromise = readyPromiseCapability.$promise;
+
+ if (($getPromiseInternalField(readyPromise, $promiseFieldFlags) & $promiseStateMask) !== $promiseStatePending) {
+ readyPromiseCapability = $newPromiseCapability(Promise);
+ readyPromise = readyPromiseCapability.$promise;
+ $putByIdDirectPrivate(writer, "readyPromise", readyPromiseCapability);
+ }
+
+ readyPromiseCapability.$reject.$call(undefined, error);
+ $markPromiseAsHandled(readyPromise);
+}
+
+export function writableStreamDefaultWriterGetDesiredSize(writer) {
+ const stream = $getByIdDirectPrivate(writer, "stream");
+ $assert(stream !== undefined);
+
+ const state = $getByIdDirectPrivate(stream, "state");
+
+ if (state === "errored" || state === "erroring") return null;
+
+ if (state === "closed") return 0;
+
+ return $writableStreamDefaultControllerGetDesiredSize($getByIdDirectPrivate(stream, "controller"));
+}
+
+export function writableStreamDefaultWriterRelease(writer) {
+ const stream = $getByIdDirectPrivate(writer, "stream");
+ $assert(stream !== undefined);
+ $assert($getByIdDirectPrivate(stream, "writer") === writer);
+
+ const releasedError = $makeTypeError("writableStreamDefaultWriterRelease");
+
+ $writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError);
+ $writableStreamDefaultWriterEnsureClosedPromiseRejected(writer, releasedError);
+
+ $putByIdDirectPrivate(stream, "writer", undefined);
+ $putByIdDirectPrivate(writer, "stream", undefined);
+}
+
+export function writableStreamDefaultWriterWrite(writer, chunk) {
+ const stream = $getByIdDirectPrivate(writer, "stream");
+ $assert(stream !== undefined);
+
+ const controller = $getByIdDirectPrivate(stream, "controller");
+ $assert(controller !== undefined);
+ const chunkSize = $writableStreamDefaultControllerGetChunkSize(controller, chunk);
+
+ if (stream !== $getByIdDirectPrivate(writer, "stream"))
+ return Promise.$reject($makeTypeError("writer is not stream's writer"));
+
+ const state = $getByIdDirectPrivate(stream, "state");
+ if (state === "errored") return Promise.$reject($getByIdDirectPrivate(stream, "storedError"));
+
+ if ($writableStreamCloseQueuedOrInFlight(stream) || state === "closed")
+ return Promise.$reject($makeTypeError("stream is closing or closed"));
+
+ if ($writableStreamCloseQueuedOrInFlight(stream) || state === "closed")
+ return Promise.$reject($makeTypeError("stream is closing or closed"));
+
+ if (state === "erroring") return Promise.$reject($getByIdDirectPrivate(stream, "storedError"));
+
+ $assert(state === "writable");
+
+ const promise = $writableStreamAddWriteRequest(stream);
+ $writableStreamDefaultControllerWrite(controller, chunk, chunkSize);
+ return promise;
+}
+
+export function setUpWritableStreamDefaultController(
+ stream,
+ controller,
+ startAlgorithm,
+ writeAlgorithm,
+ closeAlgorithm,
+ abortAlgorithm,
+ highWaterMark,
+ sizeAlgorithm,
+) {
+ $assert($isWritableStream(stream));
+ $assert($getByIdDirectPrivate(stream, "controller") === undefined);
+
+ $putByIdDirectPrivate(controller, "stream", stream);
+ $putByIdDirectPrivate(stream, "controller", controller);
+
+ $resetQueue($getByIdDirectPrivate(controller, "queue"));
+
+ $putByIdDirectPrivate(controller, "started", -1);
+ $putByIdDirectPrivate(controller, "startAlgorithm", startAlgorithm);
+ $putByIdDirectPrivate(controller, "strategySizeAlgorithm", sizeAlgorithm);
+ $putByIdDirectPrivate(controller, "strategyHWM", highWaterMark);
+ $putByIdDirectPrivate(controller, "writeAlgorithm", writeAlgorithm);
+ $putByIdDirectPrivate(controller, "closeAlgorithm", closeAlgorithm);
+ $putByIdDirectPrivate(controller, "abortAlgorithm", abortAlgorithm);
+
+ const backpressure = $writableStreamDefaultControllerGetBackpressure(controller);
+ $writableStreamUpdateBackpressure(stream, backpressure);
+
+ $writableStreamDefaultControllerStart(controller);
+}
+
+export function writableStreamDefaultControllerStart(controller) {
+ if ($getByIdDirectPrivate(controller, "started") !== -1) return;
+
+ $putByIdDirectPrivate(controller, "started", 0);
+
+ const startAlgorithm = $getByIdDirectPrivate(controller, "startAlgorithm");
+ $putByIdDirectPrivate(controller, "startAlgorithm", undefined);
+ const stream = $getByIdDirectPrivate(controller, "stream");
+ return Promise.$resolve(startAlgorithm.$call()).$then(
+ () => {
+ const state = $getByIdDirectPrivate(stream, "state");
+ $assert(state === "writable" || state === "erroring");
+ $putByIdDirectPrivate(controller, "started", 1);
+ $writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
+ },
+ error => {
+ const state = $getByIdDirectPrivate(stream, "state");
+ $assert(state === "writable" || state === "erroring");
+ $putByIdDirectPrivate(controller, "started", 1);
+ $writableStreamDealWithRejection(stream, error);
+ },
+ );
+}
+
+export function setUpWritableStreamDefaultControllerFromUnderlyingSink(
+ stream,
+ underlyingSink,
+ underlyingSinkDict,
+ highWaterMark,
+ sizeAlgorithm,
+) {
+ const controller = new $WritableStreamDefaultController();
+
+ let startAlgorithm = () => {};
+ let writeAlgorithm = () => {
+ return Promise.$resolve();
+ };
+ let closeAlgorithm = () => {
+ return Promise.$resolve();
+ };
+ let abortAlgorithm = () => {
+ return Promise.$resolve();
+ };
+
+ if ("start" in underlyingSinkDict) {
+ const startMethod = underlyingSinkDict["start"];
+ startAlgorithm = () => $promiseInvokeOrNoopMethodNoCatch(underlyingSink, startMethod, [controller]);
+ }
+ if ("write" in underlyingSinkDict) {
+ const writeMethod = underlyingSinkDict["write"];
+ writeAlgorithm = chunk => $promiseInvokeOrNoopMethod(underlyingSink, writeMethod, [chunk, controller]);
+ }
+ if ("close" in underlyingSinkDict) {
+ const closeMethod = underlyingSinkDict["close"];
+ closeAlgorithm = () => $promiseInvokeOrNoopMethod(underlyingSink, closeMethod, []);
+ }
+ if ("abort" in underlyingSinkDict) {
+ const abortMethod = underlyingSinkDict["abort"];
+ abortAlgorithm = reason => $promiseInvokeOrNoopMethod(underlyingSink, abortMethod, [reason]);
+ }
+
+ $setUpWritableStreamDefaultController(
+ stream,
+ controller,
+ startAlgorithm,
+ writeAlgorithm,
+ closeAlgorithm,
+ abortAlgorithm,
+ highWaterMark,
+ sizeAlgorithm,
+ );
+}
+
+export function writableStreamDefaultControllerAdvanceQueueIfNeeded(controller) {
+ const stream = $getByIdDirectPrivate(controller, "stream");
+
+ if ($getByIdDirectPrivate(controller, "started") !== 1) return;
+
+ $assert(stream !== undefined);
+ if ($getByIdDirectPrivate(stream, "inFlightWriteRequest") !== undefined) return;
+
+ const state = $getByIdDirectPrivate(stream, "state");
+ $assert(state !== "closed" || state !== "errored");
+ if (state === "erroring") {
+ $writableStreamFinishErroring(stream);
+ return;
+ }
+
+ const queue = $getByIdDirectPrivate(controller, "queue");
+
+ if (queue.content?.isEmpty() ?? false) return;
+
+ const value = $peekQueueValue(queue);
+ if (value === $isCloseSentinel) $writableStreamDefaultControllerProcessClose(controller);
+ else $writableStreamDefaultControllerProcessWrite(controller, value);
+}
+
+export function isCloseSentinel() {}
+
+export function writableStreamDefaultControllerClearAlgorithms(controller) {
+ $putByIdDirectPrivate(controller, "writeAlgorithm", undefined);
+ $putByIdDirectPrivate(controller, "closeAlgorithm", undefined);
+ $putByIdDirectPrivate(controller, "abortAlgorithm", undefined);
+ $putByIdDirectPrivate(controller, "strategySizeAlgorithm", undefined);
+}
+
+export function writableStreamDefaultControllerClose(controller) {
+ $enqueueValueWithSize($getByIdDirectPrivate(controller, "queue"), $isCloseSentinel, 0);
+ $writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
+}
+
+export function writableStreamDefaultControllerError(controller, error) {
+ const stream = $getByIdDirectPrivate(controller, "stream");
+ $assert(stream !== undefined);
+ $assert($getByIdDirectPrivate(stream, "state") === "writable");
+
+ $writableStreamDefaultControllerClearAlgorithms(controller);
+ $writableStreamStartErroring(stream, error);
+}
+
+export function writableStreamDefaultControllerErrorIfNeeded(controller, error) {
+ const stream = $getByIdDirectPrivate(controller, "stream");
+ if ($getByIdDirectPrivate(stream, "state") === "writable") $writableStreamDefaultControllerError(controller, error);
+}
+
+export function writableStreamDefaultControllerGetBackpressure(controller) {
+ const desiredSize = $writableStreamDefaultControllerGetDesiredSize(controller);
+ return desiredSize <= 0;
+}
+
+export function writableStreamDefaultControllerGetChunkSize(controller, chunk) {
+ try {
+ return $getByIdDirectPrivate(controller, "strategySizeAlgorithm").$call(undefined, chunk);
+ } catch (e) {
+ $writableStreamDefaultControllerErrorIfNeeded(controller, e);
+ return 1;
+ }
+}
+
+export function writableStreamDefaultControllerGetDesiredSize(controller) {
+ return $getByIdDirectPrivate(controller, "strategyHWM") - $getByIdDirectPrivate(controller, "queue").size;
+}
+
+export function writableStreamDefaultControllerProcessClose(controller) {
+ const stream = $getByIdDirectPrivate(controller, "stream");
+
+ $writableStreamMarkCloseRequestInFlight(stream);
+ $dequeueValue($getByIdDirectPrivate(controller, "queue"));
+
+ $assert($getByIdDirectPrivate(controller, "queue").content?.isEmpty());
+
+ const sinkClosePromise = $getByIdDirectPrivate(controller, "closeAlgorithm").$call();
+ $writableStreamDefaultControllerClearAlgorithms(controller);
+
+ sinkClosePromise.$then(
+ () => {
+ $writableStreamFinishInFlightClose(stream);
+ },
+ reason => {
+ $writableStreamFinishInFlightCloseWithError(stream, reason);
+ },
+ );
+}
+
+export function writableStreamDefaultControllerProcessWrite(controller, chunk) {
+ const stream = $getByIdDirectPrivate(controller, "stream");
+
+ $writableStreamMarkFirstWriteRequestInFlight(stream);
+
+ const sinkWritePromise = $getByIdDirectPrivate(controller, "writeAlgorithm").$call(undefined, chunk);
+
+ sinkWritePromise.$then(
+ () => {
+ $writableStreamFinishInFlightWrite(stream);
+ const state = $getByIdDirectPrivate(stream, "state");
+ $assert(state === "writable" || state === "erroring");
+
+ $dequeueValue($getByIdDirectPrivate(controller, "queue"));
+ if (!$writableStreamCloseQueuedOrInFlight(stream) && state === "writable") {
+ const backpressure = $writableStreamDefaultControllerGetBackpressure(controller);
+ $writableStreamUpdateBackpressure(stream, backpressure);
+ }
+ $writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
+ },
+ reason => {
+ const state = $getByIdDirectPrivate(stream, "state");
+ if (state === "writable") $writableStreamDefaultControllerClearAlgorithms(controller);
+
+ $writableStreamFinishInFlightWriteWithError(stream, reason);
+ },
+ );
+}
+
+export function writableStreamDefaultControllerWrite(controller, chunk, chunkSize) {
+ try {
+ $enqueueValueWithSize($getByIdDirectPrivate(controller, "queue"), chunk, chunkSize);
+
+ const stream = $getByIdDirectPrivate(controller, "stream");
+
+ const state = $getByIdDirectPrivate(stream, "state");
+ if (!$writableStreamCloseQueuedOrInFlight(stream) && state === "writable") {
+ const backpressure = $writableStreamDefaultControllerGetBackpressure(controller);
+ $writableStreamUpdateBackpressure(stream, backpressure);
+ }
+ $writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
+ } catch (e) {
+ $writableStreamDefaultControllerErrorIfNeeded(controller, e);
+ }
+}