aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/builtins/js/TransformStreamInternals.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/bun.js/builtins/js/TransformStreamInternals.js')
-rw-r--r--src/bun.js/builtins/js/TransformStreamInternals.js350
1 files changed, 350 insertions, 0 deletions
diff --git a/src/bun.js/builtins/js/TransformStreamInternals.js b/src/bun.js/builtins/js/TransformStreamInternals.js
new file mode 100644
index 000000000..4263e3991
--- /dev/null
+++ b/src/bun.js/builtins/js/TransformStreamInternals.js
@@ -0,0 +1,350 @@
+/*
+ * Copyright (C) 2020 Apple Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
+ * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+// @internal
+
+function isTransformStream(stream)
+{
+ "use strict";
+
+ return @isObject(stream) && !!@getByIdDirectPrivate(stream, "readable");
+}
+
+function isTransformStreamDefaultController(controller)
+{
+ "use strict";
+
+ return @isObject(controller) && !!@getByIdDirectPrivate(controller, "transformAlgorithm");
+}
+
+function createTransformStream(startAlgorithm, transformAlgorithm, flushAlgorithm, writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm)
+{
+ if (writableHighWaterMark === @undefined)
+ writableHighWaterMark = 1;
+ if (writableSizeAlgorithm === @undefined)
+ writableSizeAlgorithm = () => 1;
+ if (readableHighWaterMark === @undefined)
+ readableHighWaterMark = 0;
+ if (readableSizeAlgorithm === @undefined)
+ readableSizeAlgorithm = () => 1;
+ @assert(writableHighWaterMark >= 0);
+ @assert(readableHighWaterMark >= 0);
+
+ const transform = {};
+ @putByIdDirectPrivate(transform, "TransformStream", true);
+
+ const stream = new @TransformStream(transform);
+ const startPromiseCapability = @newPromiseCapability(@Promise);
+ @initializeTransformStream(stream, startPromiseCapability.@promise, writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm);
+
+ const controller = new @TransformStreamDefaultController();
+ @setUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm);
+
+ startAlgorithm().@then(() => {
+ startPromiseCapability.@resolve.@call();
+ }, (error) => {
+ startPromiseCapability.@reject.@call(@undefined, error);
+ });
+
+ return stream;
+}
+
+function initializeTransformStream(stream, startPromise, writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm)
+{
+ "use strict";
+
+ const startAlgorithm = () => { return startPromise; };
+ const writeAlgorithm = (chunk) => { return @transformStreamDefaultSinkWriteAlgorithm(stream, chunk); }
+ const abortAlgorithm = (reason) => { return @transformStreamDefaultSinkAbortAlgorithm(stream, reason); }
+ const closeAlgorithm = () => { return @transformStreamDefaultSinkCloseAlgorithm(stream); }
+ const writable = @createWritableStream(startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, writableHighWaterMark, writableSizeAlgorithm);
+
+ const pullAlgorithm = () => { return @transformStreamDefaultSourcePullAlgorithm(stream); };
+ const cancelAlgorithm = (reason) => {
+ @transformStreamErrorWritableAndUnblockWrite(stream, reason);
+ return @Promise.@resolve();
+ };
+ const underlyingSource = { };
+ @putByIdDirectPrivate(underlyingSource, "start", startAlgorithm);
+ @putByIdDirectPrivate(underlyingSource, "pull", pullAlgorithm);
+ @putByIdDirectPrivate(underlyingSource, "cancel", cancelAlgorithm);
+ const options = { };
+ @putByIdDirectPrivate(options, "size", readableSizeAlgorithm);
+ @putByIdDirectPrivate(options, "highWaterMark", readableHighWaterMark);
+ const readable = new @ReadableStream(underlyingSource, options);
+
+ // The writable to expose to JS through writable getter.
+ @putByIdDirectPrivate(stream, "writable", writable);
+ // The writable to use for the actual transform algorithms.
+ @putByIdDirectPrivate(stream, "internalWritable", @getInternalWritableStream(writable));
+
+ @putByIdDirectPrivate(stream, "readable", readable);
+ @putByIdDirectPrivate(stream, "backpressure", @undefined);
+ @putByIdDirectPrivate(stream, "backpressureChangePromise", @undefined);
+
+ @transformStreamSetBackpressure(stream, true);
+ @putByIdDirectPrivate(stream, "controller", @undefined);
+}
+
+function transformStreamError(stream, e)
+{
+ "use strict";
+
+ const readable = @getByIdDirectPrivate(stream, "readable");
+ const readableController = @getByIdDirectPrivate(readable, "readableStreamController");
+ @readableStreamDefaultControllerError(readableController, e);
+
+ @transformStreamErrorWritableAndUnblockWrite(stream, e);
+}
+
+function transformStreamErrorWritableAndUnblockWrite(stream, e)
+{
+ "use strict";
+
+ @transformStreamDefaultControllerClearAlgorithms(@getByIdDirectPrivate(stream, "controller"));
+
+ const writable = @getByIdDirectPrivate(stream, "internalWritable");
+ @writableStreamDefaultControllerErrorIfNeeded(@getByIdDirectPrivate(writable, "controller"), e);
+
+ if (@getByIdDirectPrivate(stream, "backpressure"))
+ @transformStreamSetBackpressure(stream, false);
+}
+
+function transformStreamSetBackpressure(stream, backpressure)
+{
+ "use strict";
+
+ @assert(@getByIdDirectPrivate(stream, "backpressure") !== backpressure);
+
+ const backpressureChangePromise = @getByIdDirectPrivate(stream, "backpressureChangePromise");
+ if (backpressureChangePromise !== @undefined)
+ backpressureChangePromise.@resolve.@call();
+
+ @putByIdDirectPrivate(stream, "backpressureChangePromise", @newPromiseCapability(@Promise));
+ @putByIdDirectPrivate(stream, "backpressure", backpressure);
+}
+
+function setUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm)
+{
+ "use strict";
+
+ @assert(@isTransformStream(stream));
+ @assert(@getByIdDirectPrivate(stream, "controller") === @undefined);
+
+ @putByIdDirectPrivate(controller, "stream", stream);
+ @putByIdDirectPrivate(stream, "controller", controller);
+ @putByIdDirectPrivate(controller, "transformAlgorithm", transformAlgorithm);
+ @putByIdDirectPrivate(controller, "flushAlgorithm", flushAlgorithm);
+}
+
+
+function setUpTransformStreamDefaultControllerFromTransformer(stream, transformer, transformerDict)
+{
+ "use strict";
+
+ const controller = new @TransformStreamDefaultController();
+ let transformAlgorithm = (chunk) => {
+ try {
+ @transformStreamDefaultControllerEnqueue(controller, chunk);
+ } catch (e) {
+ return @Promise.@reject(e);
+ }
+ return @Promise.@resolve();
+ };
+ let flushAlgorithm = () => { return @Promise.@resolve(); };
+
+ if ("transform" in transformerDict)
+ transformAlgorithm = (chunk) => {
+ return @promiseInvokeOrNoopMethod(transformer, transformerDict["transform"], [chunk, controller]);
+ };
+
+ if ("flush" in transformerDict) {
+ flushAlgorithm = () => {
+ return @promiseInvokeOrNoopMethod(transformer, transformerDict["flush"], [controller]);
+ };
+ }
+
+ @setUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm);
+}
+
+function transformStreamDefaultControllerClearAlgorithms(controller)
+{
+ "use strict";
+
+ // We set transformAlgorithm to true to allow GC but keep the isTransformStreamDefaultController check.
+ @putByIdDirectPrivate(controller, "transformAlgorithm", true);
+ @putByIdDirectPrivate(controller, "flushAlgorithm", @undefined);
+}
+
+function transformStreamDefaultControllerEnqueue(controller, chunk)
+{
+ "use strict";
+
+ const stream = @getByIdDirectPrivate(controller, "stream");
+ const readable = @getByIdDirectPrivate(stream, "readable");
+ const readableController = @getByIdDirectPrivate(readable, "readableStreamController");
+
+ @assert(readableController !== @undefined);
+ if (!@readableStreamDefaultControllerCanCloseOrEnqueue(readableController))
+ @throwTypeError("TransformStream.readable cannot close or enqueue");
+
+ try {
+ @readableStreamDefaultControllerEnqueue(readableController, chunk);
+ } catch (e) {
+ @transformStreamErrorWritableAndUnblockWrite(stream, e);
+ throw @getByIdDirectPrivate(readable, "storedError");
+ }
+
+ const backpressure = !@readableStreamDefaultControllerShouldCallPull(readableController);
+ if (backpressure !== @getByIdDirectPrivate(stream, "backpressure")) {
+ @assert(backpressure);
+ @transformStreamSetBackpressure(stream, true);
+ }
+}
+
+function transformStreamDefaultControllerError(controller, e)
+{
+ "use strict";
+
+ @transformStreamError(@getByIdDirectPrivate(controller, "stream"), e);
+}
+
+function transformStreamDefaultControllerPerformTransform(controller, chunk)
+{
+ "use strict";
+
+ const promiseCapability = @newPromiseCapability(@Promise);
+
+ const transformPromise = @getByIdDirectPrivate(controller, "transformAlgorithm").@call(@undefined, chunk);
+ transformPromise.@then(() => {
+ promiseCapability.@resolve();
+ }, (r) => {
+ @transformStreamError(@getByIdDirectPrivate(controller, "stream"), r);
+ promiseCapability.@reject.@call(@undefined, r);
+ });
+ return promiseCapability.@promise;
+}
+
+function transformStreamDefaultControllerTerminate(controller)
+{
+ "use strict";
+
+ const stream = @getByIdDirectPrivate(controller, "stream");
+ const readable = @getByIdDirectPrivate(stream, "readable");
+ const readableController = @getByIdDirectPrivate(readable, "readableStreamController");
+
+ // FIXME: Update readableStreamDefaultControllerClose to make this check.
+ if (@readableStreamDefaultControllerCanCloseOrEnqueue(readableController))
+ @readableStreamDefaultControllerClose(readableController);
+ const error = @makeTypeError("the stream has been terminated");
+ @transformStreamErrorWritableAndUnblockWrite(stream, error);
+}
+
+function transformStreamDefaultSinkWriteAlgorithm(stream, chunk)
+{
+ "use strict";
+
+ const writable = @getByIdDirectPrivate(stream, "internalWritable");
+
+ @assert(@getByIdDirectPrivate(writable, "state") === "writable");
+
+ const controller = @getByIdDirectPrivate(stream, "controller");
+
+ if (@getByIdDirectPrivate(stream, "backpressure")) {
+ const promiseCapability = @newPromiseCapability(@Promise);
+
+ const backpressureChangePromise = @getByIdDirectPrivate(stream, "backpressureChangePromise");
+ @assert(backpressureChangePromise !== @undefined);
+ backpressureChangePromise.@promise.@then(() => {
+ const state = @getByIdDirectPrivate(writable, "state");
+ if (state === "erroring") {
+ promiseCapability.@reject.@call(@undefined, @getByIdDirectPrivate(writable, "storedError"));
+ return;
+ }
+
+ @assert(state === "writable");
+ @transformStreamDefaultControllerPerformTransform(controller, chunk).@then(() => {
+ promiseCapability.@resolve();
+ }, (e) => {
+ promiseCapability.@reject.@call(@undefined, e);
+ });
+ }, (e) => {
+ promiseCapability.@reject.@call(@undefined, e);
+ });
+
+ return promiseCapability.@promise;
+ }
+ return @transformStreamDefaultControllerPerformTransform(controller, chunk);
+}
+
+function transformStreamDefaultSinkAbortAlgorithm(stream, reason)
+{
+ "use strict";
+
+ @transformStreamError(stream, reason);
+ return @Promise.@resolve();
+}
+
+function transformStreamDefaultSinkCloseAlgorithm(stream)
+{
+ "use strict";
+ const readable = @getByIdDirectPrivate(stream, "readable");
+ const controller = @getByIdDirectPrivate(stream, "controller");
+ const readableController = @getByIdDirectPrivate(readable, "readableStreamController");
+
+ const flushAlgorithm = @getByIdDirectPrivate(controller, "flushAlgorithm");
+ @assert(flushAlgorithm !== @undefined);
+ const flushPromise = @getByIdDirectPrivate(controller, "flushAlgorithm").@call();
+ @transformStreamDefaultControllerClearAlgorithms(controller);
+
+ const promiseCapability = @newPromiseCapability(@Promise);
+ flushPromise.@then(() => {
+ if (@getByIdDirectPrivate(readable, "state") === @streamErrored) {
+ promiseCapability.@reject.@call(@undefined, @getByIdDirectPrivate(readable, "storedError"));
+ return;
+ }
+
+ // FIXME: Update readableStreamDefaultControllerClose to make this check.
+ if (@readableStreamDefaultControllerCanCloseOrEnqueue(readableController))
+ @readableStreamDefaultControllerClose(readableController);
+ promiseCapability.@resolve();
+ }, (r) => {
+ @transformStreamError(@getByIdDirectPrivate(controller, "stream"), r);
+ promiseCapability.@reject.@call(@undefined, @getByIdDirectPrivate(readable, "storedError"));
+ });
+ return promiseCapability.@promise;
+}
+
+function transformStreamDefaultSourcePullAlgorithm(stream)
+{
+ "use strict";
+
+ @assert(@getByIdDirectPrivate(stream, "backpressure"));
+ @assert(@getByIdDirectPrivate(stream, "backpressureChangePromise") !== @undefined);
+
+ @transformStreamSetBackpressure(stream, false);
+
+ return @getByIdDirectPrivate(stream, "backpressureChangePromise").@promise;
+}