aboutsummaryrefslogtreecommitdiff
path: root/src/js/builtins/TransformStreamInternals.ts
diff options
context:
space:
mode:
Diffstat (limited to 'src/js/builtins/TransformStreamInternals.ts')
-rw-r--r--src/js/builtins/TransformStreamInternals.ts348
1 files changed, 348 insertions, 0 deletions
diff --git a/src/js/builtins/TransformStreamInternals.ts b/src/js/builtins/TransformStreamInternals.ts
new file mode 100644
index 000000000..9994d1282
--- /dev/null
+++ b/src/js/builtins/TransformStreamInternals.ts
@@ -0,0 +1,348 @@
+/*
+ * Copyright (C) 2020 Apple Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
+ * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+// @internal
+
+export function isTransformStream(stream) {
+ return $isObject(stream) && !!$getByIdDirectPrivate(stream, "readable");
+}
+
+export function isTransformStreamDefaultController(controller) {
+ return $isObject(controller) && !!$getByIdDirectPrivate(controller, "transformAlgorithm");
+}
+
+export function createTransformStream(
+ startAlgorithm,
+ transformAlgorithm,
+ flushAlgorithm,
+ writableHighWaterMark,
+ writableSizeAlgorithm,
+ readableHighWaterMark,
+ readableSizeAlgorithm,
+) {
+ if (writableHighWaterMark === undefined) writableHighWaterMark = 1;
+ if (writableSizeAlgorithm === undefined) writableSizeAlgorithm = () => 1;
+ if (readableHighWaterMark === undefined) readableHighWaterMark = 0;
+ if (readableSizeAlgorithm === undefined) readableSizeAlgorithm = () => 1;
+ $assert(writableHighWaterMark >= 0);
+ $assert(readableHighWaterMark >= 0);
+
+ const transform = {};
+ $putByIdDirectPrivate(transform, "TransformStream", true);
+
+ const stream = new TransformStream(transform);
+ const startPromiseCapability = $newPromiseCapability(Promise);
+ $initializeTransformStream(
+ stream,
+ startPromiseCapability.$promise,
+ writableHighWaterMark,
+ writableSizeAlgorithm,
+ readableHighWaterMark,
+ readableSizeAlgorithm,
+ );
+
+ const controller = new TransformStreamDefaultController();
+ $setUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm);
+
+ startAlgorithm().$then(
+ () => {
+ startPromiseCapability.$resolve.$call();
+ },
+ error => {
+ startPromiseCapability.$reject.$call(undefined, error);
+ },
+ );
+
+ return stream;
+}
+
+export function initializeTransformStream(
+ stream,
+ startPromise,
+ writableHighWaterMark,
+ writableSizeAlgorithm,
+ readableHighWaterMark,
+ readableSizeAlgorithm,
+) {
+ const startAlgorithm = () => {
+ return startPromise;
+ };
+ const writeAlgorithm = chunk => {
+ return $transformStreamDefaultSinkWriteAlgorithm(stream, chunk);
+ };
+ const abortAlgorithm = reason => {
+ return $transformStreamDefaultSinkAbortAlgorithm(stream, reason);
+ };
+ const closeAlgorithm = () => {
+ return $transformStreamDefaultSinkCloseAlgorithm(stream);
+ };
+ const writable = $createWritableStream(
+ startAlgorithm,
+ writeAlgorithm,
+ closeAlgorithm,
+ abortAlgorithm,
+ writableHighWaterMark,
+ writableSizeAlgorithm,
+ );
+
+ const pullAlgorithm = () => {
+ return $transformStreamDefaultSourcePullAlgorithm(stream);
+ };
+ const cancelAlgorithm = reason => {
+ $transformStreamErrorWritableAndUnblockWrite(stream, reason);
+ return Promise.$resolve();
+ };
+ const underlyingSource = {};
+ $putByIdDirectPrivate(underlyingSource, "start", startAlgorithm);
+ $putByIdDirectPrivate(underlyingSource, "pull", pullAlgorithm);
+ $putByIdDirectPrivate(underlyingSource, "cancel", cancelAlgorithm);
+ const options = {};
+ $putByIdDirectPrivate(options, "size", readableSizeAlgorithm);
+ $putByIdDirectPrivate(options, "highWaterMark", readableHighWaterMark);
+ const readable = new ReadableStream(underlyingSource, options);
+
+ // The writable to expose to JS through writable getter.
+ $putByIdDirectPrivate(stream, "writable", writable);
+ // The writable to use for the actual transform algorithms.
+ $putByIdDirectPrivate(stream, "internalWritable", $getInternalWritableStream(writable));
+
+ $putByIdDirectPrivate(stream, "readable", readable);
+ $putByIdDirectPrivate(stream, "backpressure", undefined);
+ $putByIdDirectPrivate(stream, "backpressureChangePromise", undefined);
+
+ $transformStreamSetBackpressure(stream, true);
+ $putByIdDirectPrivate(stream, "controller", undefined);
+}
+
+export function transformStreamError(stream, e) {
+ const readable = $getByIdDirectPrivate(stream, "readable");
+ const readableController = $getByIdDirectPrivate(readable, "readableStreamController");
+ $readableStreamDefaultControllerError(readableController, e);
+
+ $transformStreamErrorWritableAndUnblockWrite(stream, e);
+}
+
+export function transformStreamErrorWritableAndUnblockWrite(stream, e) {
+ $transformStreamDefaultControllerClearAlgorithms($getByIdDirectPrivate(stream, "controller"));
+
+ const writable = $getByIdDirectPrivate(stream, "internalWritable");
+ $writableStreamDefaultControllerErrorIfNeeded($getByIdDirectPrivate(writable, "controller"), e);
+
+ if ($getByIdDirectPrivate(stream, "backpressure")) $transformStreamSetBackpressure(stream, false);
+}
+
+export function transformStreamSetBackpressure(stream, backpressure) {
+ $assert($getByIdDirectPrivate(stream, "backpressure") !== backpressure);
+
+ const backpressureChangePromise = $getByIdDirectPrivate(stream, "backpressureChangePromise");
+ if (backpressureChangePromise !== undefined) backpressureChangePromise.$resolve.$call();
+
+ $putByIdDirectPrivate(stream, "backpressureChangePromise", $newPromiseCapability(Promise));
+ $putByIdDirectPrivate(stream, "backpressure", backpressure);
+}
+
+export function setUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm) {
+ $assert($isTransformStream(stream));
+ $assert($getByIdDirectPrivate(stream, "controller") === undefined);
+
+ $putByIdDirectPrivate(controller, "stream", stream);
+ $putByIdDirectPrivate(stream, "controller", controller);
+ $putByIdDirectPrivate(controller, "transformAlgorithm", transformAlgorithm);
+ $putByIdDirectPrivate(controller, "flushAlgorithm", flushAlgorithm);
+}
+
+export function setUpTransformStreamDefaultControllerFromTransformer(stream, transformer, transformerDict) {
+ const controller = new TransformStreamDefaultController();
+ let transformAlgorithm = chunk => {
+ try {
+ $transformStreamDefaultControllerEnqueue(controller, chunk);
+ } catch (e) {
+ return Promise.$reject(e);
+ }
+ return Promise.$resolve();
+ };
+ let flushAlgorithm = () => {
+ return Promise.$resolve();
+ };
+
+ if ("transform" in transformerDict)
+ transformAlgorithm = chunk => {
+ return $promiseInvokeOrNoopMethod(transformer, transformerDict["transform"], [chunk, controller]);
+ };
+
+ if ("flush" in transformerDict) {
+ flushAlgorithm = () => {
+ return $promiseInvokeOrNoopMethod(transformer, transformerDict["flush"], [controller]);
+ };
+ }
+
+ $setUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm);
+}
+
+export function transformStreamDefaultControllerClearAlgorithms(controller) {
+ // We set transformAlgorithm to true to allow GC but keep the isTransformStreamDefaultController check.
+ $putByIdDirectPrivate(controller, "transformAlgorithm", true);
+ $putByIdDirectPrivate(controller, "flushAlgorithm", undefined);
+}
+
+export function transformStreamDefaultControllerEnqueue(controller, chunk) {
+ const stream = $getByIdDirectPrivate(controller, "stream");
+ const readable = $getByIdDirectPrivate(stream, "readable");
+ const readableController = $getByIdDirectPrivate(readable, "readableStreamController");
+
+ $assert(readableController !== undefined);
+ if (!$readableStreamDefaultControllerCanCloseOrEnqueue(readableController))
+ $throwTypeError("TransformStream.readable cannot close or enqueue");
+
+ try {
+ $readableStreamDefaultControllerEnqueue(readableController, chunk);
+ } catch (e) {
+ $transformStreamErrorWritableAndUnblockWrite(stream, e);
+ throw $getByIdDirectPrivate(readable, "storedError");
+ }
+
+ const backpressure = !$readableStreamDefaultControllerShouldCallPull(readableController);
+ if (backpressure !== $getByIdDirectPrivate(stream, "backpressure")) {
+ $assert(backpressure);
+ $transformStreamSetBackpressure(stream, true);
+ }
+}
+
+export function transformStreamDefaultControllerError(controller, e) {
+ $transformStreamError($getByIdDirectPrivate(controller, "stream"), e);
+}
+
+export function transformStreamDefaultControllerPerformTransform(controller, chunk) {
+ const promiseCapability = $newPromiseCapability(Promise);
+
+ const transformPromise = $getByIdDirectPrivate(controller, "transformAlgorithm").$call(undefined, chunk);
+ transformPromise.$then(
+ () => {
+ promiseCapability.$resolve();
+ },
+ r => {
+ $transformStreamError($getByIdDirectPrivate(controller, "stream"), r);
+ promiseCapability.$reject.$call(undefined, r);
+ },
+ );
+ return promiseCapability.$promise;
+}
+
+export function transformStreamDefaultControllerTerminate(controller) {
+ const stream = $getByIdDirectPrivate(controller, "stream");
+ const readable = $getByIdDirectPrivate(stream, "readable");
+ const readableController = $getByIdDirectPrivate(readable, "readableStreamController");
+
+ // FIXME: Update readableStreamDefaultControllerClose to make this check.
+ if ($readableStreamDefaultControllerCanCloseOrEnqueue(readableController))
+ $readableStreamDefaultControllerClose(readableController);
+ const error = $makeTypeError("the stream has been terminated");
+ $transformStreamErrorWritableAndUnblockWrite(stream, error);
+}
+
+export function transformStreamDefaultSinkWriteAlgorithm(stream, chunk) {
+ const writable = $getByIdDirectPrivate(stream, "internalWritable");
+
+ $assert($getByIdDirectPrivate(writable, "state") === "writable");
+
+ const controller = $getByIdDirectPrivate(stream, "controller");
+
+ if ($getByIdDirectPrivate(stream, "backpressure")) {
+ const promiseCapability = $newPromiseCapability(Promise);
+
+ const backpressureChangePromise = $getByIdDirectPrivate(stream, "backpressureChangePromise");
+ $assert(backpressureChangePromise !== undefined);
+ backpressureChangePromise.$promise.$then(
+ () => {
+ const state = $getByIdDirectPrivate(writable, "state");
+ if (state === "erroring") {
+ promiseCapability.$reject.$call(undefined, $getByIdDirectPrivate(writable, "storedError"));
+ return;
+ }
+
+ $assert(state === "writable");
+ $transformStreamDefaultControllerPerformTransform(controller, chunk).$then(
+ () => {
+ promiseCapability.$resolve();
+ },
+ e => {
+ promiseCapability.$reject.$call(undefined, e);
+ },
+ );
+ },
+ e => {
+ promiseCapability.$reject.$call(undefined, e);
+ },
+ );
+
+ return promiseCapability.$promise;
+ }
+ return $transformStreamDefaultControllerPerformTransform(controller, chunk);
+}
+
+export function transformStreamDefaultSinkAbortAlgorithm(stream, reason) {
+ $transformStreamError(stream, reason);
+ return Promise.$resolve();
+}
+
+export function transformStreamDefaultSinkCloseAlgorithm(stream) {
+ const readable = $getByIdDirectPrivate(stream, "readable");
+ const controller = $getByIdDirectPrivate(stream, "controller");
+ const readableController = $getByIdDirectPrivate(readable, "readableStreamController");
+
+ const flushAlgorithm = $getByIdDirectPrivate(controller, "flushAlgorithm");
+ $assert(flushAlgorithm !== undefined);
+ const flushPromise = $getByIdDirectPrivate(controller, "flushAlgorithm").$call();
+ $transformStreamDefaultControllerClearAlgorithms(controller);
+
+ const promiseCapability = $newPromiseCapability(Promise);
+ flushPromise.$then(
+ () => {
+ if ($getByIdDirectPrivate(readable, "state") === $streamErrored) {
+ promiseCapability.$reject.$call(undefined, $getByIdDirectPrivate(readable, "storedError"));
+ return;
+ }
+
+ // FIXME: Update readableStreamDefaultControllerClose to make this check.
+ if ($readableStreamDefaultControllerCanCloseOrEnqueue(readableController))
+ $readableStreamDefaultControllerClose(readableController);
+ promiseCapability.$resolve();
+ },
+ r => {
+ $transformStreamError($getByIdDirectPrivate(controller, "stream"), r);
+ promiseCapability.$reject.$call(undefined, $getByIdDirectPrivate(readable, "storedError"));
+ },
+ );
+ return promiseCapability.$promise;
+}
+
+export function transformStreamDefaultSourcePullAlgorithm(stream) {
+ $assert($getByIdDirectPrivate(stream, "backpressure"));
+ $assert($getByIdDirectPrivate(stream, "backpressureChangePromise") !== undefined);
+
+ $transformStreamSetBackpressure(stream, false);
+
+ return $getByIdDirectPrivate(stream, "backpressureChangePromise").$promise;
+}