diff options
author | 2022-06-22 23:21:48 -0700 | |
---|---|---|
committer | 2022-06-22 23:21:48 -0700 | |
commit | 729d445b6885f69dd2c6355f38707bd42851c791 (patch) | |
tree | f87a7c408929ea3f57bbb7ace380cf869da83c0e /src/bun.js/builtins/js | |
parent | 25f820c6bf1d8ec6d444ef579cc036b8c0607b75 (diff) | |
download | bun-jarred/rename.tar.gz bun-jarred/rename.tar.zst bun-jarred/rename.zip |
change the directory structurejarred/rename
Diffstat (limited to 'src/bun.js/builtins/js')
20 files changed, 5470 insertions, 0 deletions
diff --git a/src/bun.js/builtins/js/ByteLengthQueuingStrategy.js b/src/bun.js/builtins/js/ByteLengthQueuingStrategy.js new file mode 100644 index 000000000..e8f5b1cfc --- /dev/null +++ b/src/bun.js/builtins/js/ByteLengthQueuingStrategy.js @@ -0,0 +1,51 @@ +/* + * Copyright (C) 2015 Canon Inc. + * Copyright (C) 2015 Igalia S.L. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +@getter +function highWaterMark() +{ + "use strict"; + + const highWaterMark = @getByIdDirectPrivate(this, "highWaterMark"); + if (highWaterMark === @undefined) + @throwTypeError("ByteLengthQueuingStrategy.highWaterMark getter called on incompatible |this| value."); + + return highWaterMark; +} + +function size(chunk) +{ + "use strict"; + + return chunk.byteLength; +} + +function initializeByteLengthQueuingStrategy(parameters) +{ + "use strict"; + + @putByIdDirectPrivate(this, "highWaterMark", @extractHighWaterMarkFromQueuingStrategyInit(parameters)); +} diff --git a/src/bun.js/builtins/js/CountQueuingStrategy.js b/src/bun.js/builtins/js/CountQueuingStrategy.js new file mode 100644 index 000000000..3cd9cffc2 --- /dev/null +++ b/src/bun.js/builtins/js/CountQueuingStrategy.js @@ -0,0 +1,50 @@ +/* + * Copyright (C) 2015 Canon Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +@getter +function highWaterMark() +{ + "use strict"; + + const highWaterMark = @getByIdDirectPrivate(this, "highWaterMark"); + if (highWaterMark === @undefined) + @throwTypeError("CountQueuingStrategy.highWaterMark getter called on incompatible |this| value."); + + return highWaterMark; +} + +function size() +{ + "use strict"; + + return 1; +} + +function initializeCountQueuingStrategy(parameters) +{ + "use strict"; + + @putByIdDirectPrivate(this, "highWaterMark", @extractHighWaterMarkFromQueuingStrategyInit(parameters)); +} diff --git a/src/bun.js/builtins/js/JSBufferConstructor.js b/src/bun.js/builtins/js/JSBufferConstructor.js new file mode 100644 index 000000000..9a3f0e1b7 --- /dev/null +++ b/src/bun.js/builtins/js/JSBufferConstructor.js @@ -0,0 +1,65 @@ +/* + * Copyright 2022 Codeblog Corp. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +// ^ that comment is required or the builtins generator will have a fit. + + +function from(items) { + "use strict"; + + if (!@isConstructor(this)) + @throwTypeError("Buffer.from requires |this| to be a constructor"); + + + // TODO: figure out why private symbol not found + if (typeof items === 'string' || (typeof items === 'object' && items && (items instanceof ArrayBuffer || items instanceof SharedArrayBuffer))) { + switch (@argumentCount()) { + case 1: { + return new this(items); + } + case 2: { + return new this(items, @argument(1)); + } + default: { + return new this(items, @argument(1), @argument(2)); + } + } + } + + + var arrayLike = @toObject(items, "Buffer.from requires an array-like object - not null or undefined"); + + // Buffer-specific fast path: + // - uninitialized memory + // - use .set + if (@isTypedArrayView(arrayLike)) { + var length = @typedArrayLength(arrayLike); + var result = this.allocUnsafe(length); + result.set(arrayLike); + return result; + } + + return @tailCallForwardArguments(@Uint8Array.from, this); +} diff --git a/src/bun.js/builtins/js/JSBufferPrototype.js b/src/bun.js/builtins/js/JSBufferPrototype.js new file mode 100644 index 000000000..c841bcd6c --- /dev/null +++ b/src/bun.js/builtins/js/JSBufferPrototype.js @@ -0,0 +1,303 @@ +/* + * Copyright 2022 Codeblog Corp. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + + +// ^ that comment is required or the builtins generator will have a fit. + +// The fastest way as of April 2022 is to use DataView. +// DataView has intrinsics that cause inlining + +function setBigUint64(offset, value, le) { + "use strict"; + return this.dataView.setBigUint64(offset, value, le); +} +function readInt8(offset) { + "use strict"; + return this.dataView.getInt8(offset); +} +function readUInt8(offset) { + "use strict"; + return this.dataView.getUint8(offset); +} +function readInt16LE(offset) { + "use strict"; + return this.dataView.getInt16(offset, true); +} +function readInt16BE(offset) { + "use strict"; + return this.dataView.getInt16(offset, false); +} +function readUInt16LE(offset) { + "use strict"; + return this.dataView.getUint16(offset, true); +} +function readUInt16BE(offset) { + "use strict"; + return this.dataView.getUint16(offset, false); +} +function readInt32LE(offset) { + "use strict"; + return this.dataView.getInt32(offset, true); +} +function readInt32BE(offset) { + "use strict"; + return this.dataView.getInt32(offset, false); +} +function readUInt32LE(offset) { + "use strict"; + return this.dataView.getUint32(offset, true); +} +function readUInt32BE(offset) { + "use strict"; + return this.dataView.getUint32(offset, false); +} +function readFloatLE(offset) { + "use strict"; + return this.dataView.getFloat32(offset, true); +} +function readFloatBE(offset) { + "use strict"; + return this.dataView.getFloat32(offset, false); +} +function readDoubleLE(offset) { + "use strict"; + return this.dataView.getFloat64(offset, true); +} +function readDoubleBE(offset) { + "use strict"; + return this.dataView.getFloat64(offset, false); +} +function readBigInt64LE(offset) { + "use strict"; + return this.dataView.getBigInt64(offset, true); +} +function readBigInt64BE(offset) { + "use strict"; + return this.dataView.getBigInt64(offset, false); +} +function readBigUInt64LE(offset) { + "use strict"; + return this.dataView.getBigUint64(offset, true); +} +function readBigUInt64BE(offset) { + "use strict"; + return this.dataView.getBigUint64(offset, false); +} +function writeInt8(value, offset) { + "use strict"; + this.dataView.setInt8(offset, value); + return offset + 1; +} +function writeUInt8(value, offset) { + "use strict"; + this.dataView.setUint8(offset, value); + return offset + 1; +} +function writeInt16LE(value, offset) { + "use strict"; + this.dataView.setInt16(offset, value, true); + return offset + 2; +} +function writeInt16BE(value, offset) { + "use strict"; + this.dataView.setInt16(offset, value, false); + return offset + 2; +} +function writeUInt16LE(value, offset) { + "use strict"; + this.dataView.setUint16(offset, value, true); + return offset + 2; +} +function writeUInt16BE(value, offset) { + "use strict"; + this.dataView.setUint16(offset, value, false); + return offset + 2; +} +function writeInt32LE(value, offset) { + "use strict"; + this.dataView.setInt32(offset, value, true); + return offset + 4; +} +function writeInt32BE(value, offset) { + "use strict"; + this.dataView.setInt32(offset, value, false); + return offset + 4; +} +function writeUInt32LE(value, offset) { + "use strict"; + this.dataView.setUint32(offset, value, true); + return offset + 4; +} +function writeUInt32BE(value, offset) { + "use strict"; + this.dataView.setUint32(offset, value, false); + return offset + 4; +} + +function writeFloatLE(value, offset) { + "use strict"; + this.dataView.setFloat32(offset, value, true); + return offset + 4; +} + +function writeFloatBE(value, offset) { + "use strict"; + this.dataView.setFloat32(offset, value, false); + return offset + 4; +} + +function writeDoubleLE(value, offset) { + "use strict"; + this.dataView.setFloat64(offset, value, true); + return offset + 8; +} + +function writeDoubleBE(value, offset) { + "use strict"; + this.dataView.setFloat64(offset, value, false); + return offset + 8; +} + +function writeBigInt64LE(value, offset) { + "use strict"; + this.dataView.setBigInt64(offset, value, true); + return offset + 8; +} + +function writeBigInt64BE(value, offset) { + "use strict"; + this.dataView.setBigInt64(offset, value, false); + return offset + 8; +} + +function writeBigUInt64LE(value, offset) { + "use strict"; + this.dataView.setBigUint64(offset, value, true); + return offset + 8; +} + +function writeBigUInt64BE(value, offset) { + "use strict"; + this.dataView.setBigUint64(offset, value, false); + return offset + 8; +} + +function slice(start, end) { + "use strict"; + if (start === undefined && end === undefined) { + return this; + } + + Buffer[Symbol.species] ||= Buffer; + + return new Buffer(this.buffer, this.byteOffset + (start || 0), (end || this.byteLength) - (start || 0)); +} + +function utf8Write(text, offset, length) { + "use strict"; + return this.write(text, offset, length, "utf8"); +} +function ucs2Write(text, offset, length) { + "use strict"; + return this.write(text, offset, length, "ucs2"); +} +function utf16leWrite(text, offset, length) { + "use strict"; + return this.write(text, offset, length, "utf16le"); +} +function latin1Write(text, offset, length) { + "use strict"; + return this.write(text, offset, length, "latin1"); +} +function asciiWrite(text, offset, length) { + "use strict"; + return this.write(text, offset, length, "ascii"); +} +function base64Write(text, offset, length) { + "use strict"; + return this.write(text, offset, length, "base64"); +} +function base64urlWrite(text, offset, length) { + "use strict"; + return this.write(text, offset, length, "base64url"); +} +function hexWrite(text, offset, length) { + "use strict"; + return this.write(text, offset, length, "hex"); +} + +function utf8Slice(offset, length) { + "use strict"; + return this.toString(offset, length, "utf8"); +} +function ucs2Slice(offset, length) { + "use strict"; + return this.toString(offset, length, "ucs2"); +} +function utf16leSlice(offset, length) { + "use strict"; + return this.toString(offset, length, "utf16le"); +} +function latin1Slice(offset, length) { + "use strict"; + return this.toString(offset, length, "latin1"); +} +function asciiSlice(offset, length) { + "use strict"; + return this.toString(offset, length, "ascii"); +} +function base64Slice(offset, length) { + "use strict"; + return this.toString(offset, length, "base64"); +} +function base64urlSlice(offset, length) { + "use strict"; + return this.toString(offset, length, "base64url"); +} +function hexSlice(offset, length) { + "use strict"; + return this.toString(offset, length, "hex"); +} + +function toJSON() { + "use strict"; + const type = "Buffer"; + const data = @Array.from(this); + return { type, data }; +} + +function subarray(start, end) { + "use strict"; + + Buffer[Symbol.species] ??= Buffer; + return new Buffer(this.buffer, this.byteOffset + (start || 0), (end || this.byteLength) - (start || 0)); +} + + +function initializeBunBuffer(parameters) +{ + "use strict"; + +} diff --git a/src/bun.js/builtins/js/JSZigGlobalObject.js b/src/bun.js/builtins/js/JSZigGlobalObject.js new file mode 100644 index 000000000..cb3446159 --- /dev/null +++ b/src/bun.js/builtins/js/JSZigGlobalObject.js @@ -0,0 +1,62 @@ +/* + * Copyright 2022 Codeblog Corp. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +function require(name) { + "use strict"; + if (typeof name !== "string") { + @throwTypeError("require() expects a string as its argument"); + } + + const resolved = this.resolveSync(name, this.path); + var requireCache = (globalThis[Symbol.for("_requireCache")] ||= new @Map); + var cached = requireCache.@get(resolved); + if (cached) { + if (resolved.endsWith(".node")) { + return cached.exports; + } + + return cached; + } + + // TODO: remove this hardcoding + if (resolved.endsWith(".json")) { + var fs = (globalThis[Symbol.for("_fs")] ||= Bun.fs()); + var exports = JSON.parse(fs.readFileSync(resolved, "utf8")); + requireCache.@set(resolved, exports); + return exports; + } else if (resolved.endsWith(".node")) { + var module = { exports: {} }; + globalThis.process.dlopen(module, resolved); + requireCache.@set(resolved, module); + return module.exports; + } else if (resolved.endsWith(".toml")) { + var fs = (globalThis[Symbol.for("_fs")] ||= Bun.fs()); + var exports = Bun.TOML.parse(fs.readFileSync(resolved, "utf8")); + requireCache.@set(resolved, exports); + return exports; + } + + @throwTypeError(`Dynamic require isn't supported for file type: ${resolved.subsring(resolved.lastIndexOf(".") + 1) || resolved}`); +} diff --git a/src/bun.js/builtins/js/ReadableByteStreamController.js b/src/bun.js/builtins/js/ReadableByteStreamController.js new file mode 100644 index 000000000..0b47d730c --- /dev/null +++ b/src/bun.js/builtins/js/ReadableByteStreamController.js @@ -0,0 +1,117 @@ +/* + * Copyright (C) 2016 Canon Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +function initializeReadableByteStreamController(stream, underlyingByteSource, highWaterMark) +{ + "use strict"; + + if (arguments.length !== 4 && arguments[3] !== @isReadableStream) + @throwTypeError("ReadableByteStreamController constructor should not be called directly"); + + return @privateInitializeReadableByteStreamController.@call(this, stream, underlyingByteSource, highWaterMark); +} + +function enqueue(chunk) +{ + "use strict"; + + if (!@isReadableByteStreamController(this)) + throw @makeThisTypeError("ReadableByteStreamController", "enqueue"); + + if (@getByIdDirectPrivate(this, "closeRequested")) + @throwTypeError("ReadableByteStreamController is requested to close"); + + if (@getByIdDirectPrivate(@getByIdDirectPrivate(this, "controlledReadableStream"), "state") !== @streamReadable) + @throwTypeError("ReadableStream is not readable"); + + if (!@isObject(chunk) || !@ArrayBuffer.@isView(chunk)) + @throwTypeError("Provided chunk is not a TypedArray"); + + return @readableByteStreamControllerEnqueue(this, chunk); +} + +function error(error) +{ + "use strict"; + + if (!@isReadableByteStreamController(this)) + throw @makeThisTypeError("ReadableByteStreamController", "error"); + + if (@getByIdDirectPrivate(@getByIdDirectPrivate(this, "controlledReadableStream"), "state") !== @streamReadable) + @throwTypeError("ReadableStream is not readable"); + + @readableByteStreamControllerError(this, error); +} + +function close() +{ + "use strict"; + + if (!@isReadableByteStreamController(this)) + throw @makeThisTypeError("ReadableByteStreamController", "close"); + + if (@getByIdDirectPrivate(this, "closeRequested")) + @throwTypeError("Close has already been requested"); + + if (@getByIdDirectPrivate(@getByIdDirectPrivate(this, "controlledReadableStream"), "state") !== @streamReadable) + @throwTypeError("ReadableStream is not readable"); + + @readableByteStreamControllerClose(this); +} + +@getter +function byobRequest() +{ + "use strict"; + + if (!@isReadableByteStreamController(this)) + throw @makeGetterTypeError("ReadableByteStreamController", "byobRequest"); + + + var request = @getByIdDirectPrivate(this, "byobRequest"); + if (request === @undefined) { + var pending = @getByIdDirectPrivate(this, "pendingPullIntos"); + const firstDescriptor = pending.peek(); + if (firstDescriptor) { + const view = new @Uint8Array(firstDescriptor.buffer, + firstDescriptor.byteOffset + firstDescriptor.bytesFilled, + firstDescriptor.byteLength - firstDescriptor.bytesFilled); + @putByIdDirectPrivate(this, "byobRequest", new @ReadableStreamBYOBRequest(this, view, @isReadableStream)); + } + } + + return @getByIdDirectPrivate(this, "byobRequest"); +} + +@getter +function desiredSize() +{ + "use strict"; + + if (!@isReadableByteStreamController(this)) + throw @makeGetterTypeError("ReadableByteStreamController", "desiredSize"); + + return @readableByteStreamControllerGetDesiredSize(this); +} diff --git a/src/bun.js/builtins/js/ReadableByteStreamInternals.js b/src/bun.js/builtins/js/ReadableByteStreamInternals.js new file mode 100644 index 000000000..01da62e1a --- /dev/null +++ b/src/bun.js/builtins/js/ReadableByteStreamInternals.js @@ -0,0 +1,712 @@ +/* + * Copyright (C) 2016 Canon Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +// @internal + +function privateInitializeReadableByteStreamController(stream, underlyingByteSource, highWaterMark) +{ + "use strict"; + + if (!@isReadableStream(stream)) + @throwTypeError("ReadableByteStreamController needs a ReadableStream"); + + // readableStreamController is initialized with null value. + if (@getByIdDirectPrivate(stream, "readableStreamController") !== null) + @throwTypeError("ReadableStream already has a controller"); + + @putByIdDirectPrivate(this, "controlledReadableStream", stream); + @putByIdDirectPrivate(this, "underlyingByteSource", underlyingByteSource); + @putByIdDirectPrivate(this, "pullAgain", false); + @putByIdDirectPrivate(this, "pulling", false); + @readableByteStreamControllerClearPendingPullIntos(this); + @putByIdDirectPrivate(this, "queue", @newQueue()); + @putByIdDirectPrivate(this, "started", 0); + @putByIdDirectPrivate(this, "closeRequested", false); + + let hwm = @toNumber(highWaterMark); + if (@isNaN(hwm) || hwm < 0) + @throwRangeError("highWaterMark value is negative or not a number"); + @putByIdDirectPrivate(this, "strategyHWM", hwm); + + let autoAllocateChunkSize = underlyingByteSource.autoAllocateChunkSize; + if (autoAllocateChunkSize !== @undefined) { + autoAllocateChunkSize = @toNumber(autoAllocateChunkSize); + if (autoAllocateChunkSize <= 0 || autoAllocateChunkSize === @Infinity || autoAllocateChunkSize === -@Infinity) + @throwRangeError("autoAllocateChunkSize value is negative or equal to positive or negative infinity"); + } + @putByIdDirectPrivate(this, "autoAllocateChunkSize", autoAllocateChunkSize); + @putByIdDirectPrivate(this, "pendingPullIntos", @createFIFO()); + + + const controller = this; + @promiseInvokeOrNoopNoCatch(@getByIdDirectPrivate(controller, "underlyingByteSource"), "start", [controller]).@then(() => { + @putByIdDirectPrivate(controller, "started", 1); + @assert(!@getByIdDirectPrivate(controller, "pulling")); + @assert(!@getByIdDirectPrivate(controller, "pullAgain")); + @readableByteStreamControllerCallPullIfNeeded(controller); + }, (error) => { + if (@getByIdDirectPrivate(stream, "state") === @streamReadable) + @readableByteStreamControllerError(controller, error); + }); + + @putByIdDirectPrivate(this, "cancel", @readableByteStreamControllerCancel); + @putByIdDirectPrivate(this, "pull", @readableByteStreamControllerPull); + + return this; +} + +function readableStreamByteStreamControllerStart(controller) { + "use strict"; + @putByIdDirectPrivate(controller, "start", @undefined); + +} + + +function privateInitializeReadableStreamBYOBRequest(controller, view) +{ + "use strict"; + + @putByIdDirectPrivate(this, "associatedReadableByteStreamController", controller); + @putByIdDirectPrivate(this, "view", view); +} + +function isReadableByteStreamController(controller) +{ + "use strict"; + + // Same test mechanism as in isReadableStreamDefaultController (ReadableStreamInternals.js). + // See corresponding function for explanations. + return @isObject(controller) && !!@getByIdDirectPrivate(controller, "underlyingByteSource"); +} + +function isReadableStreamBYOBRequest(byobRequest) +{ + "use strict"; + + // Same test mechanism as in isReadableStreamDefaultController (ReadableStreamInternals.js). + // See corresponding function for explanations. + return @isObject(byobRequest) && !!@getByIdDirectPrivate(byobRequest, "associatedReadableByteStreamController"); +} + +function isReadableStreamBYOBReader(reader) +{ + "use strict"; + + // Spec tells to return true only if reader has a readIntoRequests internal slot. + // However, since it is a private slot, it cannot be checked using hasOwnProperty(). + // Since readIntoRequests is initialized with an empty array, the following test is ok. + return @isObject(reader) && !!@getByIdDirectPrivate(reader, "readIntoRequests"); +} + +function readableByteStreamControllerCancel(controller, reason) +{ + "use strict"; + + var pendingPullIntos = @getByIdDirectPrivate(controller, "pendingPullIntos"); + var first = pendingPullIntos.peek(); + if (first) + first.bytesFilled = 0; + + @putByIdDirectPrivate(controller, "queue", @newQueue()); + return @promiseInvokeOrNoop(@getByIdDirectPrivate(controller, "underlyingByteSource"), "cancel", [reason]); +} + +function readableByteStreamControllerError(controller, e) +{ + "use strict"; + + @assert(@getByIdDirectPrivate(@getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === @streamReadable); + @readableByteStreamControllerClearPendingPullIntos(controller); + @putByIdDirectPrivate(controller, "queue", @newQueue()); + @readableStreamError(@getByIdDirectPrivate(controller, "controlledReadableStream"), e); +} + +function readableByteStreamControllerClose(controller) +{ + "use strict"; + + @assert(!@getByIdDirectPrivate(controller, "closeRequested")); + @assert(@getByIdDirectPrivate(@getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === @streamReadable); + + if (@getByIdDirectPrivate(controller, "queue").size > 0) { + @putByIdDirectPrivate(controller, "closeRequested", true); + return; + } + + var first = @getByIdDirectPrivate(controller, "pendingPullIntos")?.peek(); + if (first) { + if (first.bytesFilled > 0) { + const e = @makeTypeError("Close requested while there remain pending bytes"); + @readableByteStreamControllerError(controller, e); + throw e; + } + } + + @readableStreamClose(@getByIdDirectPrivate(controller, "controlledReadableStream")); +} + +function readableByteStreamControllerClearPendingPullIntos(controller) +{ + "use strict"; + + @readableByteStreamControllerInvalidateBYOBRequest(controller); + var existing = @getByIdDirectPrivate(controller, "pendingPullIntos"); + if (existing !== @undefined) { + existing.clear(); + } else { + @putByIdDirectPrivate(controller, "pendingPullIntos", @createFIFO()); + } +} + +function readableByteStreamControllerGetDesiredSize(controller) +{ + "use strict"; + + const stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); + const state = @getByIdDirectPrivate(stream, "state"); + + if (state === @streamErrored) + return null; + if (state === @streamClosed) + return 0; + + return @getByIdDirectPrivate(controller, "strategyHWM") - @getByIdDirectPrivate(controller, "queue").size; +} + +function readableStreamHasBYOBReader(stream) +{ + "use strict"; + + const reader = @getByIdDirectPrivate(stream, "reader"); + return reader !== @undefined && @isReadableStreamBYOBReader(reader); +} + +function readableStreamHasDefaultReader(stream) +{ + "use strict"; + + const reader = @getByIdDirectPrivate(stream, "reader"); + return reader !== @undefined && @isReadableStreamDefaultReader(reader); +} + +function readableByteStreamControllerHandleQueueDrain(controller) { + + "use strict"; + + @assert(@getByIdDirectPrivate(@getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === @streamReadable); + if (!@getByIdDirectPrivate(controller, "queue").size && @getByIdDirectPrivate(controller, "closeRequested")) + @readableStreamClose(@getByIdDirectPrivate(controller, "controlledReadableStream")); + else + @readableByteStreamControllerCallPullIfNeeded(controller); +} + +function readableByteStreamControllerPull(controller) +{ + "use strict"; + + const stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); + @assert(@readableStreamHasDefaultReader(stream)); + if (@getByIdDirectPrivate(controller, "queue").content?.isNotEmpty()) { + const entry = @getByIdDirectPrivate(controller, "queue").content.shift(); + @getByIdDirectPrivate(controller, "queue").size -= entry.byteLength; + @readableByteStreamControllerHandleQueueDrain(controller); + let view; + try { + view = new @Uint8Array(entry.buffer, entry.byteOffset, entry.byteLength); + } catch (error) { + return @Promise.@reject(error); + } + return @createFulfilledPromise({ value: view, done: false }); + } + + if (@getByIdDirectPrivate(controller, "autoAllocateChunkSize") !== @undefined) { + let buffer; + try { + buffer = @createUninitializedArrayBuffer(@getByIdDirectPrivate(controller, "autoAllocateChunkSize")); + } catch (error) { + return @Promise.@reject(error); + } + const pullIntoDescriptor = { + buffer, + byteOffset: 0, + byteLength: @getByIdDirectPrivate(controller, "autoAllocateChunkSize"), + bytesFilled: 0, + elementSize: 1, + ctor: @Uint8Array, + readerType: 'default' + }; + @getByIdDirectPrivate(controller, "pendingPullIntos").push(pullIntoDescriptor); + } + + const promise = @readableStreamAddReadRequest(stream); + @readableByteStreamControllerCallPullIfNeeded(controller); + return promise; +} + +function readableByteStreamControllerShouldCallPull(controller) +{ + "use strict"; + + const stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); + + if (@getByIdDirectPrivate(stream, "state") !== @streamReadable) + return false; + if (@getByIdDirectPrivate(controller, "closeRequested")) + return false; + if (!(@getByIdDirectPrivate(controller, "started") > 0)) + return false; + const reader = @getByIdDirectPrivate(stream, "reader"); + + if (reader && (@getByIdDirectPrivate(reader, "readRequests")?.isNotEmpty() || !!@getByIdDirectPrivate(reader, "bunNativePtr"))) + return true; + if (@readableStreamHasBYOBReader(stream) && @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readIntoRequests")?.isNotEmpty()) + return true; + if (@readableByteStreamControllerGetDesiredSize(controller) > 0) + return true; + return false; +} + +function readableByteStreamControllerCallPullIfNeeded(controller) +{ + "use strict"; + + if (!@readableByteStreamControllerShouldCallPull(controller)) + return; + + if (@getByIdDirectPrivate(controller, "pulling")) { + @putByIdDirectPrivate(controller, "pullAgain", true); + return; + } + + @assert(!@getByIdDirectPrivate(controller, "pullAgain")); + @putByIdDirectPrivate(controller, "pulling", true); + @promiseInvokeOrNoop(@getByIdDirectPrivate(controller, "underlyingByteSource"), "pull", [controller]).@then(() => { + @putByIdDirectPrivate(controller, "pulling", false); + if (@getByIdDirectPrivate(controller, "pullAgain")) { + @putByIdDirectPrivate(controller, "pullAgain", false); + @readableByteStreamControllerCallPullIfNeeded(controller); + } + }, (error) => { + if (@getByIdDirectPrivate(@getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === @streamReadable) + @readableByteStreamControllerError(controller, error); + }); +} + +function transferBufferToCurrentRealm(buffer) +{ + "use strict"; + + // FIXME: Determine what should be done here exactly (what is already existing in current + // codebase and what has to be added). According to spec, Transfer operation should be + // performed in order to transfer buffer to current realm. For the moment, simply return + // received buffer. + return buffer; +} + +function readableStreamReaderKind(reader) { + "use strict"; + + + if (!!@getByIdDirectPrivate(reader, "readRequests")) + return @getByIdDirectPrivate(reader, "bunNativePtr") ? 3 : 1; + + if (!!@getByIdDirectPrivate(reader, "readIntoRequests")) + return 2; + + return 0; +} +function readableByteStreamControllerEnqueue(controller, chunk) +{ + "use strict"; + + const stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); + @assert(!@getByIdDirectPrivate(controller, "closeRequested")); + @assert(@getByIdDirectPrivate(stream, "state") === @streamReadable); + + + switch (@getByIdDirectPrivate(stream, "reader") ? @readableStreamReaderKind(@getByIdDirectPrivate(stream, "reader")) : 0) { + /* default reader */ + case 1: { + if (!@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) + @readableByteStreamControllerEnqueueChunk(controller, @transferBufferToCurrentRealm(chunk.buffer), chunk.byteOffset, chunk.byteLength); + else { + @assert(!@getByIdDirectPrivate(controller, "queue").content.size()); + const transferredView = chunk.constructor === @Uint8Array ? chunk : new @Uint8Array(chunk.buffer, chunk.byteOffset, chunk.byteLength); + @readableStreamFulfillReadRequest(stream, transferredView, false); + } + break; + } + + /* BYOB */ + case 2: { + @readableByteStreamControllerEnqueueChunk(controller, @transferBufferToCurrentRealm(chunk.buffer), chunk.byteOffset, chunk.byteLength); + @readableByteStreamControllerProcessPullDescriptors(controller); + break; + } + + /* NativeReader */ + case 3: { + // reader.@enqueueNative(@getByIdDirectPrivate(reader, "bunNativePtr"), chunk); + + break; + } + + default: { + @assert(!@isReadableStreamLocked(stream)); + @readableByteStreamControllerEnqueueChunk(controller, @transferBufferToCurrentRealm(chunk.buffer), chunk.byteOffset, chunk.byteLength); + break; + } + } +} + +// Spec name: readableByteStreamControllerEnqueueChunkToQueue. +function readableByteStreamControllerEnqueueChunk(controller, buffer, byteOffset, byteLength) +{ + "use strict"; + + @getByIdDirectPrivate(controller, "queue").content.push({ + buffer: buffer, + byteOffset: byteOffset, + byteLength: byteLength + }); + @getByIdDirectPrivate(controller, "queue").size += byteLength; +} + +function readableByteStreamControllerRespondWithNewView(controller, view) +{ + "use strict"; + + @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").isNotEmpty()); + + let firstDescriptor = @getByIdDirectPrivate(controller, "pendingPullIntos").peek(); + + if (firstDescriptor.byteOffset + firstDescriptor.bytesFilled !== view.byteOffset) + @throwRangeError("Invalid value for view.byteOffset"); + + if (firstDescriptor.byteLength !== view.byteLength) + @throwRangeError("Invalid value for view.byteLength"); + + firstDescriptor.buffer = view.buffer; + @readableByteStreamControllerRespondInternal(controller, view.byteLength); +} + +function readableByteStreamControllerRespond(controller, bytesWritten) +{ + "use strict"; + + bytesWritten = @toNumber(bytesWritten); + + if (@isNaN(bytesWritten) || bytesWritten === @Infinity || bytesWritten < 0 ) + @throwRangeError("bytesWritten has an incorrect value"); + + @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").isNotEmpty()); + + @readableByteStreamControllerRespondInternal(controller, bytesWritten); +} + +function readableByteStreamControllerRespondInternal(controller, bytesWritten) +{ + "use strict"; + + let firstDescriptor = @getByIdDirectPrivate(controller, "pendingPullIntos").peek(); + let stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); + + if (@getByIdDirectPrivate(stream, "state") === @streamClosed) { + if (bytesWritten !== 0) + @throwTypeError("bytesWritten is different from 0 even though stream is closed"); + @readableByteStreamControllerRespondInClosedState(controller, firstDescriptor); + } else { + @assert(@getByIdDirectPrivate(stream, "state") === @streamReadable); + @readableByteStreamControllerRespondInReadableState(controller, bytesWritten, firstDescriptor); + } +} + +function readableByteStreamControllerRespondInReadableState(controller, bytesWritten, pullIntoDescriptor) +{ + "use strict"; + + if (pullIntoDescriptor.bytesFilled + bytesWritten > pullIntoDescriptor.byteLength) + @throwRangeError("bytesWritten value is too great"); + + @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").isEmpty() || @getByIdDirectPrivate(controller, "pendingPullIntos").peek() === pullIntoDescriptor); + @readableByteStreamControllerInvalidateBYOBRequest(controller); + pullIntoDescriptor.bytesFilled += bytesWritten; + + if (pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize) + return; + + @readableByteStreamControllerShiftPendingDescriptor(controller); + const remainderSize = pullIntoDescriptor.bytesFilled % pullIntoDescriptor.elementSize; + + if (remainderSize > 0) { + const end = pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled; + const remainder = @cloneArrayBuffer(pullIntoDescriptor.buffer, end - remainderSize, remainderSize); + @readableByteStreamControllerEnqueueChunk(controller, remainder, 0, remainder.byteLength); + } + + pullIntoDescriptor.buffer = @transferBufferToCurrentRealm(pullIntoDescriptor.buffer); + pullIntoDescriptor.bytesFilled -= remainderSize; + @readableByteStreamControllerCommitDescriptor(@getByIdDirectPrivate(controller, "controlledReadableStream"), pullIntoDescriptor); + @readableByteStreamControllerProcessPullDescriptors(controller); +} + +function readableByteStreamControllerRespondInClosedState(controller, firstDescriptor) +{ + "use strict"; + + firstDescriptor.buffer = @transferBufferToCurrentRealm(firstDescriptor.buffer); + @assert(firstDescriptor.bytesFilled === 0); + + if (@readableStreamHasBYOBReader(@getByIdDirectPrivate(controller, "controlledReadableStream"))) { + while (@getByIdDirectPrivate(@getByIdDirectPrivate(@getByIdDirectPrivate(controller, "controlledReadableStream"), "reader"), "readIntoRequests")?.isNotEmpty()) { + let pullIntoDescriptor = @readableByteStreamControllerShiftPendingDescriptor(controller); + @readableByteStreamControllerCommitDescriptor(@getByIdDirectPrivate(controller, "controlledReadableStream"), pullIntoDescriptor); + } + } +} + +// Spec name: readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue (shortened for readability). +function readableByteStreamControllerProcessPullDescriptors(controller) +{ + "use strict"; + + @assert(!@getByIdDirectPrivate(controller, "closeRequested")); + while (@getByIdDirectPrivate(controller, "pendingPullIntos").isNotEmpty()) { + if (@getByIdDirectPrivate(controller, "queue").size === 0) + return; + let pullIntoDescriptor = @getByIdDirectPrivate(controller, "pendingPullIntos").peek(); + if (@readableByteStreamControllerFillDescriptorFromQueue(controller, pullIntoDescriptor)) { + @readableByteStreamControllerShiftPendingDescriptor(controller); + @readableByteStreamControllerCommitDescriptor(@getByIdDirectPrivate(controller, "controlledReadableStream"), pullIntoDescriptor); + } + } +} + +// Spec name: readableByteStreamControllerFillPullIntoDescriptorFromQueue (shortened for readability). +function readableByteStreamControllerFillDescriptorFromQueue(controller, pullIntoDescriptor) +{ + "use strict"; + + const currentAlignedBytes = pullIntoDescriptor.bytesFilled - (pullIntoDescriptor.bytesFilled % pullIntoDescriptor.elementSize); + const maxBytesToCopy = @getByIdDirectPrivate(controller, "queue").size < pullIntoDescriptor.byteLength - pullIntoDescriptor.bytesFilled ? + @getByIdDirectPrivate(controller, "queue").size : pullIntoDescriptor.byteLength - pullIntoDescriptor.bytesFilled; + const maxBytesFilled = pullIntoDescriptor.bytesFilled + maxBytesToCopy; + const maxAlignedBytes = maxBytesFilled - (maxBytesFilled % pullIntoDescriptor.elementSize); + let totalBytesToCopyRemaining = maxBytesToCopy; + let ready = false; + + if (maxAlignedBytes > currentAlignedBytes) { + totalBytesToCopyRemaining = maxAlignedBytes - pullIntoDescriptor.bytesFilled; + ready = true; + } + + while (totalBytesToCopyRemaining > 0) { + let headOfQueue = @getByIdDirectPrivate(controller, "queue").content.peek(); + const bytesToCopy = totalBytesToCopyRemaining < headOfQueue.byteLength ? totalBytesToCopyRemaining : headOfQueue.byteLength; + // Copy appropriate part of pullIntoDescriptor.buffer to headOfQueue.buffer. + // Remark: this implementation is not completely aligned on the definition of CopyDataBlockBytes + // operation of ECMAScript (the case of Shared Data Block is not considered here, but it doesn't seem to be an issue). + const destStart = pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled; + // FIXME: As indicated in comments of bug 172717, access to set is not safe. However, using prototype.@set.@call does + // not work (@set is undefined). A safe way to do that is needed. + new @Uint8Array(pullIntoDescriptor.buffer).set(new @Uint8Array(headOfQueue.buffer, headOfQueue.byteOffset, bytesToCopy), destStart); + + if (headOfQueue.byteLength === bytesToCopy) + @getByIdDirectPrivate(controller, "queue").content.shift(); + else { + headOfQueue.byteOffset += bytesToCopy; + headOfQueue.byteLength -= bytesToCopy; + } + + @getByIdDirectPrivate(controller, "queue").size -= bytesToCopy; + @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").isEmpty() || @getByIdDirectPrivate(controller, "pendingPullIntos").peek() === pullIntoDescriptor); + @readableByteStreamControllerInvalidateBYOBRequest(controller); + pullIntoDescriptor.bytesFilled += bytesToCopy; + totalBytesToCopyRemaining -= bytesToCopy; + } + + if (!ready) { + @assert(@getByIdDirectPrivate(controller, "queue").size === 0); + @assert(pullIntoDescriptor.bytesFilled > 0); + @assert(pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize); + } + + return ready; +} + +// Spec name: readableByteStreamControllerShiftPendingPullInto (renamed for consistency). +function readableByteStreamControllerShiftPendingDescriptor(controller) +{ + "use strict"; + + let descriptor = @getByIdDirectPrivate(controller, "pendingPullIntos").shift(); + @readableByteStreamControllerInvalidateBYOBRequest(controller); + return descriptor; +} + +function readableByteStreamControllerInvalidateBYOBRequest(controller) +{ + "use strict"; + + if (@getByIdDirectPrivate(controller, "byobRequest") === @undefined) + return; + const byobRequest = @getByIdDirectPrivate(controller, "byobRequest"); + @putByIdDirectPrivate(byobRequest, "associatedReadableByteStreamController", @undefined); + @putByIdDirectPrivate(byobRequest, "view", @undefined); + @putByIdDirectPrivate(controller, "byobRequest", @undefined); +} + +// Spec name: readableByteStreamControllerCommitPullIntoDescriptor (shortened for readability). +function readableByteStreamControllerCommitDescriptor(stream, pullIntoDescriptor) +{ + "use strict"; + + @assert(@getByIdDirectPrivate(stream, "state") !== @streamErrored); + let done = false; + if (@getByIdDirectPrivate(stream, "state") === @streamClosed) { + @assert(!pullIntoDescriptor.bytesFilled); + done = true; + } + let filledView = @readableByteStreamControllerConvertDescriptor(pullIntoDescriptor); + if (pullIntoDescriptor.readerType === "default") + @readableStreamFulfillReadRequest(stream, filledView, done); + else { + @assert(pullIntoDescriptor.readerType === "byob"); + @readableStreamFulfillReadIntoRequest(stream, filledView, done); + } +} + +// Spec name: readableByteStreamControllerConvertPullIntoDescriptor (shortened for readability). +function readableByteStreamControllerConvertDescriptor(pullIntoDescriptor) +{ + "use strict"; + + @assert(pullIntoDescriptor.bytesFilled <= pullIntoDescriptor.byteLength); + @assert(pullIntoDescriptor.bytesFilled % pullIntoDescriptor.elementSize === 0); + + return new pullIntoDescriptor.ctor(pullIntoDescriptor.buffer, pullIntoDescriptor.byteOffset, pullIntoDescriptor.bytesFilled / pullIntoDescriptor.elementSize); +} + +function readableStreamFulfillReadIntoRequest(stream, chunk, done) +{ + "use strict"; + const readIntoRequest = @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readIntoRequests").shift(); + @fulfillPromise(readIntoRequest, { value: chunk, done: done }); +} + +function readableStreamBYOBReaderRead(reader, view) +{ + "use strict"; + + const stream = @getByIdDirectPrivate(reader, "ownerReadableStream"); + @assert(!!stream); + + @putByIdDirectPrivate(stream, "disturbed", true); + if (@getByIdDirectPrivate(stream, "state") === @streamErrored) + return @Promise.@reject(@getByIdDirectPrivate(stream, "storedError")); + + return @readableByteStreamControllerPullInto(@getByIdDirectPrivate(stream, "readableStreamController"), view); +} + +function readableByteStreamControllerPullInto(controller, view) +{ + "use strict"; + + const stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); + let elementSize = 1; + // Spec describes that in the case where view is a TypedArray, elementSize + // should be set to the size of an element (e.g. 2 for UInt16Array). For + // DataView, BYTES_PER_ELEMENT is undefined, contrary to the same property + // for TypedArrays. + // FIXME: Getting BYTES_PER_ELEMENT like this is not safe (property is read-only + // but can be modified if the prototype is redefined). A safe way of getting + // it would be to determine which type of ArrayBufferView view is an instance + // of based on typed arrays private variables. However, this is not possible due + // to bug 167697, which prevents access to typed arrays through their private + // names unless public name has already been met before. + if (view.BYTES_PER_ELEMENT !== @undefined) + elementSize = view.BYTES_PER_ELEMENT; + + // FIXME: Getting constructor like this is not safe. A safe way of getting + // it would be to determine which type of ArrayBufferView view is an instance + // of, and to assign appropriate constructor based on this (e.g. ctor = + // @Uint8Array). However, this is not possible due to bug 167697, which + // prevents access to typed arrays through their private names unless public + // name has already been met before. + const ctor = view.constructor; + + const pullIntoDescriptor = { + buffer: view.buffer, + byteOffset: view.byteOffset, + byteLength: view.byteLength, + bytesFilled: 0, + elementSize, + ctor, + readerType: 'byob' + }; + + var pending = @getByIdDirectPrivate(controller, "pendingPullIntos"); + if (pending?.isNotEmpty()) { + pullIntoDescriptor.buffer = @transferBufferToCurrentRealm(pullIntoDescriptor.buffer); + pending.push(pullIntoDescriptor); + return @readableStreamAddReadIntoRequest(stream); + } + + if (@getByIdDirectPrivate(stream, "state") === @streamClosed) { + const emptyView = new ctor(pullIntoDescriptor.buffer, pullIntoDescriptor.byteOffset, 0); + return @createFulfilledPromise({ value: emptyView, done: true }); + } + + if (@getByIdDirectPrivate(controller, "queue").size > 0) { + if (@readableByteStreamControllerFillDescriptorFromQueue(controller, pullIntoDescriptor)) { + const filledView = @readableByteStreamControllerConvertDescriptor(pullIntoDescriptor); + @readableByteStreamControllerHandleQueueDrain(controller); + return @createFulfilledPromise({ value: filledView, done: false }); + } + if (@getByIdDirectPrivate(controller, "closeRequested")) { + const e = @makeTypeError("Closing stream has been requested"); + @readableByteStreamControllerError(controller, e); + return @Promise.@reject(e); + } + } + + pullIntoDescriptor.buffer = @transferBufferToCurrentRealm(pullIntoDescriptor.buffer); + @getByIdDirectPrivate(controller, "pendingPullIntos").push(pullIntoDescriptor); + const promise = @readableStreamAddReadIntoRequest(stream); + @readableByteStreamControllerCallPullIfNeeded(controller); + return promise; +} + +function readableStreamAddReadIntoRequest(stream) +{ + "use strict"; + + @assert(@isReadableStreamBYOBReader(@getByIdDirectPrivate(stream, "reader"))); + @assert(@getByIdDirectPrivate(stream, "state") === @streamReadable || @getByIdDirectPrivate(stream, "state") === @streamClosed); + + const readRequest = @newPromise(); + @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readIntoRequests").push(readRequest); + + return readRequest; +} diff --git a/src/bun.js/builtins/js/ReadableStream.js b/src/bun.js/builtins/js/ReadableStream.js new file mode 100644 index 000000000..db7cf85a8 --- /dev/null +++ b/src/bun.js/builtins/js/ReadableStream.js @@ -0,0 +1,506 @@ +/* + * Copyright (C) 2015 Canon Inc. + * Copyright (C) 2015 Igalia. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +function initializeReadableStream(underlyingSource, strategy) +{ + "use strict"; + + if (underlyingSource === @undefined) + underlyingSource = { @bunNativeType: 0, @bunNativePtr: 0, @lazy: false }; + if (strategy === @undefined) + strategy = { }; + + if (!@isObject(underlyingSource)) + @throwTypeError("ReadableStream constructor takes an object as first argument"); + + if (strategy !== @undefined && !@isObject(strategy)) + @throwTypeError("ReadableStream constructor takes an object as second argument, if any"); + + @putByIdDirectPrivate(this, "state", @streamReadable); + + @putByIdDirectPrivate(this, "reader", @undefined); + + @putByIdDirectPrivate(this, "storedError", @undefined); + + @putByIdDirectPrivate(this, "disturbed", false); + + // Initialized with null value to enable distinction with undefined case. + @putByIdDirectPrivate(this, "readableStreamController", null); + @putByIdDirectPrivate(this, "bunNativeType", @getByIdDirectPrivate(underlyingSource, "bunNativeType") ?? 0); + @putByIdDirectPrivate(this, "bunNativePtr", @getByIdDirectPrivate(underlyingSource, "bunNativePtr") ?? 0); + + const isDirect = underlyingSource.type === "direct"; + // direct streams are always lazy + const isUnderlyingSourceLazy = !!underlyingSource.@lazy; + const isLazy = isDirect || isUnderlyingSourceLazy; + + // // FIXME: We should introduce https://streams.spec.whatwg.org/#create-readable-stream. + // // For now, we emulate this with underlyingSource with private properties. + if (@getByIdDirectPrivate(underlyingSource, "pull") !== @undefined && !isLazy) { + const size = @getByIdDirectPrivate(strategy, "size"); + const highWaterMark = @getByIdDirectPrivate(strategy, "highWaterMark"); + @setupReadableStreamDefaultController(this, underlyingSource, size, highWaterMark !== @undefined ? highWaterMark : 1, @getByIdDirectPrivate(underlyingSource, "start"), @getByIdDirectPrivate(underlyingSource, "pull"), @getByIdDirectPrivate(underlyingSource, "cancel")); + + return this; + } + if (isDirect) { + @putByIdDirectPrivate(this, "start", () => @createReadableStreamController(this, underlyingSource, strategy)); + } else if (isLazy) { + const autoAllocateChunkSize = underlyingSource.autoAllocateChunkSize; + + + @putByIdDirectPrivate(this, "start", () => { + const instance = @lazyLoadStream(this, autoAllocateChunkSize); + if (instance) { + @createReadableStreamController(this, instance, strategy); + } + }); + } else { + @putByIdDirectPrivate(this, "start", @undefined); + @createReadableStreamController(this, underlyingSource, strategy); + } + + + return this; +} + + +@globalPrivate +function readableStreamToArray(stream) { + "use strict"; + + if (!stream || @getByIdDirectPrivate(stream, "state") === @streamClosed) { + return null; + } + var reader = stream.getReader(); + var manyResult = reader.readMany(); + + async function processManyResult(result) { + + if (result.done) { + return null; + } + + var chunks = result.value || []; + + while (true) { + var thisResult = await reader.read(); + + if (thisResult.done) { + return chunks; + } + + chunks.push(thisResult.value); + } + + return chunks; + }; + + + if (manyResult && @isPromise(manyResult)) { + return manyResult.@then(processManyResult); + } + + if (manyResult && manyResult.done) { + return null; + } + + return processManyResult(manyResult); +} + +@globalPrivate +function readableStreamToText(stream) { + "use strict"; + + // TODO: optimize this to skip the extra ArrayBuffer + return globalThis.Bun.readableStreamToArrayBuffer(stream).@then(function(arrayBuffer) { + return new globalThis.TextDecoder().decode(arrayBuffer); + }); +} + +@globalPrivate +function readableStreamToJSON(stream) { + "use strict"; + + // TODO: optimize this to skip the extra ArrayBuffer + return globalThis.Bun.readableStreamToArrayBuffer(stream).@then(function(arrayBuffer) { + return globalThis.JSON.parse(new globalThis.TextDecoder().decode(arrayBuffer)); + }); +} + +@globalPrivate +function readableStreamToBlob(stream) { + "use strict"; + + + const array = @readableStreamToArray(stream); + if (array === null) { + return new globalThis.Blob(); + } + + return array.@then(function(chunks) { + if (chunks === null || chunks.length === 0) { + return new globalThis.Blob(); + } + + return new globalThis.Blob(chunks); + }); +} + +@globalPrivate +function readableStreamToArrayPublic(stream) { + "use strict"; + + if (@getByIdDirectPrivate(stream, "state") === @streamClosed) { + return []; + } + var reader = stream.getReader(); + + var manyResult = reader.readMany(); + + var processManyResult = (0, (async function(result) { + if (result.done) { + return []; + } + + var chunks = result.value || []; + + while (true) { + var thisResult = await reader.read(); + if (thisResult.done) { + return chunks; + } + + chunks.push(thisResult.value); + } + + return chunks; + })); + + + if (manyResult && @isPromise(manyResult)) { + return manyResult.then(processManyResult); + } + + if (manyResult && manyResult.done) { + return []; + } + + return processManyResult(manyResult); +} + + + +@globalPrivate +function consumeReadableStream(nativePtr, nativeType, inputStream) { + "use strict"; + const symbol = globalThis.Symbol.for("Bun.consumeReadableStreamPrototype"); + var cached = globalThis[symbol]; + if (!cached) { + cached = globalThis[symbol] = []; + } + var Prototype = cached[nativeType]; + if (Prototype === @undefined) { + var [doRead, doError, doReadMany, doClose, onClose, deinit] = globalThis[globalThis.Symbol.for("Bun.lazy")](nativeType); + + Prototype = class NativeReadableStreamSink { + constructor(reader, ptr) { + this.#ptr = ptr; + this.#reader = reader; + this.#didClose = false; + + this.handleError = this._handleError.bind(this); + this.handleClosed = this._handleClosed.bind(this); + this.processResult = this._processResult.bind(this); + + reader.closed.then(this.handleClosed, this.handleError); + } + + handleError; + handleClosed; + _handleClosed() { + if (this.#didClose) return; + this.#didClose = true; + var ptr = this.#ptr; + this.#ptr = 0; + doClose(ptr); + deinit(ptr); + } + + _handleError(error) { + if (this.#didClose) return; + this.#didClose = true; + var ptr = this.#ptr; + this.#ptr = 0; + doError(ptr, error); + deinit(ptr); + } + + #ptr; + #didClose = false; + #reader; + + _handleReadMany({value, done, size}) { + if (done) { + this.handleClosed(); + return; + } + + if (this.#didClose) return; + + + doReadMany(this.#ptr, value, done, size); + } + + + read() { + if (!this.#ptr) return @throwTypeError("ReadableStreamSink is already closed"); + + return this.processResult(this.#reader.read()); + + } + + _processResult(result) { + if (result && @isPromise(result)) { + const flags = @getPromiseInternalField(result, @promiseFieldFlags); + if (flags & @promiseStateFulfilled) { + const fulfilledValue = @getPromiseInternalField(result, @promiseFieldReactionsOrResult); + if (fulfilledValue) { + result = fulfilledValue; + } + } + } + + if (result && @isPromise(result)) { + result.then(this.processResult, this.handleError); + return null; + } + + if (result.done) { + this.handleClosed(); + return 0; + } else if (result.value) { + return result.value; + } else { + return -1; + } + } + + readMany() { + if (!this.#ptr) return @throwTypeError("ReadableStreamSink is already closed"); + return this.processResult(this.#reader.readMany()); + } + + + }; + + const minlength = nativeType + 1; + if (cached.length < minlength) { + cached.length = minlength; + } + @putByValDirect(cached, nativeType, Prototype); + } + + if (@isReadableStreamLocked(inputStream)) { + @throwTypeError("Cannot start reading from a locked stream"); + } + + return new Prototype(inputStream.getReader(), nativePtr); +} + +@globalPrivate +function createEmptyReadableStream() { + "use strict"; + + var stream = new @ReadableStream({ + pull() {}, + }); + @readableStreamClose(stream); + return stream; +} + +@globalPrivate +function createNativeReadableStream(nativePtr, nativeType, autoAllocateChunkSize) { + "use strict"; + return new @ReadableStream({ + @lazy: true, + @bunNativeType: nativeType, + @bunNativePtr: nativePtr, + autoAllocateChunkSize: autoAllocateChunkSize, + }); +} + +function cancel(reason) +{ + "use strict"; + + if (!@isReadableStream(this)) + return @Promise.@reject(@makeThisTypeError("ReadableStream", "cancel")); + + if (@isReadableStreamLocked(this)) + return @Promise.@reject(@makeTypeError("ReadableStream is locked")); + + return @readableStreamCancel(this, reason); +} + +function getReader(options) +{ + "use strict"; + + if (!@isReadableStream(this)) + throw @makeThisTypeError("ReadableStream", "getReader"); + + const mode = @toDictionary(options, { }, "ReadableStream.getReader takes an object as first argument").mode; + if (mode === @undefined) { + var start_ = @getByIdDirectPrivate(this, "start"); + if (start_) { + @putByIdDirectPrivate(this, "start", @undefined); + start_(); + } + + return new @ReadableStreamDefaultReader(this); + } + // String conversion is required by spec, hence double equals. + if (mode == 'byob') { + return new @ReadableStreamBYOBReader(this); + } + + + @throwTypeError("Invalid mode is specified"); +} + +function pipeThrough(streams, options) +{ + "use strict"; + + const transforms = streams; + + const readable = transforms["readable"]; + if (!@isReadableStream(readable)) + throw @makeTypeError("readable should be ReadableStream"); + + const writable = transforms["writable"]; + const internalWritable = @getInternalWritableStream(writable); + if (!@isWritableStream(internalWritable)) + throw @makeTypeError("writable should be WritableStream"); + + let preventClose = false; + let preventAbort = false; + let preventCancel = false; + let signal; + if (!@isUndefinedOrNull(options)) { + if (!@isObject(options)) + throw @makeTypeError("options must be an object"); + + preventAbort = !!options["preventAbort"]; + preventCancel = !!options["preventCancel"]; + preventClose = !!options["preventClose"]; + + signal = options["signal"]; + if (signal !== @undefined && !@isAbortSignal(signal)) + throw @makeTypeError("options.signal must be AbortSignal"); + } + + if (!@isReadableStream(this)) + throw @makeThisTypeError("ReadableStream", "pipeThrough"); + + if (@isReadableStreamLocked(this)) + throw @makeTypeError("ReadableStream is locked"); + + if (@isWritableStreamLocked(internalWritable)) + throw @makeTypeError("WritableStream is locked"); + + @readableStreamPipeToWritableStream(this, internalWritable, preventClose, preventAbort, preventCancel, signal); + + return readable; +} + +function pipeTo(destination) +{ + "use strict"; + + // FIXME: https://bugs.webkit.org/show_bug.cgi?id=159869. + // Built-in generator should be able to parse function signature to compute the function length correctly. + let options = arguments[1]; + + let preventClose = false; + let preventAbort = false; + let preventCancel = false; + let signal; + if (!@isUndefinedOrNull(options)) { + if (!@isObject(options)) + return @Promise.@reject(@makeTypeError("options must be an object")); + + try { + preventAbort = !!options["preventAbort"]; + preventCancel = !!options["preventCancel"]; + preventClose = !!options["preventClose"]; + + signal = options["signal"]; + } catch(e) { + return @Promise.@reject(e); + } + + if (signal !== @undefined && !@isAbortSignal(signal)) + return @Promise.@reject(@makeTypeError("options.signal must be AbortSignal")); + } + + const internalDestination = @getInternalWritableStream(destination); + if (!@isWritableStream(internalDestination)) + return @Promise.@reject(@makeTypeError("ReadableStream pipeTo requires a WritableStream")); + + if (!@isReadableStream(this)) + return @Promise.@reject(@makeThisTypeError("ReadableStream", "pipeTo")); + + if (@isReadableStreamLocked(this)) + return @Promise.@reject(@makeTypeError("ReadableStream is locked")); + + if (@isWritableStreamLocked(internalDestination)) + return @Promise.@reject(@makeTypeError("WritableStream is locked")); + + return @readableStreamPipeToWritableStream(this, internalDestination, preventClose, preventAbort, preventCancel, signal); +} + +function tee() +{ + "use strict"; + + if (!@isReadableStream(this)) + throw @makeThisTypeError("ReadableStream", "tee"); + + return @readableStreamTee(this, false); +} + +@getter +function locked() +{ + "use strict"; + + if (!@isReadableStream(this)) + throw @makeGetterTypeError("ReadableStream", "locked"); + + return @isReadableStreamLocked(this); +} diff --git a/src/bun.js/builtins/js/ReadableStreamBYOBReader.js b/src/bun.js/builtins/js/ReadableStreamBYOBReader.js new file mode 100644 index 000000000..16e4ebce5 --- /dev/null +++ b/src/bun.js/builtins/js/ReadableStreamBYOBReader.js @@ -0,0 +1,102 @@ +/* + * Copyright (C) 2017 Canon Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY CANON INC. AND ITS CONTRIBUTORS "AS IS" AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL CANON INC. AND ITS CONTRIBUTORS BE LIABLE FOR + * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +function initializeReadableStreamBYOBReader(stream) +{ + "use strict"; + + if (!@isReadableStream(stream)) + @throwTypeError("ReadableStreamBYOBReader needs a ReadableStream"); + if (!@isReadableByteStreamController(@getByIdDirectPrivate(stream, "readableStreamController"))) + @throwTypeError("ReadableStreamBYOBReader needs a ReadableByteStreamController"); + if (@isReadableStreamLocked(stream)) + @throwTypeError("ReadableStream is locked"); + + @readableStreamReaderGenericInitialize(this, stream); + @putByIdDirectPrivate(this, "readIntoRequests", @createFIFO()); + + return this; +} + +function cancel(reason) +{ + "use strict"; + + if (!@isReadableStreamBYOBReader(this)) + return @Promise.@reject(@makeThisTypeError("ReadableStreamBYOBReader", "cancel")); + + if (!@getByIdDirectPrivate(this, "ownerReadableStream")) + return @Promise.@reject(@makeTypeError("cancel() called on a reader owned by no readable stream")); + + return @readableStreamReaderGenericCancel(this, reason); +} + +function read(view) +{ + "use strict"; + + if (!@isReadableStreamBYOBReader(this)) + return @Promise.@reject(@makeThisTypeError("ReadableStreamBYOBReader", "read")); + + if (!@getByIdDirectPrivate(this, "ownerReadableStream")) + return @Promise.@reject(@makeTypeError("read() called on a reader owned by no readable stream")); + + if (!@isObject(view)) + return @Promise.@reject(@makeTypeError("Provided view is not an object")); + + if (!@ArrayBuffer.@isView(view)) + return @Promise.@reject(@makeTypeError("Provided view is not an ArrayBufferView")); + + if (view.byteLength === 0) + return @Promise.@reject(@makeTypeError("Provided view cannot have a 0 byteLength")); + + return @readableStreamBYOBReaderRead(this, view); +} + +function releaseLock() +{ + "use strict"; + + if (!@isReadableStreamBYOBReader(this)) + throw @makeThisTypeError("ReadableStreamBYOBReader", "releaseLock"); + + if (!@getByIdDirectPrivate(this, "ownerReadableStream")) + return; + + if (@getByIdDirectPrivate(this, "readIntoRequests")?.isNotEmpty()) + @throwTypeError("There are still pending read requests, cannot release the lock"); + + @readableStreamReaderGenericRelease(this); +} + +@getter +function closed() +{ + "use strict"; + + if (!@isReadableStreamBYOBReader(this)) + return @Promise.@reject(@makeGetterTypeError("ReadableStreamBYOBReader", "closed")); + + return @getByIdDirectPrivate(this, "closedPromiseCapability").@promise; +} diff --git a/src/bun.js/builtins/js/ReadableStreamBYOBRequest.js b/src/bun.js/builtins/js/ReadableStreamBYOBRequest.js new file mode 100644 index 000000000..d97667165 --- /dev/null +++ b/src/bun.js/builtins/js/ReadableStreamBYOBRequest.js @@ -0,0 +1,77 @@ +/* + * Copyright (C) 2017 Canon Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +function initializeReadableStreamBYOBRequest(controller, view) +{ + "use strict"; + + if (arguments.length !== 3 && arguments[2] !== @isReadableStream) + @throwTypeError("ReadableStreamBYOBRequest constructor should not be called directly"); + + return @privateInitializeReadableStreamBYOBRequest.@call(this, controller, view); +} + +function respond(bytesWritten) +{ + "use strict"; + + if (!@isReadableStreamBYOBRequest(this)) + throw @makeThisTypeError("ReadableStreamBYOBRequest", "respond"); + + if (@getByIdDirectPrivate(this, "associatedReadableByteStreamController") === @undefined) + @throwTypeError("ReadableStreamBYOBRequest.associatedReadableByteStreamController is undefined"); + + return @readableByteStreamControllerRespond(@getByIdDirectPrivate(this, "associatedReadableByteStreamController"), bytesWritten); +} + +function respondWithNewView(view) +{ + "use strict"; + + if (!@isReadableStreamBYOBRequest(this)) + throw @makeThisTypeError("ReadableStreamBYOBRequest", "respond"); + + if (@getByIdDirectPrivate(this, "associatedReadableByteStreamController") === @undefined) + @throwTypeError("ReadableStreamBYOBRequest.associatedReadableByteStreamController is undefined"); + + if (!@isObject(view)) + @throwTypeError("Provided view is not an object"); + + if (!@ArrayBuffer.@isView(view)) + @throwTypeError("Provided view is not an ArrayBufferView"); + + return @readableByteStreamControllerRespondWithNewView(@getByIdDirectPrivate(this, "associatedReadableByteStreamController"), view); +} + +@getter +function view() +{ + "use strict"; + + if (!@isReadableStreamBYOBRequest(this)) + throw @makeGetterTypeError("ReadableStreamBYOBRequest", "view"); + + return @getByIdDirectPrivate(this, "view"); +} diff --git a/src/bun.js/builtins/js/ReadableStreamDefaultController.js b/src/bun.js/builtins/js/ReadableStreamDefaultController.js new file mode 100644 index 000000000..07fc65cb1 --- /dev/null +++ b/src/bun.js/builtins/js/ReadableStreamDefaultController.js @@ -0,0 +1,82 @@ +/* + * Copyright (C) 2015 Canon Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +function initializeReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark) +{ + "use strict"; + + if (arguments.length !== 5 && arguments[4] !== @isReadableStream) + @throwTypeError("ReadableStreamDefaultController constructor should not be called directly"); + + return @privateInitializeReadableStreamDefaultController.@call(this, stream, underlyingSource, size, highWaterMark); +} + +function enqueue(chunk) +{ + "use strict"; + + if (!@isReadableStreamDefaultController(this)) + throw @makeThisTypeError("ReadableStreamDefaultController", "enqueue"); + + if (!@readableStreamDefaultControllerCanCloseOrEnqueue(this)) + @throwTypeError("ReadableStreamDefaultController is not in a state where chunk can be enqueued"); + + return @readableStreamDefaultControllerEnqueue(this, chunk); +} + +function error(error) +{ + "use strict"; + + if (!@isReadableStreamDefaultController(this)) + throw @makeThisTypeError("ReadableStreamDefaultController", "error"); + + @readableStreamDefaultControllerError(this, error); +} + +function close() +{ + "use strict"; + + if (!@isReadableStreamDefaultController(this)) + throw @makeThisTypeError("ReadableStreamDefaultController", "close"); + + if (!@readableStreamDefaultControllerCanCloseOrEnqueue(this)) + @throwTypeError("ReadableStreamDefaultController is not in a state where it can be closed"); + + @readableStreamDefaultControllerClose(this); +} + +@getter +function desiredSize() +{ + "use strict"; + + if (!@isReadableStreamDefaultController(this)) + throw @makeGetterTypeError("ReadableStreamDefaultController", "desiredSize"); + + return @readableStreamDefaultControllerGetDesiredSize(this); +} + diff --git a/src/bun.js/builtins/js/ReadableStreamDefaultReader.js b/src/bun.js/builtins/js/ReadableStreamDefaultReader.js new file mode 100644 index 000000000..774c7161e --- /dev/null +++ b/src/bun.js/builtins/js/ReadableStreamDefaultReader.js @@ -0,0 +1,182 @@ +/* + * Copyright (C) 2015 Canon Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +function initializeReadableStreamDefaultReader(stream) +{ + "use strict"; + + if (!@isReadableStream(stream)) + @throwTypeError("ReadableStreamDefaultReader needs a ReadableStream"); + if (@isReadableStreamLocked(stream)) + @throwTypeError("ReadableStream is locked"); + + @readableStreamReaderGenericInitialize(this, stream); + @putByIdDirectPrivate(this, "readRequests", @createFIFO()); + + return this; +} + +function cancel(reason) +{ + "use strict"; + + if (!@isReadableStreamDefaultReader(this)) + return @Promise.@reject(@makeThisTypeError("ReadableStreamDefaultReader", "cancel")); + + if (!@getByIdDirectPrivate(this, "ownerReadableStream")) + return @Promise.@reject(@makeTypeError("cancel() called on a reader owned by no readable stream")); + + return @readableStreamReaderGenericCancel(this, reason); +} + +function readMany() +{ + "use strict"; + + if (!@isReadableStreamDefaultReader(this)) + @throwTypeError("ReadableStreamDefaultReader.readMany() should not be called directly"); + + const stream = @getByIdDirectPrivate(this, "ownerReadableStream"); + if (!stream) + @throwTypeError("readMany() called on a reader owned by no readable stream"); + + const state = @getByIdDirectPrivate(stream, "state"); + @putByIdDirectPrivate(stream, "disturbed", true); + if (state === @streamClosed) + return {value: [], size: 0, done: true}; + else if (state === @streamErrored) { + throw @getByIdDirectPrivate(stream, "storedError"); + } + + + var controller = @getByIdDirectPrivate(stream, "readableStreamController"); + + const content = @getByIdDirectPrivate(controller, "queue").content; + var size = @getByIdDirectPrivate(controller, "queue").size; + var values = content.toArray(false); + var length = values.length; + + if (length > 0) { + + if (@isReadableByteStreamController(controller)) { + for (var i = 0; i < value.length; i++) { + const buf = value[i]; + if (!(@ArrayBuffer.@isView(buf) || buf instanceof @ArrayBuffer)) { + value[i] = new @Uint8Array(buf.buffer, buf.byteOffset, buf.byteLength); + } + } + } + + @resetQueue(@getByIdDirectPrivate(controller, "queue")); + + if (@getByIdDirectPrivate(controller, "closeRequested")) + @readableStreamClose(@getByIdDirectPrivate(controller, "controlledReadableStream")); + else if (@isReadableStreamDefaultController(controller)) + @readableStreamDefaultControllerCallPullIfNeeded(controller); + else if (@isReadableByteStreamController(controller)) + @readableByteStreamControllerCallPullIfNeeded(controller); + + return {value: values, size, done: false}; + } + + var onPullMany = (result) => { + if (result.done) { + return {value: [], size: 0, done: true}; + } + var controller = @getByIdDirectPrivate(stream, "readableStreamController"); + + var queue = @getByIdDirectPrivate(controller, "queue"); + var value = [result.value].concat(queue.content.toArray(false)); + + if (@isReadableByteStreamController(controller)) { + for (var i = 0; i < value.length; i++) { + const buf = value[i]; + if (!(@ArrayBuffer.@isView(buf) || buf instanceof @ArrayBuffer)) { + value[i] = new @Uint8Array(buf.buffer, buf.byteOffset, buf.byteLength); + } + } + } + + var size = queue.size; + @resetQueue(queue); + + if (@getByIdDirectPrivate(controller, "closeRequested")) + @readableStreamClose(@getByIdDirectPrivate(controller, "controlledReadableStream")); + else if (@isReadableStreamDefaultController(controller)) + @readableStreamDefaultControllerCallPullIfNeeded(controller); + else if (@isReadableByteStreamController(controller)) + @readableByteStreamControllerCallPullIfNeeded(controller); + + + + return {value: value, size: size, done: false}; + }; + + var pullResult = controller.@pull(controller); + if (pullResult && @isPromise(pullResult)) { + return pullResult.@then(onPullMany); + } + + return onPullMany(pullResult); +} + +function read() +{ + "use strict"; + + if (!@isReadableStreamDefaultReader(this)) + return @Promise.@reject(@makeThisTypeError("ReadableStreamDefaultReader", "read")); + if (!@getByIdDirectPrivate(this, "ownerReadableStream")) + return @Promise.@reject(@makeTypeError("read() called on a reader owned by no readable stream")); + + return @readableStreamDefaultReaderRead(this); +} + +function releaseLock() +{ + "use strict"; + + if (!@isReadableStreamDefaultReader(this)) + throw @makeThisTypeError("ReadableStreamDefaultReader", "releaseLock"); + + if (!@getByIdDirectPrivate(this, "ownerReadableStream")) + return; + + if (@getByIdDirectPrivate(this, "readRequests")?.isNotEmpty()) + @throwTypeError("There are still pending read requests, cannot release the lock"); + + @readableStreamReaderGenericRelease(this); +} + +@getter +function closed() +{ + "use strict"; + + if (!@isReadableStreamDefaultReader(this)) + return @Promise.@reject(@makeGetterTypeError("ReadableStreamDefaultReader", "closed")); + + return @getByIdDirectPrivate(this, "closedPromiseCapability").@promise; +} diff --git a/src/bun.js/builtins/js/ReadableStreamInternals.js b/src/bun.js/builtins/js/ReadableStreamInternals.js new file mode 100644 index 000000000..3e6590f31 --- /dev/null +++ b/src/bun.js/builtins/js/ReadableStreamInternals.js @@ -0,0 +1,1255 @@ +/* + * Copyright (C) 2015 Canon Inc. All rights reserved. + * Copyright (C) 2015 Igalia. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +// @internal + +function readableStreamReaderGenericInitialize(reader, stream) +{ + "use strict"; + + @putByIdDirectPrivate(reader, "ownerReadableStream", stream); + @putByIdDirectPrivate(stream, "reader", reader); + if (@getByIdDirectPrivate(stream, "state") === @streamReadable) + @putByIdDirectPrivate(reader, "closedPromiseCapability", @newPromiseCapability(@Promise)); + else if (@getByIdDirectPrivate(stream, "state") === @streamClosed) + @putByIdDirectPrivate(reader, "closedPromiseCapability", { @promise: @Promise.@resolve() }); + else { + @assert(@getByIdDirectPrivate(stream, "state") === @streamErrored); + @putByIdDirectPrivate(reader, "closedPromiseCapability", { @promise: @newHandledRejectedPromise(@getByIdDirectPrivate(stream, "storedError")) }); + } +} + +function privateInitializeReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark) +{ + "use strict"; + + if (!@isReadableStream(stream)) + @throwTypeError("ReadableStreamDefaultController needs a ReadableStream"); + + // readableStreamController is initialized with null value. + if (@getByIdDirectPrivate(stream, "readableStreamController") !== null) + @throwTypeError("ReadableStream already has a controller"); + + + + @putByIdDirectPrivate(this, "controlledReadableStream", stream); + @putByIdDirectPrivate(this, "underlyingSource", underlyingSource); + @putByIdDirectPrivate(this, "queue", @newQueue()); + @putByIdDirectPrivate(this, "started", -1); + @putByIdDirectPrivate(this, "closeRequested", false); + @putByIdDirectPrivate(this, "pullAgain", false); + @putByIdDirectPrivate(this, "pulling", false); + @putByIdDirectPrivate(this, "strategy", @validateAndNormalizeQueuingStrategy(size, highWaterMark)); + + return this; +} + +function readableStreamDefaultControllerError(controller, error) +{ + "use strict"; + + const stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); + if (@getByIdDirectPrivate(stream, "state") !== @streamReadable) + return; + @putByIdDirectPrivate(controller, "queue", @newQueue()); + + @readableStreamError(stream, error); +} + +function readableStreamPipeTo(stream, sink) +{ + "use strict"; + @assert(@isReadableStream(stream)); + + const reader = new @ReadableStreamDefaultReader(stream); + + @getByIdDirectPrivate(reader, "closedPromiseCapability").@promise.@then(() => { }, (e) => { sink.error(e); }); + + function doPipe() { + @readableStreamDefaultReaderRead(reader).@then(function(result) { + if (result.done) { + sink.close(); + return; + } + try { + sink.enqueue(result.value); + } catch (e) { + sink.error("ReadableStream chunk enqueueing in the sink failed"); + return; + } + doPipe(); + }, function(e) { + sink.error(e); + }); + } + doPipe(); +} + + + +function acquireReadableStreamDefaultReader(stream) +{ + "use strict"; + var start = @getByIdDirectPrivate(stream, "start"); + if (start) { + start.@call(stream); + } + + return new @ReadableStreamDefaultReader(stream); +} + +// https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller, starting from step 6. +// The other part is implemented in privateInitializeReadableStreamDefaultController. +function setupReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark, startMethod, pullMethod, cancelMethod) +{ + "use strict"; + + const controller = new @ReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark, @isReadableStream); + + const pullAlgorithm = () => @promiseInvokeOrNoopMethod(underlyingSource, pullMethod, [controller]); + const cancelAlgorithm = (reason) => @promiseInvokeOrNoopMethod(underlyingSource, cancelMethod, [reason]); + + @putByIdDirectPrivate(controller, "pullAlgorithm", pullAlgorithm); + @putByIdDirectPrivate(controller, "cancelAlgorithm", cancelAlgorithm); + @putByIdDirectPrivate(controller, "pull", @readableStreamDefaultControllerPull); + @putByIdDirectPrivate(controller, "cancel", @readableStreamDefaultControllerCancel); + @putByIdDirectPrivate(stream, "readableStreamController", controller); + + @readableStreamDefaultControllerStart(controller); + +} + + +function createReadableStreamController(stream, underlyingSource, strategy) { + "use strict"; + + const type = underlyingSource.type; + const typeString = @toString(type); + + if (typeString === "bytes") { + // if (!@readableByteStreamAPIEnabled()) + // @throwTypeError("ReadableByteStreamController is not implemented"); + + if (strategy.highWaterMark === @undefined) + strategy.highWaterMark = 0; + if (strategy.size !== @undefined) + @throwRangeError("Strategy for a ReadableByteStreamController cannot have a size"); + + @putByIdDirectPrivate(stream, "readableStreamController", new @ReadableByteStreamController(stream, underlyingSource, strategy.highWaterMark, @isReadableStream)); + } else if (typeString === "direct") { + var highWaterMark = strategy?.highWaterMark; + @initializeArrayBufferStream.@call(stream, underlyingSource, highWaterMark); + } else if (type === @undefined) { + if (strategy.highWaterMark === @undefined) + strategy.highWaterMark = 1; + + @setupReadableStreamDefaultController(stream, underlyingSource, strategy.size, strategy.highWaterMark, underlyingSource.start, underlyingSource.pull, underlyingSource.cancel); + } else + @throwRangeError("Invalid type for underlying source"); + +} + +function readableStreamDefaultControllerStart(controller) { + "use strict"; + + + + if (@getByIdDirectPrivate(controller, "started") !== -1) + return; + + const underlyingSource = @getByIdDirectPrivate(controller, "underlyingSource"); + const startMethod = underlyingSource.start; + @putByIdDirectPrivate(controller, "started", 0); + + @promiseInvokeOrNoopMethodNoCatch(underlyingSource, startMethod, [controller]).@then(() => { + @putByIdDirectPrivate(controller, "started", 1); + @assert(!@getByIdDirectPrivate(controller, "pulling")); + @assert(!@getByIdDirectPrivate(controller, "pullAgain")); + @readableStreamDefaultControllerCallPullIfNeeded(controller); + }, (error) => { + @readableStreamDefaultControllerError(controller, error); + }); +} + + +// FIXME: Replace readableStreamPipeTo by below function. +// This method implements the latest https://streams.spec.whatwg.org/#readable-stream-pipe-to. +function readableStreamPipeToWritableStream(source, destination, preventClose, preventAbort, preventCancel, signal) +{ + "use strict"; + + @assert(@isReadableStream(source)); + @assert(@isWritableStream(destination)); + @assert(!@isReadableStreamLocked(source)); + @assert(!@isWritableStreamLocked(destination)); + @assert(signal === @undefined || @isAbortSignal(signal)); + + if (@getByIdDirectPrivate(source, "underlyingByteSource") !== @undefined) + return @Promise.@reject("Piping to a readable bytestream is not supported"); + + let pipeState = { source : source, destination : destination, preventAbort : preventAbort, preventCancel : preventCancel, preventClose : preventClose, signal : signal }; + + pipeState.reader = @acquireReadableStreamDefaultReader(source); + pipeState.writer = @acquireWritableStreamDefaultWriter(destination); + + @putByIdDirectPrivate(source, "disturbed", true); + + pipeState.finalized = false; + pipeState.shuttingDown = false; + pipeState.promiseCapability = @newPromiseCapability(@Promise); + pipeState.pendingReadPromiseCapability = @newPromiseCapability(@Promise); + pipeState.pendingReadPromiseCapability.@resolve.@call(); + pipeState.pendingWritePromise = @Promise.@resolve(); + + if (signal !== @undefined) { + const algorithm = () => { + if (pipeState.finalized) + return; + + const error = @makeDOMException("AbortError", "abort pipeTo from signal"); + + @pipeToShutdownWithAction(pipeState, () => { + const shouldAbortDestination = !pipeState.preventAbort && @getByIdDirectPrivate(pipeState.destination, "state") === "writable"; + const promiseDestination = shouldAbortDestination ? @writableStreamAbort(pipeState.destination, error) : @Promise.@resolve(); + + const shouldAbortSource = !pipeState.preventCancel && @getByIdDirectPrivate(pipeState.source, "state") === @streamReadable; + const promiseSource = shouldAbortSource ? @readableStreamCancel(pipeState.source, error) : @Promise.@resolve(); + + let promiseCapability = @newPromiseCapability(@Promise); + let shouldWait = true; + let handleResolvedPromise = () => { + if (shouldWait) { + shouldWait = false; + return; + } + promiseCapability.@resolve.@call(); + } + let handleRejectedPromise = (e) => { + promiseCapability.@reject.@call(@undefined, e); + } + promiseDestination.@then(handleResolvedPromise, handleRejectedPromise); + promiseSource.@then(handleResolvedPromise, handleRejectedPromise); + return promiseCapability.@promise; + }, error); + }; + if (@whenSignalAborted(signal, algorithm)) + return pipeState.promiseCapability.@promise; + } + + @pipeToErrorsMustBePropagatedForward(pipeState); + @pipeToErrorsMustBePropagatedBackward(pipeState); + @pipeToClosingMustBePropagatedForward(pipeState); + @pipeToClosingMustBePropagatedBackward(pipeState); + + @pipeToLoop(pipeState); + + return pipeState.promiseCapability.@promise; +} + +function pipeToLoop(pipeState) +{ + "use strict"; + if (pipeState.shuttingDown) + return; + + @pipeToDoReadWrite(pipeState).@then((result) => { + if (result) + @pipeToLoop(pipeState); + }); +} + +function pipeToDoReadWrite(pipeState) +{ + "use strict"; + @assert(!pipeState.shuttingDown); + + pipeState.pendingReadPromiseCapability = @newPromiseCapability(@Promise); + @getByIdDirectPrivate(pipeState.writer, "readyPromise").@promise.@then(() => { + if (pipeState.shuttingDown) { + pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false); + return; + } + + @readableStreamDefaultReaderRead(pipeState.reader).@then((result) => { + const canWrite = !result.done && @getByIdDirectPrivate(pipeState.writer, "stream") !== @undefined; + pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, canWrite); + if (!canWrite) + return; + + pipeState.pendingWritePromise = @writableStreamDefaultWriterWrite(pipeState.writer, result.value); + }, (e) => { + pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false); + }); + }, (e) => { + pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false); + }); + return pipeState.pendingReadPromiseCapability.@promise; +} + +function pipeToErrorsMustBePropagatedForward(pipeState) +{ + "use strict"; + + const action = () => { + pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false); + const error = @getByIdDirectPrivate(pipeState.source, "storedError"); + if (!pipeState.preventAbort) { + @pipeToShutdownWithAction(pipeState, () => @writableStreamAbort(pipeState.destination, error), error); + return; + } + @pipeToShutdown(pipeState, error); + }; + + if (@getByIdDirectPrivate(pipeState.source, "state") === @streamErrored) { + action(); + return; + } + + @getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").@promise.@then(@undefined, action); +} + +function pipeToErrorsMustBePropagatedBackward(pipeState) +{ + "use strict"; + const action = () => { + const error = @getByIdDirectPrivate(pipeState.destination, "storedError"); + if (!pipeState.preventCancel) { + @pipeToShutdownWithAction(pipeState, () => @readableStreamCancel(pipeState.source, error), error); + return; + } + @pipeToShutdown(pipeState, error); + }; + if (@getByIdDirectPrivate(pipeState.destination, "state") === "errored") { + action(); + return; + } + @getByIdDirectPrivate(pipeState.writer, "closedPromise").@promise.@then(@undefined, action); +} + +function pipeToClosingMustBePropagatedForward(pipeState) +{ + "use strict"; + const action = () => { + pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false); + const error = @getByIdDirectPrivate(pipeState.source, "storedError"); + if (!pipeState.preventClose) { + @pipeToShutdownWithAction(pipeState, () => @writableStreamDefaultWriterCloseWithErrorPropagation(pipeState.writer)); + return; + } + @pipeToShutdown(pipeState); + }; + if (@getByIdDirectPrivate(pipeState.source, "state") === @streamClosed) { + action(); + return; + } + @getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").@promise.@then(action, @undefined); +} + +function pipeToClosingMustBePropagatedBackward(pipeState) +{ + "use strict"; + if (!@writableStreamCloseQueuedOrInFlight(pipeState.destination) && @getByIdDirectPrivate(pipeState.destination, "state") !== "closed") + return; + + // @assert no chunks have been read/written + + const error = @makeTypeError("closing is propagated backward"); + if (!pipeState.preventCancel) { + @pipeToShutdownWithAction(pipeState, () => @readableStreamCancel(pipeState.source, error), error); + return; + } + @pipeToShutdown(pipeState, error); +} + +function pipeToShutdownWithAction(pipeState, action) +{ + "use strict"; + + if (pipeState.shuttingDown) + return; + + pipeState.shuttingDown = true; + + const hasError = arguments.length > 2; + const error = arguments[2]; + const finalize = () => { + const promise = action(); + promise.@then(() => { + if (hasError) + @pipeToFinalize(pipeState, error); + else + @pipeToFinalize(pipeState); + }, (e) => { + @pipeToFinalize(pipeState, e); + }); + }; + + if (@getByIdDirectPrivate(pipeState.destination, "state") === "writable" && !@writableStreamCloseQueuedOrInFlight(pipeState.destination)) { + pipeState.pendingReadPromiseCapability.@promise.@then(() => { + pipeState.pendingWritePromise.@then(finalize, finalize); + }, (e) => @pipeToFinalize(pipeState, e)); + return; + } + + finalize(); +} + +function pipeToShutdown(pipeState) +{ + "use strict"; + + if (pipeState.shuttingDown) + return; + + pipeState.shuttingDown = true; + + const hasError = arguments.length > 1; + const error = arguments[1]; + const finalize = () => { + if (hasError) + @pipeToFinalize(pipeState, error); + else + @pipeToFinalize(pipeState); + }; + + if (@getByIdDirectPrivate(pipeState.destination, "state") === "writable" && !@writableStreamCloseQueuedOrInFlight(pipeState.destination)) { + pipeState.pendingReadPromiseCapability.@promise.@then(() => { + pipeState.pendingWritePromise.@then(finalize, finalize); + }, (e) => @pipeToFinalize(pipeState, e)); + return; + } + finalize(); +} + +function pipeToFinalize(pipeState) +{ + "use strict"; + + @writableStreamDefaultWriterRelease(pipeState.writer); + @readableStreamReaderGenericRelease(pipeState.reader); + + // Instead of removing the abort algorithm as per spec, we make it a no-op which is equivalent. + pipeState.finalized = true; + + if (arguments.length > 1) + pipeState.promiseCapability.@reject.@call(@undefined, arguments[1]); + else + pipeState.promiseCapability.@resolve.@call(); +} + +function readableStreamTee(stream, shouldClone) +{ + "use strict"; + + @assert(@isReadableStream(stream)); + @assert(typeof(shouldClone) === "boolean"); + + const reader = new @ReadableStreamDefaultReader(stream); + + const teeState = { + closedOrErrored: false, + canceled1: false, + canceled2: false, + reason1: @undefined, + reason2: @undefined, + }; + + teeState.cancelPromiseCapability = @newPromiseCapability(@Promise); + + const pullFunction = @readableStreamTeePullFunction(teeState, reader, shouldClone); + + const branch1Source = { }; + @putByIdDirectPrivate(branch1Source, "pull", pullFunction); + @putByIdDirectPrivate(branch1Source, "cancel", @readableStreamTeeBranch1CancelFunction(teeState, stream)); + + const branch2Source = { }; + @putByIdDirectPrivate(branch2Source, "pull", pullFunction); + @putByIdDirectPrivate(branch2Source, "cancel", @readableStreamTeeBranch2CancelFunction(teeState, stream)); + + const branch1 = new @ReadableStream(branch1Source); + const branch2 = new @ReadableStream(branch2Source); + + @getByIdDirectPrivate(reader, "closedPromiseCapability").@promise.@then(@undefined, function(e) { + if (teeState.closedOrErrored) + return; + @readableStreamDefaultControllerError(branch1.@readableStreamController, e); + @readableStreamDefaultControllerError(branch2.@readableStreamController, e); + teeState.closedOrErrored = true; + if (!teeState.canceled1 || !teeState.canceled2) + teeState.cancelPromiseCapability.@resolve.@call(); + }); + + // Additional fields compared to the spec, as they are needed within pull/cancel functions. + teeState.branch1 = branch1; + teeState.branch2 = branch2; + + return [branch1, branch2]; +} + +function readableStreamTeePullFunction(teeState, reader, shouldClone) +{ + "use strict"; + + return function() { + @Promise.prototype.@then.@call(@readableStreamDefaultReaderRead(reader), function(result) { + @assert(@isObject(result)); + @assert(typeof result.done === "boolean"); + if (result.done && !teeState.closedOrErrored) { + if (!teeState.canceled1) + @readableStreamDefaultControllerClose(teeState.branch1.@readableStreamController); + if (!teeState.canceled2) + @readableStreamDefaultControllerClose(teeState.branch2.@readableStreamController); + teeState.closedOrErrored = true; + if (!teeState.canceled1 || !teeState.canceled2) + teeState.cancelPromiseCapability.@resolve.@call(); + } + if (teeState.closedOrErrored) + return; + if (!teeState.canceled1) + @readableStreamDefaultControllerEnqueue(teeState.branch1.@readableStreamController, result.value); + if (!teeState.canceled2) + @readableStreamDefaultControllerEnqueue(teeState.branch2.@readableStreamController, shouldClone ? @structuredCloneForStream(result.value) : result.value); + }); + } +} + +function readableStreamTeeBranch1CancelFunction(teeState, stream) +{ + "use strict"; + + return function(r) { + teeState.canceled1 = true; + teeState.reason1 = r; + if (teeState.canceled2) { + @readableStreamCancel(stream, [teeState.reason1, teeState.reason2]).@then( + teeState.cancelPromiseCapability.@resolve, + teeState.cancelPromiseCapability.@reject); + } + return teeState.cancelPromiseCapability.@promise; + } +} + +function readableStreamTeeBranch2CancelFunction(teeState, stream) +{ + "use strict"; + + return function(r) { + teeState.canceled2 = true; + teeState.reason2 = r; + if (teeState.canceled1) { + @readableStreamCancel(stream, [teeState.reason1, teeState.reason2]).@then( + teeState.cancelPromiseCapability.@resolve, + teeState.cancelPromiseCapability.@reject); + } + return teeState.cancelPromiseCapability.@promise; + } +} + +function isReadableStream(stream) +{ + "use strict"; + + // Spec tells to return true only if stream has a readableStreamController internal slot. + // However, since it is a private slot, it cannot be checked using hasOwnProperty(). + // Therefore, readableStreamController is initialized with null value. + return @isObject(stream) && @getByIdDirectPrivate(stream, "readableStreamController") !== @undefined; +} + +function isReadableStreamDefaultReader(reader) +{ + "use strict"; + + // Spec tells to return true only if reader has a readRequests internal slot. + // However, since it is a private slot, it cannot be checked using hasOwnProperty(). + // Since readRequests is initialized with an empty array, the following test is ok. + return @isObject(reader) && !!@getByIdDirectPrivate(reader, "readRequests"); +} + +function isReadableStreamDefaultController(controller) +{ + "use strict"; + + // Spec tells to return true only if controller has an underlyingSource internal slot. + // However, since it is a private slot, it cannot be checked using hasOwnProperty(). + // underlyingSource is obtained in ReadableStream constructor: if undefined, it is set + // to an empty object. Therefore, following test is ok. + return @isObject(controller) && !!@getByIdDirectPrivate(controller, "underlyingSource"); +} + + +@globalPrivate +function assignDirectStream() { + "use strict"; + + var stream = this; +} + + +function handleDirectStreamError(e) { + "use strict"; + + var controller = this; + var sink = controller.@sink; + if (sink) { + @putByIdDirectPrivate(controller, "sink", @undefined); + try { + sink.close(e); + } catch (f) {} + } + + this.error = this.drain = this.write = this.close = this.end = @onReadableStreamDirectControllerClosed; + + if (typeof this.@underlyingSource.close === 'function') { + try { + this.@underlyingSource.close.@call(this.@underlyingSource, e); + } catch (e) { + } + } + + try { + var pend = controller._pendingRead; + if (pend) { + controller._pendingRead = @undefined; + @rejectPromise(pend, e); + } + } catch (f) {} + var stream = controller.@controlledReadableStream; + if (stream) @readableStreamError(stream, e); +} + +function handleDirectStreamErrorReject(e) { + @handleDirectStreamError.@call(this, e); + return @Promise.@reject(e); +} + +function onPullDirectStream(controller) +{ + + "use strict"; + + var stream = controller.@controlledReadableStream; + if (!stream || @getByIdDirectPrivate(stream, "state") !== @streamReadable) + return; + + // pull is in progress + // this is a recursive call + // ignore it + if (controller._deferClose === -1) { + return; + } + + + controller._deferClose = -1; + controller._deferDrain = -1; + var deferClose; + var deferDrain; + + // Direct streams allow @pull to be called multiple times, unlike the spec. + // Backpressure is handled by the destination, not by the underlying source. + // In this case, we rely on the heuristic that repeatedly draining in the same tick + // is bad for performance + // this code is only run when consuming a direct stream from JS + // without the HTTP server or anything else + try { + var result = controller.@underlyingSource.pull( + controller, + ); + + if (result && @isPromise(result)) { + if (controller._handleError === @undefined) { + controller._handleError = @handleDirectStreamErrorReject.bind(controller); + } + + @Promise.prototype.catch.@call(result, controller._handleError); + } + } catch(e) { + return @handleDirectStreamErrorReject.@call(controller, e); + } finally { + deferClose = controller._deferClose; + deferDrain = controller._deferDrain; + controller._deferDrain = controller._deferClose = 0; + } + + + var promiseToReturn; + + + if (controller._pendingRead === @undefined) { + controller._pendingRead = promiseToReturn = @newPromise(); + } else { + promiseToReturn = @readableStreamAddReadRequest(stream); + } + + + // they called close during @pull() + // we delay that + if (deferClose === 1) { + var reason = controller._deferCloseReason; + controller._deferCloseReason = @undefined; + @onCloseDirectStream.@call(controller, reason); + return promiseToReturn; + } + + // not done, but they called drain() + if (deferDrain === 1) { + @onDrainDirectStream.@call(controller); + } + + + return promiseToReturn; +} + +function noopDoneFunction() { + return @Promise.@resolve({value: @undefined, done: true}); +} + +function onReadableStreamDirectControllerClosed(reason) +{ + "use strict"; + @throwTypeError("ReadableStreamDirectController is now closed"); +} + +function onCloseDirectStream(reason) +{ + "use strict"; + var stream = this.@controlledReadableStream; + if (!stream || @getByIdDirectPrivate(stream, "state") !== @streamReadable) + return; + + if (this._deferClose !== 0) { + this._deferClose = 1; + this._deferCloseReason = reason; + return; + } + + @putByIdDirectPrivate(stream, "state", @streamClosing); + if (typeof this.@underlyingSource.close === 'function') { + try { + this.@underlyingSource.close.@call(this.@underlyingSource, reason); + } catch (e) { + + } + } + + var drained; + try { + drained = this.@sink.end(); + @putByIdDirectPrivate(this, "sink", @undefined); + } catch (e) { + if (this._pendingRead) { + var read = this._pendingRead; + this._pendingRead = @undefined; + @rejectPromise(read, e); + } + @readableStreamError(stream, e); + return; + } + + this.error = this.drain = this.write = this.close = this.end = @onReadableStreamDirectControllerClosed; + + var reader = @getByIdDirectPrivate(stream, "reader"); + + if (reader && @isReadableStreamDefaultReader(reader)) { + var _pendingRead = this._pendingRead; + if (_pendingRead && @isPromise(_pendingRead) && drained?.byteLength) { + this._pendingRead = @undefined; + @fulfillPromise(_pendingRead, {value: drained, done: false}); + @readableStreamClose(stream); + return; + } + } + + if (drained?.byteLength) { + var requests = @getByIdDirectPrivate(reader, "readRequests"); + if (requests?.isNotEmpty()) { + @readableStreamFulfillReadRequest(stream, drained, false); + @readableStreamClose(stream); + return; + } + + @putByIdDirectPrivate(stream, "state", @streamReadable); + this.@pull = () => { + var thisResult = @createFulfilledPromise({value: drained, done: false}); + drained = @undefined; + @readableStreamClose(stream); + stream = @undefined; + return thisResult; + }; + } else if (this._pendingRead) { + var read = this._pendingRead; + this._pendingRead = @undefined; + @putByIdDirectPrivate(this, "pull", @noopDoneFunction); + @fulfillPromise(read, {value: @undefined, done: true}); + } + + @readableStreamClose(stream); +} + +function onDrainDirectStream() +{ + "use strict"; + + var stream = this.@controlledReadableStream; + var reader = @getByIdDirectPrivate(stream, "reader"); + if (!reader || !@isReadableStreamDefaultReader(reader)) { + return; + } + + var _pendingRead = this._pendingRead; + this._pendingRead = @undefined; + if (_pendingRead && @isPromise(_pendingRead)) { + var drained = this.@sink.drain(); + if (drained?.byteLength) { + this._pendingRead = @getByIdDirectPrivate(stream, "readRequests")?.shift(); + @fulfillPromise(_pendingRead, {value: drained, done: false}); + } else { + this._pendingRead = _pendingRead; + } + } else if (@getByIdDirectPrivate(stream, "readRequests")?.isNotEmpty()) { + var drained = this.@sink.drain(); + if (drained?.byteLength) { + @readableStreamFulfillReadRequest(stream, drained, false); + } + } else if (this._deferDrain === -1) { + this._deferDrain = 1; + } + +} + +function initializeArrayBufferStream(underlyingSource, highWaterMark) +{ + "use strict"; + + // This is the fallback implementation for direct streams + // When we don't know what the destination type is + // We assume it is a Uint8Array. + + var opts = highWaterMark ? {highWaterMark, stream: true, asUint8Array: true} : {stream: true, asUint8Array: true}; + var sink = new globalThis.Bun.ArrayBufferSink(); + sink.start(opts); + + var controller = { + @underlyingSource: underlyingSource, + @pull: @onPullDirectStream, + @controlledReadableStream: this, + @sink: sink, + close: @onCloseDirectStream, + write: sink.write.bind(sink), + error: @handleDirectStreamError, + end: @onCloseDirectStream, + @close: @onCloseDirectStream, + drain: @onDrainDirectStream, + _pendingRead: @undefined, + _deferClose: 0, + _deferDrain: 0, + _deferCloseReason: @undefined, + _handleError: @undefined, + }; + + + @putByIdDirectPrivate(this, "readableStreamController", controller); + +} + +function readableStreamError(stream, error) +{ + "use strict"; + + @assert(@isReadableStream(stream)); + @assert(@getByIdDirectPrivate(stream, "state") === @streamReadable); + @putByIdDirectPrivate(stream, "state", @streamErrored); + @putByIdDirectPrivate(stream, "storedError", error); + + const reader = @getByIdDirectPrivate(stream, "reader"); + + if (!reader) + return; + + if (@isReadableStreamDefaultReader(reader)) { + const requests = @getByIdDirectPrivate(reader, "readRequests"); + @putByIdDirectPrivate(reader, "readRequests", @createFIFO()); + for (var request = requests.shift(); request; request = requests.shift()) + @rejectPromise(request, error); + } else { + @assert(@isReadableStreamBYOBReader(reader)); + const requests = @getByIdDirectPrivate(reader, "readIntoRequests"); + @putByIdDirectPrivate(reader, "readIntoRequests", @createFIFO()); + for (var request = requests.shift(); request; request = requests.shift()) + @rejectPromise(request, error); + } + + @getByIdDirectPrivate(reader, "closedPromiseCapability").@reject.@call(@undefined, error); + const promise = @getByIdDirectPrivate(reader, "closedPromiseCapability").@promise; + @markPromiseAsHandled(promise); +} + +function readableStreamDefaultControllerShouldCallPull(controller) +{ + "use strict"; + + const stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); + + if (!@readableStreamDefaultControllerCanCloseOrEnqueue(controller)) + return false; + if (!(@getByIdDirectPrivate(controller, "started") === 1)) + return false; + if ((!@isReadableStreamLocked(stream) || !@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) && @readableStreamDefaultControllerGetDesiredSize(controller) <= 0) + return false; + const desiredSize = @readableStreamDefaultControllerGetDesiredSize(controller); + @assert(desiredSize !== null); + return desiredSize > 0; +} + +function readableStreamDefaultControllerCallPullIfNeeded(controller) +{ + "use strict"; + + // FIXME: use @readableStreamDefaultControllerShouldCallPull + const stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); + + if (!@readableStreamDefaultControllerCanCloseOrEnqueue(controller)) + return; + if (!(@getByIdDirectPrivate(controller, "started") === 1)) + return; + if ((!@isReadableStreamLocked(stream) || !@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) && @readableStreamDefaultControllerGetDesiredSize(controller) <= 0) + return; + + if (@getByIdDirectPrivate(controller, "pulling")) { + @putByIdDirectPrivate(controller, "pullAgain", true); + return; + } + + + @assert(!@getByIdDirectPrivate(controller, "pullAgain")); + @putByIdDirectPrivate(controller, "pulling", true); + + @getByIdDirectPrivate(controller, "pullAlgorithm").@call(@undefined).@then(function() { + @putByIdDirectPrivate(controller, "pulling", false); + if (@getByIdDirectPrivate(controller, "pullAgain")) { + @putByIdDirectPrivate(controller, "pullAgain", false); + + @readableStreamDefaultControllerCallPullIfNeeded(controller); + } + }, function(error) { + @readableStreamDefaultControllerError(controller, error); + }); +} + +function isReadableStreamLocked(stream) +{ + "use strict"; + + @assert(@isReadableStream(stream)); + return !!@getByIdDirectPrivate(stream, "reader"); +} + +function readableStreamDefaultControllerGetDesiredSize(controller) +{ + "use strict"; + + const stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); + const state = @getByIdDirectPrivate(stream, "state"); + + if (state === @streamErrored) + return null; + if (state === @streamClosed) + return 0; + + return @getByIdDirectPrivate(controller, "strategy").highWaterMark - @getByIdDirectPrivate(controller, "queue").size; +} + + +function readableStreamReaderGenericCancel(reader, reason) +{ + "use strict"; + + const stream = @getByIdDirectPrivate(reader, "ownerReadableStream"); + @assert(!!stream); + return @readableStreamCancel(stream, reason); +} + +function readableStreamCancel(stream, reason) +{ + "use strict"; + + @putByIdDirectPrivate(stream, "disturbed", true); + const state = @getByIdDirectPrivate(stream, "state"); + if (state === @streamClosed) + return @Promise.@resolve(); + if (state === @streamErrored) + return @Promise.@reject(@getByIdDirectPrivate(stream, "storedError")); + @readableStreamClose(stream); + + var controller = @getByIdDirectPrivate(stream, "readableStreamController"); + return controller.@cancel(controller, reason).@then(function() { }); +} + +function readableStreamDefaultControllerCancel(controller, reason) +{ + "use strict"; + + @putByIdDirectPrivate(controller, "queue", @newQueue()); + return @getByIdDirectPrivate(controller, "cancelAlgorithm").@call(@undefined, reason); +} + +function readableStreamDefaultControllerPull(controller) +{ + "use strict"; + + var queue = @getByIdDirectPrivate(controller, "queue"); + if (queue.content.isNotEmpty()) { + const chunk = @dequeueValue(queue); + if (@getByIdDirectPrivate(controller, "closeRequested") && queue.content.isEmpty()) + @readableStreamClose(@getByIdDirectPrivate(controller, "controlledReadableStream")); + else + @readableStreamDefaultControllerCallPullIfNeeded(controller); + + return @createFulfilledPromise({ value: chunk, done: false }); + } + const pendingPromise = @readableStreamAddReadRequest(@getByIdDirectPrivate(controller, "controlledReadableStream")); + @readableStreamDefaultControllerCallPullIfNeeded(controller); + return pendingPromise; +} + +function readableStreamDefaultControllerClose(controller) +{ + "use strict"; + + @assert(@readableStreamDefaultControllerCanCloseOrEnqueue(controller)); + @putByIdDirectPrivate(controller, "closeRequested", true); + if (@getByIdDirectPrivate(controller, "queue")?.content?.isEmpty()) + @readableStreamClose(@getByIdDirectPrivate(controller, "controlledReadableStream")); +} + +function readableStreamClose(stream) +{ + "use strict"; + + @assert(@getByIdDirectPrivate(stream, "state") === @streamReadable); + @putByIdDirectPrivate(stream, "state", @streamClosed); + if (!@getByIdDirectPrivate(stream, "reader")) + return; + + if (@isReadableStreamDefaultReader(@getByIdDirectPrivate(stream, "reader"))) { + const requests = @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests"); + if (requests.isNotEmpty()) { + @putByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests", @createFIFO()); + + for (var request = requests.shift(); request; request = requests.shift()) + @fulfillPromise(request, { value: @undefined, done: true }); + } + } + + @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "closedPromiseCapability").@resolve.@call(); +} + +function readableStreamFulfillReadRequest(stream, chunk, done) +{ + "use strict"; + const readRequest = @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").shift(); + @fulfillPromise(readRequest, { value: chunk, done: done }); +} + +function readableStreamDefaultControllerEnqueue(controller, chunk) +{ + "use strict"; + + const stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); + // this is checked by callers + @assert(@readableStreamDefaultControllerCanCloseOrEnqueue(controller)); + + if (@isReadableStreamLocked(stream) && @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) { + @readableStreamFulfillReadRequest(stream, chunk, false); + @readableStreamDefaultControllerCallPullIfNeeded(controller); + return; + } + + try { + let chunkSize = 1; + if (@getByIdDirectPrivate(controller, "strategy").size !== @undefined) + chunkSize = @getByIdDirectPrivate(controller, "strategy").size(chunk); + @enqueueValueWithSize(@getByIdDirectPrivate(controller, "queue"), chunk, chunkSize); + } + catch(error) { + @readableStreamDefaultControllerError(controller, error); + throw error; + } + @readableStreamDefaultControllerCallPullIfNeeded(controller); +} + +function readableStreamDefaultReaderRead(reader) +{ + "use strict"; + + const stream = @getByIdDirectPrivate(reader, "ownerReadableStream"); + @assert(!!stream); + const state = @getByIdDirectPrivate(stream, "state"); + + @putByIdDirectPrivate(stream, "disturbed", true); + if (state === @streamClosed) + return @createFulfilledPromise({ value: @undefined, done: true }); + if (state === @streamErrored) + return @Promise.@reject(@getByIdDirectPrivate(stream, "storedError")); + @assert(state === @streamReadable); + + return @getByIdDirectPrivate(stream, "readableStreamController").@pull(@getByIdDirectPrivate(stream, "readableStreamController")); +} + +function readableStreamAddReadRequest(stream) +{ + "use strict"; + + @assert(@isReadableStreamDefaultReader(@getByIdDirectPrivate(stream, "reader"))); + @assert(@getByIdDirectPrivate(stream, "state") == @streamReadable); + + const readRequest = @newPromise(); + + @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").push(readRequest); + + return readRequest; +} + +function isReadableStreamDisturbed(stream) +{ + "use strict"; + + @assert(@isReadableStream(stream)); + return @getByIdDirectPrivate(stream, "disturbed"); +} + +function readableStreamReaderGenericRelease(reader) +{ + "use strict"; + + @assert(!!@getByIdDirectPrivate(reader, "ownerReadableStream")); + @assert(@getByIdDirectPrivate(@getByIdDirectPrivate(reader, "ownerReadableStream"), "reader") === reader); + + if (@getByIdDirectPrivate(@getByIdDirectPrivate(reader, "ownerReadableStream"), "state") === @streamReadable) + @getByIdDirectPrivate(reader, "closedPromiseCapability").@reject.@call(@undefined, @makeTypeError("releasing lock of reader whose stream is still in readable state")); + else + @putByIdDirectPrivate(reader, "closedPromiseCapability", { @promise: @newHandledRejectedPromise(@makeTypeError("reader released lock")) }); + + const promise = @getByIdDirectPrivate(reader, "closedPromiseCapability").@promise; + @markPromiseAsHandled(promise); + @putByIdDirectPrivate(@getByIdDirectPrivate(reader, "ownerReadableStream"), "reader", @undefined); + @putByIdDirectPrivate(reader, "ownerReadableStream", @undefined); +} + +function readableStreamDefaultControllerCanCloseOrEnqueue(controller) +{ + "use strict"; + + return !@getByIdDirectPrivate(controller, "closeRequested") && @getByIdDirectPrivate(@getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === @streamReadable; +} + + +function lazyLoadStream(stream, autoAllocateChunkSize) { + "use strict"; + + var nativeType = @getByIdDirectPrivate(stream, "bunNativeType"); + var nativePtr = @getByIdDirectPrivate(stream, "bunNativePtr"); + var cached = @lazyStreamPrototypeMap; + var Prototype = cached.@get(nativeType); + if (Prototype === @undefined) { + var [pull, start, cancel, setClose, deinit] = @lazyLoad(nativeType); + var closer = [false]; + var handleResult; + function handleNativeReadableStreamPromiseResult(val) { + "use strict"; + var {c, v} = this; + this.c = @undefined; + this.v = @undefined; + handleResult(val, c, v); + } + + handleResult = function handleResult(result, controller, view) { + "use strict"; + + if (result && @isPromise(result)) { + return result.then(handleNativeReadableStreamPromiseResult.bind({c: controller, v: view}), (err) => controller.error(err)); + } else if (result !== false) { + if (view && view.byteLength === result) { + controller.byobRequest.respondWithNewView(view); + } else { + controller.byobRequest.respond(result); + } + } + + if (closer[0] || result === false) { + @enqueueJob(() => controller.close()); + closer[0] = false; + } + }; + + Prototype = class NativeReadableStreamSource { + constructor(tag, autoAllocateChunkSize) { + this.pull = this.pull_.bind(tag); + this.cancel = this.cancel_.bind(tag); + this.autoAllocateChunkSize = autoAllocateChunkSize; + } + + pull; + cancel; + + type = "bytes"; + autoAllocateChunkSize = 0; + + static startSync = start; + + pull_(controller) { + closer[0] = false; + var result; + + const view = controller.byobRequest.view; + try { + result = pull(this, view, closer); + } catch(err) { + return controller.error(err); + } + + return handleResult(result, controller, view); + } + + cancel_(reason) { + cancel(this, reason); + } + static deinit = deinit; + static registry = new FinalizationRegistry(deinit); + } + cached.@set(nativeType, Prototype); + } + + const chunkSize = Prototype.startSync(nativePtr, autoAllocateChunkSize); + + // empty file, no need for native back-and-forth on this + if (chunkSize === 0) { + @readableStreamClose(stream); + return null; + } + var instance = new Prototype(nativePtr, chunkSize); + Prototype.registry.register(instance, nativePtr); + return instance; +}
\ No newline at end of file diff --git a/src/bun.js/builtins/js/StreamInternals.js b/src/bun.js/builtins/js/StreamInternals.js new file mode 100644 index 000000000..c2ca3f5b5 --- /dev/null +++ b/src/bun.js/builtins/js/StreamInternals.js @@ -0,0 +1,317 @@ +/* + * Copyright (C) 2015 Canon Inc. + * Copyright (C) 2015 Igalia. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +// @internal + +function markPromiseAsHandled(promise) +{ + "use strict"; + + @assert(@isPromise(promise)); + @putPromiseInternalField(promise, @promiseFieldFlags, @getPromiseInternalField(promise, @promiseFieldFlags) | @promiseFlagsIsHandled); +} + +function shieldingPromiseResolve(result) +{ + "use strict"; + + const promise = @Promise.@resolve(result); + if (promise.@then === @undefined) + promise.@then = @Promise.prototype.@then; + return promise; +} + +function promiseInvokeOrNoopMethodNoCatch(object, method, args) +{ + "use strict"; + + if (method === @undefined) + return @Promise.@resolve(); + return @shieldingPromiseResolve(method.@apply(object, args)); +} + +function promiseInvokeOrNoopNoCatch(object, key, args) +{ + "use strict"; + + return @promiseInvokeOrNoopMethodNoCatch(object, object[key], args); +} + +function promiseInvokeOrNoopMethod(object, method, args) +{ + "use strict"; + + try { + return @promiseInvokeOrNoopMethodNoCatch(object, method, args); + } + catch(error) { + return @Promise.@reject(error); + } +} + +function promiseInvokeOrNoop(object, key, args) +{ + "use strict"; + + try { + return @promiseInvokeOrNoopNoCatch(object, key, args); + } + catch(error) { + return @Promise.@reject(error); + } +} + +function promiseInvokeOrFallbackOrNoop(object, key1, args1, key2, args2) +{ + "use strict"; + + try { + const method = object[key1]; + if (method === @undefined) + return @promiseInvokeOrNoopNoCatch(object, key2, args2); + return @shieldingPromiseResolve(method.@apply(object, args1)); + } + catch(error) { + return @Promise.@reject(error); + } +} + +function validateAndNormalizeQueuingStrategy(size, highWaterMark) +{ + "use strict"; + + if (size !== @undefined && typeof size !== "function") + @throwTypeError("size parameter must be a function"); + + const newHighWaterMark = @toNumber(highWaterMark); + + if (@isNaN(newHighWaterMark) || newHighWaterMark < 0) + @throwRangeError("highWaterMark value is negative or not a number"); + + return { size: size, highWaterMark: newHighWaterMark }; +} + +@globalPrivate +function createFIFO() { + "use strict"; + class Denqueue { + constructor() { + this._head = 0; + this._tail = 0; + // this._capacity = 0; + this._capacityMask = 0x3; + this._list = @newArrayWithSize(4); + } + + size() { + if (this._head === this._tail) return 0; + if (this._head < this._tail) return this._tail - this._head; + else return this._capacityMask + 1 - (this._head - this._tail); + } + + isEmpty() { + return this.size() == 0; + } + + isNotEmpty() { + return this.size() > 0; + } + + shift() { + var head = this._head; + if (head === this._tail) return @undefined; + var item = this._list[head]; + @putByValDirect(this._list, head, @undefined); + this._head = (head + 1) & this._capacityMask; + if (head < 2 && this._tail > 10000 && this._tail <= this._list.length >>> 2) this._shrinkArray(); + return item; + } + + peek() { + if (this._head === this._tail) return @undefined; + return this._list[this._head]; + } + + push(item) { + var tail = this._tail; + @putByValDirect(this._list, tail, item); + this._tail = (tail + 1) & this._capacityMask; + if (this._tail === this._head) { + this._growArray(); + } + // if (this._capacity && this.size() > this._capacity) { + // this.shift(); + // } + } + + toArray(fullCopy) { + var list = this._list; + var len = @toLength(list.length); + + if (fullCopy || this._head > this._tail) { + var _head = @toLength(this._head); + var _tail = @toLength(this._tail); + var total = @toLength((len - _head) + _tail); + var array = @newArrayWithSize(total); + var j = 0; + for (var i = _head; i < len; i++) @putByValDirect(array, j++, list[i]); + for (var i = 0; i < _tail; i++) @putByValDirect(array, j++, list[i]); + return array; + } else { + return @Array.prototype.slice.@call(list, this._head, this._tail); + } + } + + clear() { + this._head = 0; + this._tail = 0; + this._list.fill(undefined); + } + + _growArray() { + if (this._head) { + // copy existing data, head to end, then beginning to tail. + this._list = this.toArray(true); + this._head = 0; + } + + // head is at 0 and array is now full, safe to extend + this._tail = @toLength(this._list.length); + + this._list.length <<= 1; + this._capacityMask = (this._capacityMask << 1) | 1; + } + + shrinkArray() { + this._list.length >>>= 1; + this._capacityMask >>>= 1; + } + } + + + return new Denqueue(); +} + +function newQueue() +{ + "use strict"; + + return { content: @createFIFO(), size: 0 }; +} + +function dequeueValue(queue) +{ + "use strict"; + + const record = queue.content.shift(); + queue.size -= record.size; + // As described by spec, below case may occur due to rounding errors. + if (queue.size < 0) + queue.size = 0; + return record.value; +} + +function enqueueValueWithSize(queue, value, size) +{ + "use strict"; + + size = @toNumber(size); + if (!@isFinite(size) || size < 0) + @throwRangeError("size has an incorrect value"); + + queue.content.push({ value, size }); + queue.size += size; +} + +function peekQueueValue(queue) +{ + "use strict"; + + @assert(queue.content.isNotEmpty()); + + return queue.peek().value; +} + +function resetQueue(queue) +{ + "use strict"; + + @assert("content" in queue); + @assert("size" in queue); + queue.content.clear(); + queue.size = 0; +} + +function extractSizeAlgorithm(strategy) +{ + if (!("size" in strategy)) + return () => 1; + const sizeAlgorithm = strategy["size"]; + if (typeof sizeAlgorithm !== "function") + @throwTypeError("strategy.size must be a function"); + + return (chunk) => { return sizeAlgorithm(chunk); }; +} + +function extractHighWaterMark(strategy, defaultHWM) +{ + if (!("highWaterMark" in strategy)) + return defaultHWM; + const highWaterMark = strategy["highWaterMark"]; + if (@isNaN(highWaterMark) || highWaterMark < 0) + @throwRangeError("highWaterMark value is negative or not a number"); + + return @toNumber(highWaterMark); +} + +function extractHighWaterMarkFromQueuingStrategyInit(init) +{ + "use strict"; + + if (!@isObject(init)) + @throwTypeError("QueuingStrategyInit argument must be an object."); + const {highWaterMark} = init; + if (highWaterMark === @undefined) + @throwTypeError("QueuingStrategyInit.highWaterMark member is required."); + + return @toNumber(highWaterMark); +} + +function createFulfilledPromise(value) +{ + const promise = @newPromise(); + @fulfillPromise(promise, value); + return promise; +} + +function toDictionary(value, defaultValue, errorMessage) +{ + if (value === @undefined || value === null) + return defaultValue; + if (!@isObject(value)) + @throwTypeError(errorMessage); + return value; +} diff --git a/src/bun.js/builtins/js/TransformStream.js b/src/bun.js/builtins/js/TransformStream.js new file mode 100644 index 000000000..8d82d87d8 --- /dev/null +++ b/src/bun.js/builtins/js/TransformStream.js @@ -0,0 +1,116 @@ +/* + * Copyright (C) 2020 Apple Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS'' + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS + * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + * THE POSSIBILITY OF SUCH DAMAGE. + */ + +function initializeTransformStream() +{ + "use strict"; + + let transformer = arguments[0]; + + // This is the path for CreateTransformStream. + if (@isObject(transformer) && @getByIdDirectPrivate(transformer, "TransformStream")) + return this; + + let writableStrategy = arguments[1]; + let readableStrategy = arguments[2]; + + if (transformer === @undefined) + transformer = null; + + if (readableStrategy === @undefined) + readableStrategy = { }; + + if (writableStrategy === @undefined) + writableStrategy = { }; + + let transformerDict = { }; + if (transformer !== null) { + if ("start" in transformer) { + transformerDict["start"] = transformer["start"]; + if (typeof transformerDict["start"] !== "function") + @throwTypeError("transformer.start should be a function"); + } + if ("transform" in transformer) { + transformerDict["transform"] = transformer["transform"]; + if (typeof transformerDict["transform"] !== "function") + @throwTypeError("transformer.transform should be a function"); + } + if ("flush" in transformer) { + transformerDict["flush"] = transformer["flush"]; + if (typeof transformerDict["flush"] !== "function") + @throwTypeError("transformer.flush should be a function"); + } + + if ("readableType" in transformer) + @throwRangeError("TransformStream transformer has a readableType"); + if ("writableType" in transformer) + @throwRangeError("TransformStream transformer has a writableType"); + } + + const readableHighWaterMark = @extractHighWaterMark(readableStrategy, 0); + const readableSizeAlgorithm = @extractSizeAlgorithm(readableStrategy); + + const writableHighWaterMark = @extractHighWaterMark(writableStrategy, 1); + const writableSizeAlgorithm = @extractSizeAlgorithm(writableStrategy); + + const startPromiseCapability = @newPromiseCapability(@Promise); + @initializeTransformStream(this, startPromiseCapability.@promise, writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm); + @setUpTransformStreamDefaultControllerFromTransformer(this, transformer, transformerDict); + + if ("start" in transformerDict) { + const controller = @getByIdDirectPrivate(this, "controller"); + const startAlgorithm = () => @promiseInvokeOrNoopMethodNoCatch(transformer, transformerDict["start"], [controller]); + startAlgorithm().@then(() => { + // FIXME: We probably need to resolve start promise with the result of the start algorithm. + startPromiseCapability.@resolve.@call(); + }, (error) => { + startPromiseCapability.@reject.@call(@undefined, error); + }); + } else + startPromiseCapability.@resolve.@call(); + + return this; +} + +@getter +function readable() +{ + "use strict"; + + if (!@isTransformStream(this)) + throw @makeThisTypeError("TransformStream", "readable"); + + return @getByIdDirectPrivate(this, "readable"); +} + +function writable() +{ + "use strict"; + + if (!@isTransformStream(this)) + throw @makeThisTypeError("TransformStream", "writable"); + + return @getByIdDirectPrivate(this, "writable"); +} diff --git a/src/bun.js/builtins/js/TransformStreamDefaultController.js b/src/bun.js/builtins/js/TransformStreamDefaultController.js new file mode 100644 index 000000000..5ed7d0dfa --- /dev/null +++ b/src/bun.js/builtins/js/TransformStreamDefaultController.js @@ -0,0 +1,76 @@ +/* + * Copyright (C) 2020 Apple Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS'' + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS + * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + * THE POSSIBILITY OF SUCH DAMAGE. + */ + +function initializeTransformStreamDefaultController() +{ + "use strict"; + + return this; +} + +@getter +function desiredSize() +{ + "use strict"; + + if (!@isTransformStreamDefaultController(this)) + throw @makeThisTypeError("TransformStreamDefaultController", "enqueue"); + + const stream = @getByIdDirectPrivate(this, "stream"); + const readable = @getByIdDirectPrivate(stream, "readable"); + const readableController = @getByIdDirectPrivate(readable, "readableStreamController"); + + return @readableStreamDefaultControllerGetDesiredSize(readableController); +} + +function enqueue(chunk) +{ + "use strict"; + + if (!@isTransformStreamDefaultController(this)) + throw @makeThisTypeError("TransformStreamDefaultController", "enqueue"); + + @transformStreamDefaultControllerEnqueue(this, chunk); +} + +function error(e) +{ + "use strict"; + + if (!@isTransformStreamDefaultController(this)) + throw @makeThisTypeError("TransformStreamDefaultController", "error"); + + @transformStreamDefaultControllerError(this, e); +} + +function terminate() +{ + "use strict"; + + if (!@isTransformStreamDefaultController(this)) + throw @makeThisTypeError("TransformStreamDefaultController", "terminate"); + + @transformStreamDefaultControllerTerminate(this); +} diff --git a/src/bun.js/builtins/js/TransformStreamInternals.js b/src/bun.js/builtins/js/TransformStreamInternals.js new file mode 100644 index 000000000..4263e3991 --- /dev/null +++ b/src/bun.js/builtins/js/TransformStreamInternals.js @@ -0,0 +1,350 @@ +/* + * Copyright (C) 2020 Apple Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS'' + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS + * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + * THE POSSIBILITY OF SUCH DAMAGE. + */ + +// @internal + +function isTransformStream(stream) +{ + "use strict"; + + return @isObject(stream) && !!@getByIdDirectPrivate(stream, "readable"); +} + +function isTransformStreamDefaultController(controller) +{ + "use strict"; + + return @isObject(controller) && !!@getByIdDirectPrivate(controller, "transformAlgorithm"); +} + +function createTransformStream(startAlgorithm, transformAlgorithm, flushAlgorithm, writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm) +{ + if (writableHighWaterMark === @undefined) + writableHighWaterMark = 1; + if (writableSizeAlgorithm === @undefined) + writableSizeAlgorithm = () => 1; + if (readableHighWaterMark === @undefined) + readableHighWaterMark = 0; + if (readableSizeAlgorithm === @undefined) + readableSizeAlgorithm = () => 1; + @assert(writableHighWaterMark >= 0); + @assert(readableHighWaterMark >= 0); + + const transform = {}; + @putByIdDirectPrivate(transform, "TransformStream", true); + + const stream = new @TransformStream(transform); + const startPromiseCapability = @newPromiseCapability(@Promise); + @initializeTransformStream(stream, startPromiseCapability.@promise, writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm); + + const controller = new @TransformStreamDefaultController(); + @setUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm); + + startAlgorithm().@then(() => { + startPromiseCapability.@resolve.@call(); + }, (error) => { + startPromiseCapability.@reject.@call(@undefined, error); + }); + + return stream; +} + +function initializeTransformStream(stream, startPromise, writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm) +{ + "use strict"; + + const startAlgorithm = () => { return startPromise; }; + const writeAlgorithm = (chunk) => { return @transformStreamDefaultSinkWriteAlgorithm(stream, chunk); } + const abortAlgorithm = (reason) => { return @transformStreamDefaultSinkAbortAlgorithm(stream, reason); } + const closeAlgorithm = () => { return @transformStreamDefaultSinkCloseAlgorithm(stream); } + const writable = @createWritableStream(startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, writableHighWaterMark, writableSizeAlgorithm); + + const pullAlgorithm = () => { return @transformStreamDefaultSourcePullAlgorithm(stream); }; + const cancelAlgorithm = (reason) => { + @transformStreamErrorWritableAndUnblockWrite(stream, reason); + return @Promise.@resolve(); + }; + const underlyingSource = { }; + @putByIdDirectPrivate(underlyingSource, "start", startAlgorithm); + @putByIdDirectPrivate(underlyingSource, "pull", pullAlgorithm); + @putByIdDirectPrivate(underlyingSource, "cancel", cancelAlgorithm); + const options = { }; + @putByIdDirectPrivate(options, "size", readableSizeAlgorithm); + @putByIdDirectPrivate(options, "highWaterMark", readableHighWaterMark); + const readable = new @ReadableStream(underlyingSource, options); + + // The writable to expose to JS through writable getter. + @putByIdDirectPrivate(stream, "writable", writable); + // The writable to use for the actual transform algorithms. + @putByIdDirectPrivate(stream, "internalWritable", @getInternalWritableStream(writable)); + + @putByIdDirectPrivate(stream, "readable", readable); + @putByIdDirectPrivate(stream, "backpressure", @undefined); + @putByIdDirectPrivate(stream, "backpressureChangePromise", @undefined); + + @transformStreamSetBackpressure(stream, true); + @putByIdDirectPrivate(stream, "controller", @undefined); +} + +function transformStreamError(stream, e) +{ + "use strict"; + + const readable = @getByIdDirectPrivate(stream, "readable"); + const readableController = @getByIdDirectPrivate(readable, "readableStreamController"); + @readableStreamDefaultControllerError(readableController, e); + + @transformStreamErrorWritableAndUnblockWrite(stream, e); +} + +function transformStreamErrorWritableAndUnblockWrite(stream, e) +{ + "use strict"; + + @transformStreamDefaultControllerClearAlgorithms(@getByIdDirectPrivate(stream, "controller")); + + const writable = @getByIdDirectPrivate(stream, "internalWritable"); + @writableStreamDefaultControllerErrorIfNeeded(@getByIdDirectPrivate(writable, "controller"), e); + + if (@getByIdDirectPrivate(stream, "backpressure")) + @transformStreamSetBackpressure(stream, false); +} + +function transformStreamSetBackpressure(stream, backpressure) +{ + "use strict"; + + @assert(@getByIdDirectPrivate(stream, "backpressure") !== backpressure); + + const backpressureChangePromise = @getByIdDirectPrivate(stream, "backpressureChangePromise"); + if (backpressureChangePromise !== @undefined) + backpressureChangePromise.@resolve.@call(); + + @putByIdDirectPrivate(stream, "backpressureChangePromise", @newPromiseCapability(@Promise)); + @putByIdDirectPrivate(stream, "backpressure", backpressure); +} + +function setUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm) +{ + "use strict"; + + @assert(@isTransformStream(stream)); + @assert(@getByIdDirectPrivate(stream, "controller") === @undefined); + + @putByIdDirectPrivate(controller, "stream", stream); + @putByIdDirectPrivate(stream, "controller", controller); + @putByIdDirectPrivate(controller, "transformAlgorithm", transformAlgorithm); + @putByIdDirectPrivate(controller, "flushAlgorithm", flushAlgorithm); +} + + +function setUpTransformStreamDefaultControllerFromTransformer(stream, transformer, transformerDict) +{ + "use strict"; + + const controller = new @TransformStreamDefaultController(); + let transformAlgorithm = (chunk) => { + try { + @transformStreamDefaultControllerEnqueue(controller, chunk); + } catch (e) { + return @Promise.@reject(e); + } + return @Promise.@resolve(); + }; + let flushAlgorithm = () => { return @Promise.@resolve(); }; + + if ("transform" in transformerDict) + transformAlgorithm = (chunk) => { + return @promiseInvokeOrNoopMethod(transformer, transformerDict["transform"], [chunk, controller]); + }; + + if ("flush" in transformerDict) { + flushAlgorithm = () => { + return @promiseInvokeOrNoopMethod(transformer, transformerDict["flush"], [controller]); + }; + } + + @setUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm); +} + +function transformStreamDefaultControllerClearAlgorithms(controller) +{ + "use strict"; + + // We set transformAlgorithm to true to allow GC but keep the isTransformStreamDefaultController check. + @putByIdDirectPrivate(controller, "transformAlgorithm", true); + @putByIdDirectPrivate(controller, "flushAlgorithm", @undefined); +} + +function transformStreamDefaultControllerEnqueue(controller, chunk) +{ + "use strict"; + + const stream = @getByIdDirectPrivate(controller, "stream"); + const readable = @getByIdDirectPrivate(stream, "readable"); + const readableController = @getByIdDirectPrivate(readable, "readableStreamController"); + + @assert(readableController !== @undefined); + if (!@readableStreamDefaultControllerCanCloseOrEnqueue(readableController)) + @throwTypeError("TransformStream.readable cannot close or enqueue"); + + try { + @readableStreamDefaultControllerEnqueue(readableController, chunk); + } catch (e) { + @transformStreamErrorWritableAndUnblockWrite(stream, e); + throw @getByIdDirectPrivate(readable, "storedError"); + } + + const backpressure = !@readableStreamDefaultControllerShouldCallPull(readableController); + if (backpressure !== @getByIdDirectPrivate(stream, "backpressure")) { + @assert(backpressure); + @transformStreamSetBackpressure(stream, true); + } +} + +function transformStreamDefaultControllerError(controller, e) +{ + "use strict"; + + @transformStreamError(@getByIdDirectPrivate(controller, "stream"), e); +} + +function transformStreamDefaultControllerPerformTransform(controller, chunk) +{ + "use strict"; + + const promiseCapability = @newPromiseCapability(@Promise); + + const transformPromise = @getByIdDirectPrivate(controller, "transformAlgorithm").@call(@undefined, chunk); + transformPromise.@then(() => { + promiseCapability.@resolve(); + }, (r) => { + @transformStreamError(@getByIdDirectPrivate(controller, "stream"), r); + promiseCapability.@reject.@call(@undefined, r); + }); + return promiseCapability.@promise; +} + +function transformStreamDefaultControllerTerminate(controller) +{ + "use strict"; + + const stream = @getByIdDirectPrivate(controller, "stream"); + const readable = @getByIdDirectPrivate(stream, "readable"); + const readableController = @getByIdDirectPrivate(readable, "readableStreamController"); + + // FIXME: Update readableStreamDefaultControllerClose to make this check. + if (@readableStreamDefaultControllerCanCloseOrEnqueue(readableController)) + @readableStreamDefaultControllerClose(readableController); + const error = @makeTypeError("the stream has been terminated"); + @transformStreamErrorWritableAndUnblockWrite(stream, error); +} + +function transformStreamDefaultSinkWriteAlgorithm(stream, chunk) +{ + "use strict"; + + const writable = @getByIdDirectPrivate(stream, "internalWritable"); + + @assert(@getByIdDirectPrivate(writable, "state") === "writable"); + + const controller = @getByIdDirectPrivate(stream, "controller"); + + if (@getByIdDirectPrivate(stream, "backpressure")) { + const promiseCapability = @newPromiseCapability(@Promise); + + const backpressureChangePromise = @getByIdDirectPrivate(stream, "backpressureChangePromise"); + @assert(backpressureChangePromise !== @undefined); + backpressureChangePromise.@promise.@then(() => { + const state = @getByIdDirectPrivate(writable, "state"); + if (state === "erroring") { + promiseCapability.@reject.@call(@undefined, @getByIdDirectPrivate(writable, "storedError")); + return; + } + + @assert(state === "writable"); + @transformStreamDefaultControllerPerformTransform(controller, chunk).@then(() => { + promiseCapability.@resolve(); + }, (e) => { + promiseCapability.@reject.@call(@undefined, e); + }); + }, (e) => { + promiseCapability.@reject.@call(@undefined, e); + }); + + return promiseCapability.@promise; + } + return @transformStreamDefaultControllerPerformTransform(controller, chunk); +} + +function transformStreamDefaultSinkAbortAlgorithm(stream, reason) +{ + "use strict"; + + @transformStreamError(stream, reason); + return @Promise.@resolve(); +} + +function transformStreamDefaultSinkCloseAlgorithm(stream) +{ + "use strict"; + const readable = @getByIdDirectPrivate(stream, "readable"); + const controller = @getByIdDirectPrivate(stream, "controller"); + const readableController = @getByIdDirectPrivate(readable, "readableStreamController"); + + const flushAlgorithm = @getByIdDirectPrivate(controller, "flushAlgorithm"); + @assert(flushAlgorithm !== @undefined); + const flushPromise = @getByIdDirectPrivate(controller, "flushAlgorithm").@call(); + @transformStreamDefaultControllerClearAlgorithms(controller); + + const promiseCapability = @newPromiseCapability(@Promise); + flushPromise.@then(() => { + if (@getByIdDirectPrivate(readable, "state") === @streamErrored) { + promiseCapability.@reject.@call(@undefined, @getByIdDirectPrivate(readable, "storedError")); + return; + } + + // FIXME: Update readableStreamDefaultControllerClose to make this check. + if (@readableStreamDefaultControllerCanCloseOrEnqueue(readableController)) + @readableStreamDefaultControllerClose(readableController); + promiseCapability.@resolve(); + }, (r) => { + @transformStreamError(@getByIdDirectPrivate(controller, "stream"), r); + promiseCapability.@reject.@call(@undefined, @getByIdDirectPrivate(readable, "storedError")); + }); + return promiseCapability.@promise; +} + +function transformStreamDefaultSourcePullAlgorithm(stream) +{ + "use strict"; + + @assert(@getByIdDirectPrivate(stream, "backpressure")); + @assert(@getByIdDirectPrivate(stream, "backpressureChangePromise") !== @undefined); + + @transformStreamSetBackpressure(stream, false); + + return @getByIdDirectPrivate(stream, "backpressureChangePromise").@promise; +} diff --git a/src/bun.js/builtins/js/WritableStreamDefaultController.js b/src/bun.js/builtins/js/WritableStreamDefaultController.js new file mode 100644 index 000000000..8c42212e0 --- /dev/null +++ b/src/bun.js/builtins/js/WritableStreamDefaultController.js @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2020 Apple Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS'' + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS + * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + * THE POSSIBILITY OF SUCH DAMAGE. + */ + + +function initializeWritableStreamDefaultController() +{ + "use strict"; + + @putByIdDirectPrivate(this, "queue", @newQueue()); + @putByIdDirectPrivate(this, "abortSteps", (reason) => { + const result = @getByIdDirectPrivate(this, "abortAlgorithm").@call(@undefined, reason); + @writableStreamDefaultControllerClearAlgorithms(this); + return result; + }); + + @putByIdDirectPrivate(this, "errorSteps", () => { + @resetQueue(@getByIdDirectPrivate(this, "queue")); + }); + + return this; +} + +function error(e) +{ + "use strict"; + + if (@getByIdDirectPrivate(this, "abortSteps") === @undefined) + throw @makeThisTypeError("WritableStreamDefaultController", "error"); + + const stream = @getByIdDirectPrivate(this, "stream"); + if (@getByIdDirectPrivate(stream, "state") !== "writable") + return; + @writableStreamDefaultControllerError(this, e); +} diff --git a/src/bun.js/builtins/js/WritableStreamDefaultWriter.js b/src/bun.js/builtins/js/WritableStreamDefaultWriter.js new file mode 100644 index 000000000..69a953fc3 --- /dev/null +++ b/src/bun.js/builtins/js/WritableStreamDefaultWriter.js @@ -0,0 +1,135 @@ +/* + * Copyright (C) 2020 Apple Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS'' + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS + * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + * THE POSSIBILITY OF SUCH DAMAGE. + */ + +function initializeWritableStreamDefaultWriter(stream) +{ + "use strict"; + + // stream can be a WritableStream if WritableStreamDefaultWriter constructor is called directly from JS + // or an InternalWritableStream in other code paths. + const internalStream = @getInternalWritableStream(stream); + if (internalStream) + stream = internalStream; + + if (!@isWritableStream(stream)) + @throwTypeError("WritableStreamDefaultWriter constructor takes a WritableStream"); + + @setUpWritableStreamDefaultWriter(this, stream); + return this; +} + +@getter +function closed() +{ + "use strict"; + + if (!@isWritableStreamDefaultWriter(this)) + return @Promise.@reject(@makeGetterTypeError("WritableStreamDefaultWriter", "closed")); + + return @getByIdDirectPrivate(this, "closedPromise").@promise; +} + +@getter +function desiredSize() +{ + "use strict"; + + if (!@isWritableStreamDefaultWriter(this)) + throw @makeThisTypeError("WritableStreamDefaultWriter", "desiredSize"); + + if (@getByIdDirectPrivate(this, "stream") === @undefined) + @throwTypeError("WritableStreamDefaultWriter has no stream"); + + return @writableStreamDefaultWriterGetDesiredSize(this); +} + +@getter +function ready() +{ + "use strict"; + + if (!@isWritableStreamDefaultWriter(this)) + return @Promise.@reject(@makeThisTypeError("WritableStreamDefaultWriter", "ready")); + + return @getByIdDirectPrivate(this, "readyPromise").@promise; +} + +function abort(reason) +{ + "use strict"; + + if (!@isWritableStreamDefaultWriter(this)) + return @Promise.@reject(@makeThisTypeError("WritableStreamDefaultWriter", "abort")); + + if (@getByIdDirectPrivate(this, "stream") === @undefined) + return @Promise.@reject(@makeTypeError("WritableStreamDefaultWriter has no stream")); + + return @writableStreamDefaultWriterAbort(this, reason); +} + +function close() +{ + "use strict"; + + if (!@isWritableStreamDefaultWriter(this)) + return @Promise.@reject(@makeThisTypeError("WritableStreamDefaultWriter", "close")); + + const stream = @getByIdDirectPrivate(this, "stream"); + if (stream === @undefined) + return @Promise.@reject(@makeTypeError("WritableStreamDefaultWriter has no stream")); + + if (@writableStreamCloseQueuedOrInFlight(stream)) + return @Promise.@reject(@makeTypeError("WritableStreamDefaultWriter is being closed")); + + return @writableStreamDefaultWriterClose(this); +} + +function releaseLock() +{ + "use strict"; + + if (!@isWritableStreamDefaultWriter(this)) + throw @makeThisTypeError("WritableStreamDefaultWriter", "releaseLock"); + + const stream = @getByIdDirectPrivate(this, "stream"); + if (stream === @undefined) + return; + + @assert(@getByIdDirectPrivate(stream, "writer") !== @undefined); + @writableStreamDefaultWriterRelease(this); +} + +function write(chunk) +{ + "use strict"; + + if (!@isWritableStreamDefaultWriter(this)) + return @Promise.@reject(@makeThisTypeError("WritableStreamDefaultWriter", "write")); + + if (@getByIdDirectPrivate(this, "stream") === @undefined) + return @Promise.@reject(@makeTypeError("WritableStreamDefaultWriter has no stream")); + + return @writableStreamDefaultWriterWrite(this, chunk); +} diff --git a/src/bun.js/builtins/js/WritableStreamInternals.js b/src/bun.js/builtins/js/WritableStreamInternals.js new file mode 100644 index 000000000..5a97155f2 --- /dev/null +++ b/src/bun.js/builtins/js/WritableStreamInternals.js @@ -0,0 +1,856 @@ +/* + * Copyright (C) 2015 Canon Inc. + * Copyright (C) 2015 Igalia + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +// @internal + +function isWritableStream(stream) +{ + "use strict"; + + return @isObject(stream) && !!@getByIdDirectPrivate(stream, "underlyingSink"); +} + +function isWritableStreamDefaultWriter(writer) +{ + "use strict"; + + return @isObject(writer) && !!@getByIdDirectPrivate(writer, "closedPromise"); +} + +function acquireWritableStreamDefaultWriter(stream) +{ + return new @WritableStreamDefaultWriter(stream); +} + +// https://streams.spec.whatwg.org/#create-writable-stream +function createWritableStream(startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm) +{ + @assert(typeof highWaterMark === "number" && !@isNaN(highWaterMark) && highWaterMark >= 0); + + const internalStream = { }; + @initializeWritableStreamSlots(internalStream, { }); + const controller = new @WritableStreamDefaultController(); + + @setUpWritableStreamDefaultController(internalStream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm); + + return @createWritableStreamFromInternal(internalStream); +} + +function createInternalWritableStreamFromUnderlyingSink(underlyingSink, strategy) +{ + "use strict"; + + const stream = { }; + + if (underlyingSink === @undefined) + underlyingSink = { }; + + if (strategy === @undefined) + strategy = { }; + + if (!@isObject(underlyingSink)) + @throwTypeError("WritableStream constructor takes an object as first argument"); + + if ("type" in underlyingSink) + @throwRangeError("Invalid type is specified"); + + const sizeAlgorithm = @extractSizeAlgorithm(strategy); + const highWaterMark = @extractHighWaterMark(strategy, 1); + + const underlyingSinkDict = { }; + if ("start" in underlyingSink) { + underlyingSinkDict["start"] = underlyingSink["start"]; + if (typeof underlyingSinkDict["start"] !== "function") + @throwTypeError("underlyingSink.start should be a function"); + } + if ("write" in underlyingSink) { + underlyingSinkDict["write"] = underlyingSink["write"]; + if (typeof underlyingSinkDict["write"] !== "function") + @throwTypeError("underlyingSink.write should be a function"); + } + if ("close" in underlyingSink) { + underlyingSinkDict["close"] = underlyingSink["close"]; + if (typeof underlyingSinkDict["close"] !== "function") + @throwTypeError("underlyingSink.close should be a function"); + } + if ("abort" in underlyingSink) { + underlyingSinkDict["abort"] = underlyingSink["abort"]; + if (typeof underlyingSinkDict["abort"] !== "function") + @throwTypeError("underlyingSink.abort should be a function"); + } + + @initializeWritableStreamSlots(stream, underlyingSink); + @setUpWritableStreamDefaultControllerFromUnderlyingSink(stream, underlyingSink, underlyingSinkDict, highWaterMark, sizeAlgorithm); + + return stream; +} + +function initializeWritableStreamSlots(stream, underlyingSink) +{ "use strict"; + + @putByIdDirectPrivate(stream, "state", "writable"); + @putByIdDirectPrivate(stream, "storedError", @undefined); + @putByIdDirectPrivate(stream, "writer", @undefined); + @putByIdDirectPrivate(stream, "controller", @undefined); + @putByIdDirectPrivate(stream, "inFlightWriteRequest", @undefined); + @putByIdDirectPrivate(stream, "closeRequest", @undefined); + @putByIdDirectPrivate(stream, "inFlightCloseRequest", @undefined); + @putByIdDirectPrivate(stream, "pendingAbortRequest", @undefined); + @putByIdDirectPrivate(stream, "writeRequests", @createFIFO()); + @putByIdDirectPrivate(stream, "backpressure", false); + @putByIdDirectPrivate(stream, "underlyingSink", underlyingSink); +} + +function writableStreamCloseForBindings(stream) +{ "use strict"; + + if (@isWritableStreamLocked(stream)) + return @Promise.@reject(@makeTypeError("WritableStream.close method can only be used on non locked WritableStream")); + + if (@writableStreamCloseQueuedOrInFlight(stream)) + return @Promise.@reject(@makeTypeError("WritableStream.close method can only be used on a being close WritableStream")); + + return @writableStreamClose(stream); +} + +function writableStreamAbortForBindings(stream, reason) +{ "use strict"; + + if (@isWritableStreamLocked(stream)) + return @Promise.@reject(@makeTypeError("WritableStream.abort method can only be used on non locked WritableStream")); + + return @writableStreamAbort(stream, reason); +} + +function isWritableStreamLocked(stream) +{ "use strict"; + + return @getByIdDirectPrivate(stream, "writer") !== @undefined; +} + +function setUpWritableStreamDefaultWriter(writer, stream) +{ "use strict"; + + if (@isWritableStreamLocked(stream)) + @throwTypeError("WritableStream is locked"); + + @putByIdDirectPrivate(writer, "stream", stream); + @putByIdDirectPrivate(stream, "writer", writer); + + const readyPromiseCapability = @newPromiseCapability(@Promise); + const closedPromiseCapability = @newPromiseCapability(@Promise); + @putByIdDirectPrivate(writer, "readyPromise", readyPromiseCapability); + @putByIdDirectPrivate(writer, "closedPromise", closedPromiseCapability); + + const state = @getByIdDirectPrivate(stream, "state"); + if (state === "writable") { + if (@writableStreamCloseQueuedOrInFlight(stream) || !@getByIdDirectPrivate(stream, "backpressure")) + readyPromiseCapability.@resolve.@call(); + } else if (state === "erroring") { + readyPromiseCapability.@reject.@call(@undefined, @getByIdDirectPrivate(stream, "storedError")); + @markPromiseAsHandled(readyPromiseCapability.@promise); + } else if (state === "closed") { + readyPromiseCapability.@resolve.@call(); + closedPromiseCapability.@resolve.@call(); + } else { + @assert(state === "errored"); + const storedError = @getByIdDirectPrivate(stream, "storedError"); + readyPromiseCapability.@reject.@call(@undefined, storedError); + @markPromiseAsHandled(readyPromiseCapability.@promise); + closedPromiseCapability.@reject.@call(@undefined, storedError); + @markPromiseAsHandled(closedPromiseCapability.@promise); + } +} + +function writableStreamAbort(stream, reason) +{ + "use strict"; + const state = @getByIdDirectPrivate(stream, "state"); + if (state === "closed" || state === "errored") + return @Promise.@resolve(); + + const pendingAbortRequest = @getByIdDirectPrivate(stream, "pendingAbortRequest"); + if (pendingAbortRequest !== @undefined) + return pendingAbortRequest.promise.@promise; + + @assert(state === "writable" || state === "erroring"); + let wasAlreadyErroring = false; + if (state === "erroring") { + wasAlreadyErroring = true; + reason = @undefined; + } + + const abortPromiseCapability = @newPromiseCapability(@Promise); + @putByIdDirectPrivate(stream, "pendingAbortRequest", { promise : abortPromiseCapability, reason : reason, wasAlreadyErroring : wasAlreadyErroring }); + + if (!wasAlreadyErroring) + @writableStreamStartErroring(stream, reason); + return abortPromiseCapability.@promise; +} + +function writableStreamClose(stream) +{ + "use strict"; + + const state = @getByIdDirectPrivate(stream, "state"); + if (state === "closed" || state === "errored") + return @Promise.@reject(@makeTypeError("Cannot close a writable stream that is closed or errored")); + + @assert(state === "writable" || state === "erroring"); + @assert(!@writableStreamCloseQueuedOrInFlight(stream)); + + const closePromiseCapability = @newPromiseCapability(@Promise); + @putByIdDirectPrivate(stream, "closeRequest", closePromiseCapability); + + const writer = @getByIdDirectPrivate(stream, "writer"); + if (writer !== @undefined && @getByIdDirectPrivate(stream, "backpressure") && state === "writable") + @getByIdDirectPrivate(writer, "readyPromise").@resolve.@call(); + + @writableStreamDefaultControllerClose(@getByIdDirectPrivate(stream, "controller")); + + return closePromiseCapability.@promise; +} + +function writableStreamAddWriteRequest(stream) +{ + "use strict"; + + @assert(@isWritableStreamLocked(stream)) + @assert(@getByIdDirectPrivate(stream, "state") === "writable"); + + const writePromiseCapability = @newPromiseCapability(@Promise); + const writeRequests = @getByIdDirectPrivate(stream, "writeRequests"); + writeRequests.push(writePromiseCapability); + return writePromiseCapability.@promise; +} + +function writableStreamCloseQueuedOrInFlight(stream) +{ + "use strict"; + + return @getByIdDirectPrivate(stream, "closeRequest") !== @undefined || @getByIdDirectPrivate(stream, "inFlightCloseRequest") !== @undefined; +} + +function writableStreamDealWithRejection(stream, error) +{ + "use strict"; + + const state = @getByIdDirectPrivate(stream, "state"); + if (state === "writable") { + @writableStreamStartErroring(stream, error); + return; + } + + @assert(state === "erroring"); + @writableStreamFinishErroring(stream); +} + +function writableStreamFinishErroring(stream) +{ + "use strict"; + + @assert(@getByIdDirectPrivate(stream, "state") === "erroring"); + @assert(!@writableStreamHasOperationMarkedInFlight(stream)); + + @putByIdDirectPrivate(stream, "state", "errored"); + + const controller = @getByIdDirectPrivate(stream, "controller"); + @getByIdDirectPrivate(controller, "errorSteps").@call(); + + const storedError = @getByIdDirectPrivate(stream, "storedError"); + const requests = @getByIdDirectPrivate(stream, "writeRequests"); + for (var request = requests.shift(); request; request = requests.shift()) + request.@reject.@call(@undefined, storedError); + + // TODO: is this still necessary? + @putByIdDirectPrivate(stream, "writeRequests", @createFIFO()); + + const abortRequest = @getByIdDirectPrivate(stream, "pendingAbortRequest"); + if (abortRequest === @undefined) { + @writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); + return; + } + + @putByIdDirectPrivate(stream, "pendingAbortRequest", @undefined); + if (abortRequest.wasAlreadyErroring) { + abortRequest.promise.@reject.@call(@undefined, storedError); + @writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); + return; + } + + @getByIdDirectPrivate(controller, "abortSteps").@call(@undefined, abortRequest.reason).@then(() => { + abortRequest.promise.@resolve.@call(); + @writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); + }, (reason) => { + abortRequest.promise.@reject.@call(@undefined, reason); + @writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); + }); +} + +function writableStreamFinishInFlightClose(stream) +{ + "use strict"; + + const inFlightCloseRequest = @getByIdDirectPrivate(stream, "inFlightCloseRequest"); + inFlightCloseRequest.@resolve.@call(); + + @putByIdDirectPrivate(stream, "inFlightCloseRequest", @undefined); + + const state = @getByIdDirectPrivate(stream, "state"); + @assert(state === "writable" || state === "erroring"); + + if (state === "erroring") { + @putByIdDirectPrivate(stream, "storedError", @undefined); + const abortRequest = @getByIdDirectPrivate(stream, "pendingAbortRequest"); + if (abortRequest !== @undefined) { + abortRequest.promise.@resolve.@call(); + @putByIdDirectPrivate(stream, "pendingAbortRequest", @undefined); + } + } + + @putByIdDirectPrivate(stream, "state", "closed"); + + const writer = @getByIdDirectPrivate(stream, "writer"); + if (writer !== @undefined) + @getByIdDirectPrivate(writer, "closedPromise").@resolve.@call(); + + @assert(@getByIdDirectPrivate(stream, "pendingAbortRequest") === @undefined); + @assert(@getByIdDirectPrivate(stream, "storedError") === @undefined); +} + +function writableStreamFinishInFlightCloseWithError(stream, error) +{ + "use strict"; + + const inFlightCloseRequest = @getByIdDirectPrivate(stream, "inFlightCloseRequest"); + @assert(inFlightCloseRequest !== @undefined); + inFlightCloseRequest.@reject.@call(@undefined, error); + + @putByIdDirectPrivate(stream, "inFlightCloseRequest", @undefined); + + const state = @getByIdDirectPrivate(stream, "state"); + @assert(state === "writable" || state === "erroring"); + + const abortRequest = @getByIdDirectPrivate(stream, "pendingAbortRequest"); + if (abortRequest !== @undefined) { + abortRequest.promise.@reject.@call(@undefined, error); + @putByIdDirectPrivate(stream, "pendingAbortRequest", @undefined); + } + + @writableStreamDealWithRejection(stream, error); +} + +function writableStreamFinishInFlightWrite(stream) +{ + "use strict"; + + const inFlightWriteRequest = @getByIdDirectPrivate(stream, "inFlightWriteRequest"); + @assert(inFlightWriteRequest !== @undefined); + inFlightWriteRequest.@resolve.@call(); + + @putByIdDirectPrivate(stream, "inFlightWriteRequest", @undefined); +} + +function writableStreamFinishInFlightWriteWithError(stream, error) +{ + "use strict"; + + const inFlightWriteRequest = @getByIdDirectPrivate(stream, "inFlightWriteRequest"); + @assert(inFlightWriteRequest !== @undefined); + inFlightWriteRequest.@reject.@call(@undefined, error); + + @putByIdDirectPrivate(stream, "inFlightWriteRequest", @undefined); + + const state = @getByIdDirectPrivate(stream, "state"); + @assert(state === "writable" || state === "erroring"); + + @writableStreamDealWithRejection(stream, error); +} + +function writableStreamHasOperationMarkedInFlight(stream) +{ + "use strict"; + + return @getByIdDirectPrivate(stream, "inFlightWriteRequest") !== @undefined || @getByIdDirectPrivate(stream, "inFlightCloseRequest") !== @undefined; +} + +function writableStreamMarkCloseRequestInFlight(stream) +{ + "use strict"; + + const closeRequest = @getByIdDirectPrivate(stream, "closeRequest"); + @assert(@getByIdDirectPrivate(stream, "inFlightCloseRequest") === @undefined); + @assert(closeRequest !== @undefined); + + @putByIdDirectPrivate(stream, "inFlightCloseRequest", closeRequest); + @putByIdDirectPrivate(stream, "closeRequest", @undefined); +} + +function writableStreamMarkFirstWriteRequestInFlight(stream) +{ + "use strict"; + + const writeRequests = @getByIdDirectPrivate(stream, "writeRequests"); + @assert(@getByIdDirectPrivate(stream, "inFlightWriteRequest") === @undefined); + @assert(writeRequests.isNotEmpty()); + + const writeRequest = writeRequests.shift(); + @putByIdDirectPrivate(stream, "inFlightWriteRequest", writeRequest); +} + +function writableStreamRejectCloseAndClosedPromiseIfNeeded(stream) +{ + "use strict"; + + @assert(@getByIdDirectPrivate(stream, "state") === "errored"); + + const storedError = @getByIdDirectPrivate(stream, "storedError"); + + const closeRequest = @getByIdDirectPrivate(stream, "closeRequest"); + if (closeRequest !== @undefined) { + @assert(@getByIdDirectPrivate(stream, "inFlightCloseRequest") === @undefined); + closeRequest.@reject.@call(@undefined, storedError); + @putByIdDirectPrivate(stream, "closeRequest", @undefined); + } + + const writer = @getByIdDirectPrivate(stream, "writer"); + if (writer !== @undefined) { + const closedPromise = @getByIdDirectPrivate(writer, "closedPromise"); + closedPromise.@reject.@call(@undefined, storedError); + @markPromiseAsHandled(closedPromise.@promise); + } +} + +function writableStreamStartErroring(stream, reason) +{ + "use strict"; + + @assert(@getByIdDirectPrivate(stream, "storedError") === @undefined); + @assert(@getByIdDirectPrivate(stream, "state") === "writable"); + + const controller = @getByIdDirectPrivate(stream, "controller"); + @assert(controller !== @undefined); + + @putByIdDirectPrivate(stream, "state", "erroring"); + @putByIdDirectPrivate(stream, "storedError", reason); + + const writer = @getByIdDirectPrivate(stream, "writer"); + if (writer !== @undefined) + @writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason); + + if (!@writableStreamHasOperationMarkedInFlight(stream) && @getByIdDirectPrivate(controller, "started") === 1) + @writableStreamFinishErroring(stream); +} + +function writableStreamUpdateBackpressure(stream, backpressure) +{ + "use strict"; + @assert(@getByIdDirectPrivate(stream, "state") === "writable"); + @assert(!@writableStreamCloseQueuedOrInFlight(stream)); + + const writer = @getByIdDirectPrivate(stream, "writer"); + if (writer !== @undefined && backpressure !== @getByIdDirectPrivate(stream, "backpressure")) { + if (backpressure) + @putByIdDirectPrivate(writer, "readyPromise", @newPromiseCapability(@Promise)); + else + @getByIdDirectPrivate(writer, "readyPromise").@resolve.@call(); + } + @putByIdDirectPrivate(stream, "backpressure", backpressure); +} + +function writableStreamDefaultWriterAbort(writer, reason) +{ + "use strict"; + const stream = @getByIdDirectPrivate(writer, "stream"); + @assert(stream !== @undefined); + return @writableStreamAbort(stream, reason); +} + +function writableStreamDefaultWriterClose(writer) +{ + "use strict"; + const stream = @getByIdDirectPrivate(writer, "stream"); + @assert(stream !== @undefined); + return @writableStreamClose(stream); +} + +function writableStreamDefaultWriterCloseWithErrorPropagation(writer) +{ + "use strict"; + const stream = @getByIdDirectPrivate(writer, "stream"); + @assert(stream !== @undefined); + + const state = @getByIdDirectPrivate(stream, "state"); + + if (@writableStreamCloseQueuedOrInFlight(stream) || state === "closed") + return @Promise.@resolve(); + + if (state === "errored") + return @Promise.@reject(@getByIdDirectPrivate(stream, "storedError")); + + @assert(state === "writable" || state === "erroring"); + return @writableStreamDefaultWriterClose(writer); +} + +function writableStreamDefaultWriterEnsureClosedPromiseRejected(writer, error) +{ + "use strict"; + let closedPromiseCapability = @getByIdDirectPrivate(writer, "closedPromise"); + let closedPromise = closedPromiseCapability.@promise; + + if ((@getPromiseInternalField(closedPromise, @promiseFieldFlags) & @promiseStateMask) !== @promiseStatePending) { + closedPromiseCapability = @newPromiseCapability(@Promise); + closedPromise = closedPromiseCapability.@promise; + @putByIdDirectPrivate(writer, "closedPromise", closedPromiseCapability); + } + + closedPromiseCapability.@reject.@call(@undefined, error); + @markPromiseAsHandled(closedPromise); +} + +function writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, error) +{ + "use strict"; + let readyPromiseCapability = @getByIdDirectPrivate(writer, "readyPromise"); + let readyPromise = readyPromiseCapability.@promise; + + if ((@getPromiseInternalField(readyPromise, @promiseFieldFlags) & @promiseStateMask) !== @promiseStatePending) { + readyPromiseCapability = @newPromiseCapability(@Promise); + readyPromise = readyPromiseCapability.@promise; + @putByIdDirectPrivate(writer, "readyPromise", readyPromiseCapability); + } + + readyPromiseCapability.@reject.@call(@undefined, error); + @markPromiseAsHandled(readyPromise); +} + +function writableStreamDefaultWriterGetDesiredSize(writer) +{ + "use strict"; + const stream = @getByIdDirectPrivate(writer, "stream"); + @assert(stream !== @undefined); + + const state = @getByIdDirectPrivate(stream, "state"); + + if (state === "errored" || state === "erroring") + return null; + + if (state === "closed") + return 0; + + return @writableStreamDefaultControllerGetDesiredSize(@getByIdDirectPrivate(stream, "controller")); +} + +function writableStreamDefaultWriterRelease(writer) +{ + "use strict"; + + const stream = @getByIdDirectPrivate(writer, "stream"); + @assert(stream !== @undefined); + @assert(@getByIdDirectPrivate(stream, "writer") === writer); + + const releasedError = @makeTypeError("writableStreamDefaultWriterRelease"); + + @writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError); + @writableStreamDefaultWriterEnsureClosedPromiseRejected(writer, releasedError); + + @putByIdDirectPrivate(stream, "writer", @undefined); + @putByIdDirectPrivate(writer, "stream", @undefined); +} + +function writableStreamDefaultWriterWrite(writer, chunk) +{ + "use strict"; + + const stream = @getByIdDirectPrivate(writer, "stream"); + @assert(stream !== @undefined); + + const controller = @getByIdDirectPrivate(stream, "controller"); + @assert(controller !== @undefined); + const chunkSize = @writableStreamDefaultControllerGetChunkSize(controller, chunk); + + if (stream !== @getByIdDirectPrivate(writer, "stream")) + return @Promise.@reject(@makeTypeError("writer is not stream's writer")); + + const state = @getByIdDirectPrivate(stream, "state"); + if (state === "errored") + return @Promise.@reject(@getByIdDirectPrivate(stream, "storedError")); + + if (@writableStreamCloseQueuedOrInFlight(stream) || state === "closed") + return @Promise.@reject(@makeTypeError("stream is closing or closed")); + + if (@writableStreamCloseQueuedOrInFlight(stream) || state === "closed") + return @Promise.@reject(@makeTypeError("stream is closing or closed")); + + if (state === "erroring") + return @Promise.@reject(@getByIdDirectPrivate(stream, "storedError")); + + @assert(state === "writable"); + + const promise = @writableStreamAddWriteRequest(stream); + @writableStreamDefaultControllerWrite(controller, chunk, chunkSize); + return promise; +} + +function setUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm) +{ + "use strict"; + + @assert(@isWritableStream(stream)); + @assert(@getByIdDirectPrivate(stream, "controller") === @undefined); + + @putByIdDirectPrivate(controller, "stream", stream); + @putByIdDirectPrivate(stream, "controller", controller); + + @resetQueue(@getByIdDirectPrivate(controller, "queue")); + + @putByIdDirectPrivate(controller, "started", -1); + @putByIdDirectPrivate(controller, "startAlgorithm", startAlgorithm); + @putByIdDirectPrivate(controller, "strategySizeAlgorithm", sizeAlgorithm); + @putByIdDirectPrivate(controller, "strategyHWM", highWaterMark); + @putByIdDirectPrivate(controller, "writeAlgorithm", writeAlgorithm); + @putByIdDirectPrivate(controller, "closeAlgorithm", closeAlgorithm); + @putByIdDirectPrivate(controller, "abortAlgorithm", abortAlgorithm); + + const backpressure = @writableStreamDefaultControllerGetBackpressure(controller); + @writableStreamUpdateBackpressure(stream, backpressure); + + @writableStreamDefaultControllerStart(controller); +} + +function writableStreamDefaultControllerStart(controller) { + "use strict"; + + if (@getByIdDirectPrivate(controller, "started") !== -1) + return; + + @putByIdDirectPrivate(controller, "started", 0); + + const startAlgorithm = @getByIdDirectPrivate(controller, "startAlgorithm"); + @putByIdDirectPrivate(controller, "startAlgorithm", @undefined); + const stream = @getByIdDirectPrivate(controller, "stream"); + return @Promise.@resolve(startAlgorithm.@call()).@then(() => { + const state = @getByIdDirectPrivate(stream, "state"); + @assert(state === "writable" || state === "erroring"); + @putByIdDirectPrivate(controller, "started", 1); + @writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); + }, (error) => { + const state = @getByIdDirectPrivate(stream, "state"); + @assert(state === "writable" || state === "erroring"); + @putByIdDirectPrivate(controller, "started", 1); + @writableStreamDealWithRejection(stream, error); + }); +} + +function setUpWritableStreamDefaultControllerFromUnderlyingSink(stream, underlyingSink, underlyingSinkDict, highWaterMark, sizeAlgorithm) +{ + "use strict"; + const controller = new @WritableStreamDefaultController(); + + let startAlgorithm = () => { }; + let writeAlgorithm = () => { return @Promise.@resolve(); }; + let closeAlgorithm = () => { return @Promise.@resolve(); }; + let abortAlgorithm = () => { return @Promise.@resolve(); }; + + if ("start" in underlyingSinkDict) { + const startMethod = underlyingSinkDict["start"]; + startAlgorithm = () => @promiseInvokeOrNoopMethodNoCatch(underlyingSink, startMethod, [controller]); + } + if ("write" in underlyingSinkDict) { + const writeMethod = underlyingSinkDict["write"]; + writeAlgorithm = (chunk) => @promiseInvokeOrNoopMethod(underlyingSink, writeMethod, [chunk, controller]); + } + if ("close" in underlyingSinkDict) { + const closeMethod = underlyingSinkDict["close"]; + closeAlgorithm = () => @promiseInvokeOrNoopMethod(underlyingSink, closeMethod, []); + } + if ("abort" in underlyingSinkDict) { + const abortMethod = underlyingSinkDict["abort"]; + abortAlgorithm = (reason) => @promiseInvokeOrNoopMethod(underlyingSink, abortMethod, [reason]); + } + + @setUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm); +} + +function writableStreamDefaultControllerAdvanceQueueIfNeeded(controller) +{ + "use strict"; + const stream = @getByIdDirectPrivate(controller, "stream"); + + if (@getByIdDirectPrivate(controller, "started") !== 1) + return; + + @assert(stream !== @undefined); + if (@getByIdDirectPrivate(stream, "inFlightWriteRequest") !== @undefined) + return; + + const state = @getByIdDirectPrivate(stream, "state"); + @assert(state !== "closed" || state !== "errored"); + if (state === "erroring") { + @writableStreamFinishErroring(stream); + return; + } + + if (@getByIdDirectPrivate(controller, "queue").content?.isEmpty() ?? false) + return; + + const value = @peekQueueValue(@getByIdDirectPrivate(controller, "queue")); + if (value === @isCloseSentinel) + @writableStreamDefaultControllerProcessClose(controller); + else + @writableStreamDefaultControllerProcessWrite(controller, value); +} + +function isCloseSentinel() +{ +} + +function writableStreamDefaultControllerClearAlgorithms(controller) +{ + "use strict"; + @putByIdDirectPrivate(controller, "writeAlgorithm", @undefined); + @putByIdDirectPrivate(controller, "closeAlgorithm", @undefined); + @putByIdDirectPrivate(controller, "abortAlgorithm", @undefined); + @putByIdDirectPrivate(controller, "strategySizeAlgorithm", @undefined); +} + +function writableStreamDefaultControllerClose(controller) +{ + "use strict"; + @enqueueValueWithSize(@getByIdDirectPrivate(controller, "queue"), @isCloseSentinel, 0); + @writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); +} + +function writableStreamDefaultControllerError(controller, error) +{ + "use strict"; + const stream = @getByIdDirectPrivate(controller, "stream"); + @assert(stream !== @undefined); + @assert(@getByIdDirectPrivate(stream, "state") === "writable"); + + @writableStreamDefaultControllerClearAlgorithms(controller); + @writableStreamStartErroring(stream, error); +} + +function writableStreamDefaultControllerErrorIfNeeded(controller, error) +{ + "use strict"; + const stream = @getByIdDirectPrivate(controller, "stream"); + if (@getByIdDirectPrivate(stream, "state") === "writable") + @writableStreamDefaultControllerError(controller, error); +} + +function writableStreamDefaultControllerGetBackpressure(controller) +{ + "use strict"; + const desiredSize = @writableStreamDefaultControllerGetDesiredSize(controller); + return desiredSize <= 0; +} + +function writableStreamDefaultControllerGetChunkSize(controller, chunk) +{ + "use strict"; + try { + return @getByIdDirectPrivate(controller, "strategySizeAlgorithm").@call(@undefined, chunk); + } catch (e) { + @writableStreamDefaultControllerErrorIfNeeded(controller, e); + return 1; + } +} + +function writableStreamDefaultControllerGetDesiredSize(controller) +{ + "use strict"; + return @getByIdDirectPrivate(controller, "strategyHWM") - @getByIdDirectPrivate(controller, "queue").size; +} + +function writableStreamDefaultControllerProcessClose(controller) +{ + "use strict"; + const stream = @getByIdDirectPrivate(controller, "stream"); + + @writableStreamMarkCloseRequestInFlight(stream); + @dequeueValue(@getByIdDirectPrivate(controller, "queue")); + + @assert(@getByIdDirectPrivate(controller, "queue").content?.isEmpty()); + + const sinkClosePromise = @getByIdDirectPrivate(controller, "closeAlgorithm").@call(); + @writableStreamDefaultControllerClearAlgorithms(controller); + + sinkClosePromise.@then(() => { + @writableStreamFinishInFlightClose(stream); + }, (reason) => { + @writableStreamFinishInFlightCloseWithError(stream, reason); + }); +} + +function writableStreamDefaultControllerProcessWrite(controller, chunk) +{ + "use strict"; + const stream = @getByIdDirectPrivate(controller, "stream"); + + @writableStreamMarkFirstWriteRequestInFlight(stream); + + const sinkWritePromise = @getByIdDirectPrivate(controller, "writeAlgorithm").@call(@undefined, chunk); + + sinkWritePromise.@then(() => { + @writableStreamFinishInFlightWrite(stream); + const state = @getByIdDirectPrivate(stream, "state"); + @assert(state === "writable" || state === "erroring"); + + @dequeueValue(@getByIdDirectPrivate(controller, "queue")); + if (!@writableStreamCloseQueuedOrInFlight(stream) && state === "writable") { + const backpressure = @writableStreamDefaultControllerGetBackpressure(controller); + @writableStreamUpdateBackpressure(stream, backpressure); + } + @writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); + }, (reason) => { + const state = @getByIdDirectPrivate(stream, "state"); + if (state === "writable") + @writableStreamDefaultControllerClearAlgorithms(controller); + + @writableStreamFinishInFlightWriteWithError(stream, reason); + }); +} + +function writableStreamDefaultControllerWrite(controller, chunk, chunkSize) +{ + "use strict"; + try { + @enqueueValueWithSize(@getByIdDirectPrivate(controller, "queue"), chunk, chunkSize); + + const stream = @getByIdDirectPrivate(controller, "stream"); + + const state = @getByIdDirectPrivate(stream, "state"); + if (!@writableStreamCloseQueuedOrInFlight(stream) && state === "writable") { + const backpressure = @writableStreamDefaultControllerGetBackpressure(controller); + @writableStreamUpdateBackpressure(stream, backpressure); + } + @writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); + } catch (e) { + @writableStreamDefaultControllerErrorIfNeeded(controller, e); + } +} |