diff options
author | 2022-06-22 22:52:03 -0700 | |
---|---|---|
committer | 2022-06-22 22:52:03 -0700 | |
commit | a77825799e83b05833f22b4d7f8950ff2bc3fe00 (patch) | |
tree | bad8460540733f8ede8048fdd6b6c11457a570c2 /src/javascript/jsc/builtins/js/TransformStreamInternals.js | |
parent | 7db88ca6b5614463d70bdf048b42abc9996b3a4f (diff) | |
download | bun-a77825799e83b05833f22b4d7f8950ff2bc3fe00.tar.gz bun-a77825799e83b05833f22b4d7f8950ff2bc3fe00.tar.zst bun-a77825799e83b05833f22b4d7f8950ff2bc3fe00.zip |
Move builtins to src/javascript/jsc/builtins
Diffstat (limited to 'src/javascript/jsc/builtins/js/TransformStreamInternals.js')
-rw-r--r-- | src/javascript/jsc/builtins/js/TransformStreamInternals.js | 350 |
1 files changed, 350 insertions, 0 deletions
diff --git a/src/javascript/jsc/builtins/js/TransformStreamInternals.js b/src/javascript/jsc/builtins/js/TransformStreamInternals.js new file mode 100644 index 000000000..4263e3991 --- /dev/null +++ b/src/javascript/jsc/builtins/js/TransformStreamInternals.js @@ -0,0 +1,350 @@ +/* + * Copyright (C) 2020 Apple Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS'' + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS + * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + * THE POSSIBILITY OF SUCH DAMAGE. + */ + +// @internal + +function isTransformStream(stream) +{ + "use strict"; + + return @isObject(stream) && !!@getByIdDirectPrivate(stream, "readable"); +} + +function isTransformStreamDefaultController(controller) +{ + "use strict"; + + return @isObject(controller) && !!@getByIdDirectPrivate(controller, "transformAlgorithm"); +} + +function createTransformStream(startAlgorithm, transformAlgorithm, flushAlgorithm, writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm) +{ + if (writableHighWaterMark === @undefined) + writableHighWaterMark = 1; + if (writableSizeAlgorithm === @undefined) + writableSizeAlgorithm = () => 1; + if (readableHighWaterMark === @undefined) + readableHighWaterMark = 0; + if (readableSizeAlgorithm === @undefined) + readableSizeAlgorithm = () => 1; + @assert(writableHighWaterMark >= 0); + @assert(readableHighWaterMark >= 0); + + const transform = {}; + @putByIdDirectPrivate(transform, "TransformStream", true); + + const stream = new @TransformStream(transform); + const startPromiseCapability = @newPromiseCapability(@Promise); + @initializeTransformStream(stream, startPromiseCapability.@promise, writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm); + + const controller = new @TransformStreamDefaultController(); + @setUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm); + + startAlgorithm().@then(() => { + startPromiseCapability.@resolve.@call(); + }, (error) => { + startPromiseCapability.@reject.@call(@undefined, error); + }); + + return stream; +} + +function initializeTransformStream(stream, startPromise, writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm) +{ + "use strict"; + + const startAlgorithm = () => { return startPromise; }; + const writeAlgorithm = (chunk) => { return @transformStreamDefaultSinkWriteAlgorithm(stream, chunk); } + const abortAlgorithm = (reason) => { return @transformStreamDefaultSinkAbortAlgorithm(stream, reason); } + const closeAlgorithm = () => { return @transformStreamDefaultSinkCloseAlgorithm(stream); } + const writable = @createWritableStream(startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, writableHighWaterMark, writableSizeAlgorithm); + + const pullAlgorithm = () => { return @transformStreamDefaultSourcePullAlgorithm(stream); }; + const cancelAlgorithm = (reason) => { + @transformStreamErrorWritableAndUnblockWrite(stream, reason); + return @Promise.@resolve(); + }; + const underlyingSource = { }; + @putByIdDirectPrivate(underlyingSource, "start", startAlgorithm); + @putByIdDirectPrivate(underlyingSource, "pull", pullAlgorithm); + @putByIdDirectPrivate(underlyingSource, "cancel", cancelAlgorithm); + const options = { }; + @putByIdDirectPrivate(options, "size", readableSizeAlgorithm); + @putByIdDirectPrivate(options, "highWaterMark", readableHighWaterMark); + const readable = new @ReadableStream(underlyingSource, options); + + // The writable to expose to JS through writable getter. + @putByIdDirectPrivate(stream, "writable", writable); + // The writable to use for the actual transform algorithms. + @putByIdDirectPrivate(stream, "internalWritable", @getInternalWritableStream(writable)); + + @putByIdDirectPrivate(stream, "readable", readable); + @putByIdDirectPrivate(stream, "backpressure", @undefined); + @putByIdDirectPrivate(stream, "backpressureChangePromise", @undefined); + + @transformStreamSetBackpressure(stream, true); + @putByIdDirectPrivate(stream, "controller", @undefined); +} + +function transformStreamError(stream, e) +{ + "use strict"; + + const readable = @getByIdDirectPrivate(stream, "readable"); + const readableController = @getByIdDirectPrivate(readable, "readableStreamController"); + @readableStreamDefaultControllerError(readableController, e); + + @transformStreamErrorWritableAndUnblockWrite(stream, e); +} + +function transformStreamErrorWritableAndUnblockWrite(stream, e) +{ + "use strict"; + + @transformStreamDefaultControllerClearAlgorithms(@getByIdDirectPrivate(stream, "controller")); + + const writable = @getByIdDirectPrivate(stream, "internalWritable"); + @writableStreamDefaultControllerErrorIfNeeded(@getByIdDirectPrivate(writable, "controller"), e); + + if (@getByIdDirectPrivate(stream, "backpressure")) + @transformStreamSetBackpressure(stream, false); +} + +function transformStreamSetBackpressure(stream, backpressure) +{ + "use strict"; + + @assert(@getByIdDirectPrivate(stream, "backpressure") !== backpressure); + + const backpressureChangePromise = @getByIdDirectPrivate(stream, "backpressureChangePromise"); + if (backpressureChangePromise !== @undefined) + backpressureChangePromise.@resolve.@call(); + + @putByIdDirectPrivate(stream, "backpressureChangePromise", @newPromiseCapability(@Promise)); + @putByIdDirectPrivate(stream, "backpressure", backpressure); +} + +function setUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm) +{ + "use strict"; + + @assert(@isTransformStream(stream)); + @assert(@getByIdDirectPrivate(stream, "controller") === @undefined); + + @putByIdDirectPrivate(controller, "stream", stream); + @putByIdDirectPrivate(stream, "controller", controller); + @putByIdDirectPrivate(controller, "transformAlgorithm", transformAlgorithm); + @putByIdDirectPrivate(controller, "flushAlgorithm", flushAlgorithm); +} + + +function setUpTransformStreamDefaultControllerFromTransformer(stream, transformer, transformerDict) +{ + "use strict"; + + const controller = new @TransformStreamDefaultController(); + let transformAlgorithm = (chunk) => { + try { + @transformStreamDefaultControllerEnqueue(controller, chunk); + } catch (e) { + return @Promise.@reject(e); + } + return @Promise.@resolve(); + }; + let flushAlgorithm = () => { return @Promise.@resolve(); }; + + if ("transform" in transformerDict) + transformAlgorithm = (chunk) => { + return @promiseInvokeOrNoopMethod(transformer, transformerDict["transform"], [chunk, controller]); + }; + + if ("flush" in transformerDict) { + flushAlgorithm = () => { + return @promiseInvokeOrNoopMethod(transformer, transformerDict["flush"], [controller]); + }; + } + + @setUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm); +} + +function transformStreamDefaultControllerClearAlgorithms(controller) +{ + "use strict"; + + // We set transformAlgorithm to true to allow GC but keep the isTransformStreamDefaultController check. + @putByIdDirectPrivate(controller, "transformAlgorithm", true); + @putByIdDirectPrivate(controller, "flushAlgorithm", @undefined); +} + +function transformStreamDefaultControllerEnqueue(controller, chunk) +{ + "use strict"; + + const stream = @getByIdDirectPrivate(controller, "stream"); + const readable = @getByIdDirectPrivate(stream, "readable"); + const readableController = @getByIdDirectPrivate(readable, "readableStreamController"); + + @assert(readableController !== @undefined); + if (!@readableStreamDefaultControllerCanCloseOrEnqueue(readableController)) + @throwTypeError("TransformStream.readable cannot close or enqueue"); + + try { + @readableStreamDefaultControllerEnqueue(readableController, chunk); + } catch (e) { + @transformStreamErrorWritableAndUnblockWrite(stream, e); + throw @getByIdDirectPrivate(readable, "storedError"); + } + + const backpressure = !@readableStreamDefaultControllerShouldCallPull(readableController); + if (backpressure !== @getByIdDirectPrivate(stream, "backpressure")) { + @assert(backpressure); + @transformStreamSetBackpressure(stream, true); + } +} + +function transformStreamDefaultControllerError(controller, e) +{ + "use strict"; + + @transformStreamError(@getByIdDirectPrivate(controller, "stream"), e); +} + +function transformStreamDefaultControllerPerformTransform(controller, chunk) +{ + "use strict"; + + const promiseCapability = @newPromiseCapability(@Promise); + + const transformPromise = @getByIdDirectPrivate(controller, "transformAlgorithm").@call(@undefined, chunk); + transformPromise.@then(() => { + promiseCapability.@resolve(); + }, (r) => { + @transformStreamError(@getByIdDirectPrivate(controller, "stream"), r); + promiseCapability.@reject.@call(@undefined, r); + }); + return promiseCapability.@promise; +} + +function transformStreamDefaultControllerTerminate(controller) +{ + "use strict"; + + const stream = @getByIdDirectPrivate(controller, "stream"); + const readable = @getByIdDirectPrivate(stream, "readable"); + const readableController = @getByIdDirectPrivate(readable, "readableStreamController"); + + // FIXME: Update readableStreamDefaultControllerClose to make this check. + if (@readableStreamDefaultControllerCanCloseOrEnqueue(readableController)) + @readableStreamDefaultControllerClose(readableController); + const error = @makeTypeError("the stream has been terminated"); + @transformStreamErrorWritableAndUnblockWrite(stream, error); +} + +function transformStreamDefaultSinkWriteAlgorithm(stream, chunk) +{ + "use strict"; + + const writable = @getByIdDirectPrivate(stream, "internalWritable"); + + @assert(@getByIdDirectPrivate(writable, "state") === "writable"); + + const controller = @getByIdDirectPrivate(stream, "controller"); + + if (@getByIdDirectPrivate(stream, "backpressure")) { + const promiseCapability = @newPromiseCapability(@Promise); + + const backpressureChangePromise = @getByIdDirectPrivate(stream, "backpressureChangePromise"); + @assert(backpressureChangePromise !== @undefined); + backpressureChangePromise.@promise.@then(() => { + const state = @getByIdDirectPrivate(writable, "state"); + if (state === "erroring") { + promiseCapability.@reject.@call(@undefined, @getByIdDirectPrivate(writable, "storedError")); + return; + } + + @assert(state === "writable"); + @transformStreamDefaultControllerPerformTransform(controller, chunk).@then(() => { + promiseCapability.@resolve(); + }, (e) => { + promiseCapability.@reject.@call(@undefined, e); + }); + }, (e) => { + promiseCapability.@reject.@call(@undefined, e); + }); + + return promiseCapability.@promise; + } + return @transformStreamDefaultControllerPerformTransform(controller, chunk); +} + +function transformStreamDefaultSinkAbortAlgorithm(stream, reason) +{ + "use strict"; + + @transformStreamError(stream, reason); + return @Promise.@resolve(); +} + +function transformStreamDefaultSinkCloseAlgorithm(stream) +{ + "use strict"; + const readable = @getByIdDirectPrivate(stream, "readable"); + const controller = @getByIdDirectPrivate(stream, "controller"); + const readableController = @getByIdDirectPrivate(readable, "readableStreamController"); + + const flushAlgorithm = @getByIdDirectPrivate(controller, "flushAlgorithm"); + @assert(flushAlgorithm !== @undefined); + const flushPromise = @getByIdDirectPrivate(controller, "flushAlgorithm").@call(); + @transformStreamDefaultControllerClearAlgorithms(controller); + + const promiseCapability = @newPromiseCapability(@Promise); + flushPromise.@then(() => { + if (@getByIdDirectPrivate(readable, "state") === @streamErrored) { + promiseCapability.@reject.@call(@undefined, @getByIdDirectPrivate(readable, "storedError")); + return; + } + + // FIXME: Update readableStreamDefaultControllerClose to make this check. + if (@readableStreamDefaultControllerCanCloseOrEnqueue(readableController)) + @readableStreamDefaultControllerClose(readableController); + promiseCapability.@resolve(); + }, (r) => { + @transformStreamError(@getByIdDirectPrivate(controller, "stream"), r); + promiseCapability.@reject.@call(@undefined, @getByIdDirectPrivate(readable, "storedError")); + }); + return promiseCapability.@promise; +} + +function transformStreamDefaultSourcePullAlgorithm(stream) +{ + "use strict"; + + @assert(@getByIdDirectPrivate(stream, "backpressure")); + @assert(@getByIdDirectPrivate(stream, "backpressureChangePromise") !== @undefined); + + @transformStreamSetBackpressure(stream, false); + + return @getByIdDirectPrivate(stream, "backpressureChangePromise").@promise; +} |