aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/builtins/js
diff options
context:
space:
mode:
Diffstat (limited to 'src/bun.js/builtins/js')
-rw-r--r--src/bun.js/builtins/js/ByteLengthQueuingStrategy.js51
-rw-r--r--src/bun.js/builtins/js/CountQueuingStrategy.js50
-rw-r--r--src/bun.js/builtins/js/JSBufferConstructor.js65
-rw-r--r--src/bun.js/builtins/js/JSBufferPrototype.js303
-rw-r--r--src/bun.js/builtins/js/JSZigGlobalObject.js62
-rw-r--r--src/bun.js/builtins/js/ReadableByteStreamController.js117
-rw-r--r--src/bun.js/builtins/js/ReadableByteStreamInternals.js712
-rw-r--r--src/bun.js/builtins/js/ReadableStream.js506
-rw-r--r--src/bun.js/builtins/js/ReadableStreamBYOBReader.js102
-rw-r--r--src/bun.js/builtins/js/ReadableStreamBYOBRequest.js77
-rw-r--r--src/bun.js/builtins/js/ReadableStreamDefaultController.js82
-rw-r--r--src/bun.js/builtins/js/ReadableStreamDefaultReader.js182
-rw-r--r--src/bun.js/builtins/js/ReadableStreamInternals.js1255
-rw-r--r--src/bun.js/builtins/js/StreamInternals.js317
-rw-r--r--src/bun.js/builtins/js/TransformStream.js116
-rw-r--r--src/bun.js/builtins/js/TransformStreamDefaultController.js76
-rw-r--r--src/bun.js/builtins/js/TransformStreamInternals.js350
-rw-r--r--src/bun.js/builtins/js/WritableStreamDefaultController.js56
-rw-r--r--src/bun.js/builtins/js/WritableStreamDefaultWriter.js135
-rw-r--r--src/bun.js/builtins/js/WritableStreamInternals.js856
20 files changed, 5470 insertions, 0 deletions
diff --git a/src/bun.js/builtins/js/ByteLengthQueuingStrategy.js b/src/bun.js/builtins/js/ByteLengthQueuingStrategy.js
new file mode 100644
index 000000000..e8f5b1cfc
--- /dev/null
+++ b/src/bun.js/builtins/js/ByteLengthQueuingStrategy.js
@@ -0,0 +1,51 @@
+/*
+ * Copyright (C) 2015 Canon Inc.
+ * Copyright (C) 2015 Igalia S.L.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+ * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+@getter
+function highWaterMark()
+{
+ "use strict";
+
+ const highWaterMark = @getByIdDirectPrivate(this, "highWaterMark");
+ if (highWaterMark === @undefined)
+ @throwTypeError("ByteLengthQueuingStrategy.highWaterMark getter called on incompatible |this| value.");
+
+ return highWaterMark;
+}
+
+function size(chunk)
+{
+ "use strict";
+
+ return chunk.byteLength;
+}
+
+function initializeByteLengthQueuingStrategy(parameters)
+{
+ "use strict";
+
+ @putByIdDirectPrivate(this, "highWaterMark", @extractHighWaterMarkFromQueuingStrategyInit(parameters));
+}
diff --git a/src/bun.js/builtins/js/CountQueuingStrategy.js b/src/bun.js/builtins/js/CountQueuingStrategy.js
new file mode 100644
index 000000000..3cd9cffc2
--- /dev/null
+++ b/src/bun.js/builtins/js/CountQueuingStrategy.js
@@ -0,0 +1,50 @@
+/*
+ * Copyright (C) 2015 Canon Inc.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+ * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+@getter
+function highWaterMark()
+{
+ "use strict";
+
+ const highWaterMark = @getByIdDirectPrivate(this, "highWaterMark");
+ if (highWaterMark === @undefined)
+ @throwTypeError("CountQueuingStrategy.highWaterMark getter called on incompatible |this| value.");
+
+ return highWaterMark;
+}
+
+function size()
+{
+ "use strict";
+
+ return 1;
+}
+
+function initializeCountQueuingStrategy(parameters)
+{
+ "use strict";
+
+ @putByIdDirectPrivate(this, "highWaterMark", @extractHighWaterMarkFromQueuingStrategyInit(parameters));
+}
diff --git a/src/bun.js/builtins/js/JSBufferConstructor.js b/src/bun.js/builtins/js/JSBufferConstructor.js
new file mode 100644
index 000000000..9a3f0e1b7
--- /dev/null
+++ b/src/bun.js/builtins/js/JSBufferConstructor.js
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2022 Codeblog Corp. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+ * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+// ^ that comment is required or the builtins generator will have a fit.
+
+
+function from(items) {
+ "use strict";
+
+ if (!@isConstructor(this))
+ @throwTypeError("Buffer.from requires |this| to be a constructor");
+
+
+ // TODO: figure out why private symbol not found
+ if (typeof items === 'string' || (typeof items === 'object' && items && (items instanceof ArrayBuffer || items instanceof SharedArrayBuffer))) {
+ switch (@argumentCount()) {
+ case 1: {
+ return new this(items);
+ }
+ case 2: {
+ return new this(items, @argument(1));
+ }
+ default: {
+ return new this(items, @argument(1), @argument(2));
+ }
+ }
+ }
+
+
+ var arrayLike = @toObject(items, "Buffer.from requires an array-like object - not null or undefined");
+
+ // Buffer-specific fast path:
+ // - uninitialized memory
+ // - use .set
+ if (@isTypedArrayView(arrayLike)) {
+ var length = @typedArrayLength(arrayLike);
+ var result = this.allocUnsafe(length);
+ result.set(arrayLike);
+ return result;
+ }
+
+ return @tailCallForwardArguments(@Uint8Array.from, this);
+}
diff --git a/src/bun.js/builtins/js/JSBufferPrototype.js b/src/bun.js/builtins/js/JSBufferPrototype.js
new file mode 100644
index 000000000..c841bcd6c
--- /dev/null
+++ b/src/bun.js/builtins/js/JSBufferPrototype.js
@@ -0,0 +1,303 @@
+/*
+ * Copyright 2022 Codeblog Corp. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+ * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+// ^ that comment is required or the builtins generator will have a fit.
+
+// The fastest way as of April 2022 is to use DataView.
+// DataView has intrinsics that cause inlining
+
+function setBigUint64(offset, value, le) {
+ "use strict";
+ return this.dataView.setBigUint64(offset, value, le);
+}
+function readInt8(offset) {
+ "use strict";
+ return this.dataView.getInt8(offset);
+}
+function readUInt8(offset) {
+ "use strict";
+ return this.dataView.getUint8(offset);
+}
+function readInt16LE(offset) {
+ "use strict";
+ return this.dataView.getInt16(offset, true);
+}
+function readInt16BE(offset) {
+ "use strict";
+ return this.dataView.getInt16(offset, false);
+}
+function readUInt16LE(offset) {
+ "use strict";
+ return this.dataView.getUint16(offset, true);
+}
+function readUInt16BE(offset) {
+ "use strict";
+ return this.dataView.getUint16(offset, false);
+}
+function readInt32LE(offset) {
+ "use strict";
+ return this.dataView.getInt32(offset, true);
+}
+function readInt32BE(offset) {
+ "use strict";
+ return this.dataView.getInt32(offset, false);
+}
+function readUInt32LE(offset) {
+ "use strict";
+ return this.dataView.getUint32(offset, true);
+}
+function readUInt32BE(offset) {
+ "use strict";
+ return this.dataView.getUint32(offset, false);
+}
+function readFloatLE(offset) {
+ "use strict";
+ return this.dataView.getFloat32(offset, true);
+}
+function readFloatBE(offset) {
+ "use strict";
+ return this.dataView.getFloat32(offset, false);
+}
+function readDoubleLE(offset) {
+ "use strict";
+ return this.dataView.getFloat64(offset, true);
+}
+function readDoubleBE(offset) {
+ "use strict";
+ return this.dataView.getFloat64(offset, false);
+}
+function readBigInt64LE(offset) {
+ "use strict";
+ return this.dataView.getBigInt64(offset, true);
+}
+function readBigInt64BE(offset) {
+ "use strict";
+ return this.dataView.getBigInt64(offset, false);
+}
+function readBigUInt64LE(offset) {
+ "use strict";
+ return this.dataView.getBigUint64(offset, true);
+}
+function readBigUInt64BE(offset) {
+ "use strict";
+ return this.dataView.getBigUint64(offset, false);
+}
+function writeInt8(value, offset) {
+ "use strict";
+ this.dataView.setInt8(offset, value);
+ return offset + 1;
+}
+function writeUInt8(value, offset) {
+ "use strict";
+ this.dataView.setUint8(offset, value);
+ return offset + 1;
+}
+function writeInt16LE(value, offset) {
+ "use strict";
+ this.dataView.setInt16(offset, value, true);
+ return offset + 2;
+}
+function writeInt16BE(value, offset) {
+ "use strict";
+ this.dataView.setInt16(offset, value, false);
+ return offset + 2;
+}
+function writeUInt16LE(value, offset) {
+ "use strict";
+ this.dataView.setUint16(offset, value, true);
+ return offset + 2;
+}
+function writeUInt16BE(value, offset) {
+ "use strict";
+ this.dataView.setUint16(offset, value, false);
+ return offset + 2;
+}
+function writeInt32LE(value, offset) {
+ "use strict";
+ this.dataView.setInt32(offset, value, true);
+ return offset + 4;
+}
+function writeInt32BE(value, offset) {
+ "use strict";
+ this.dataView.setInt32(offset, value, false);
+ return offset + 4;
+}
+function writeUInt32LE(value, offset) {
+ "use strict";
+ this.dataView.setUint32(offset, value, true);
+ return offset + 4;
+}
+function writeUInt32BE(value, offset) {
+ "use strict";
+ this.dataView.setUint32(offset, value, false);
+ return offset + 4;
+}
+
+function writeFloatLE(value, offset) {
+ "use strict";
+ this.dataView.setFloat32(offset, value, true);
+ return offset + 4;
+}
+
+function writeFloatBE(value, offset) {
+ "use strict";
+ this.dataView.setFloat32(offset, value, false);
+ return offset + 4;
+}
+
+function writeDoubleLE(value, offset) {
+ "use strict";
+ this.dataView.setFloat64(offset, value, true);
+ return offset + 8;
+}
+
+function writeDoubleBE(value, offset) {
+ "use strict";
+ this.dataView.setFloat64(offset, value, false);
+ return offset + 8;
+}
+
+function writeBigInt64LE(value, offset) {
+ "use strict";
+ this.dataView.setBigInt64(offset, value, true);
+ return offset + 8;
+}
+
+function writeBigInt64BE(value, offset) {
+ "use strict";
+ this.dataView.setBigInt64(offset, value, false);
+ return offset + 8;
+}
+
+function writeBigUInt64LE(value, offset) {
+ "use strict";
+ this.dataView.setBigUint64(offset, value, true);
+ return offset + 8;
+}
+
+function writeBigUInt64BE(value, offset) {
+ "use strict";
+ this.dataView.setBigUint64(offset, value, false);
+ return offset + 8;
+}
+
+function slice(start, end) {
+ "use strict";
+ if (start === undefined && end === undefined) {
+ return this;
+ }
+
+ Buffer[Symbol.species] ||= Buffer;
+
+ return new Buffer(this.buffer, this.byteOffset + (start || 0), (end || this.byteLength) - (start || 0));
+}
+
+function utf8Write(text, offset, length) {
+ "use strict";
+ return this.write(text, offset, length, "utf8");
+}
+function ucs2Write(text, offset, length) {
+ "use strict";
+ return this.write(text, offset, length, "ucs2");
+}
+function utf16leWrite(text, offset, length) {
+ "use strict";
+ return this.write(text, offset, length, "utf16le");
+}
+function latin1Write(text, offset, length) {
+ "use strict";
+ return this.write(text, offset, length, "latin1");
+}
+function asciiWrite(text, offset, length) {
+ "use strict";
+ return this.write(text, offset, length, "ascii");
+}
+function base64Write(text, offset, length) {
+ "use strict";
+ return this.write(text, offset, length, "base64");
+}
+function base64urlWrite(text, offset, length) {
+ "use strict";
+ return this.write(text, offset, length, "base64url");
+}
+function hexWrite(text, offset, length) {
+ "use strict";
+ return this.write(text, offset, length, "hex");
+}
+
+function utf8Slice(offset, length) {
+ "use strict";
+ return this.toString(offset, length, "utf8");
+}
+function ucs2Slice(offset, length) {
+ "use strict";
+ return this.toString(offset, length, "ucs2");
+}
+function utf16leSlice(offset, length) {
+ "use strict";
+ return this.toString(offset, length, "utf16le");
+}
+function latin1Slice(offset, length) {
+ "use strict";
+ return this.toString(offset, length, "latin1");
+}
+function asciiSlice(offset, length) {
+ "use strict";
+ return this.toString(offset, length, "ascii");
+}
+function base64Slice(offset, length) {
+ "use strict";
+ return this.toString(offset, length, "base64");
+}
+function base64urlSlice(offset, length) {
+ "use strict";
+ return this.toString(offset, length, "base64url");
+}
+function hexSlice(offset, length) {
+ "use strict";
+ return this.toString(offset, length, "hex");
+}
+
+function toJSON() {
+ "use strict";
+ const type = "Buffer";
+ const data = @Array.from(this);
+ return { type, data };
+}
+
+function subarray(start, end) {
+ "use strict";
+
+ Buffer[Symbol.species] ??= Buffer;
+ return new Buffer(this.buffer, this.byteOffset + (start || 0), (end || this.byteLength) - (start || 0));
+}
+
+
+function initializeBunBuffer(parameters)
+{
+ "use strict";
+
+}
diff --git a/src/bun.js/builtins/js/JSZigGlobalObject.js b/src/bun.js/builtins/js/JSZigGlobalObject.js
new file mode 100644
index 000000000..cb3446159
--- /dev/null
+++ b/src/bun.js/builtins/js/JSZigGlobalObject.js
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2022 Codeblog Corp. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+ * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+function require(name) {
+ "use strict";
+ if (typeof name !== "string") {
+ @throwTypeError("require() expects a string as its argument");
+ }
+
+ const resolved = this.resolveSync(name, this.path);
+ var requireCache = (globalThis[Symbol.for("_requireCache")] ||= new @Map);
+ var cached = requireCache.@get(resolved);
+ if (cached) {
+ if (resolved.endsWith(".node")) {
+ return cached.exports;
+ }
+
+ return cached;
+ }
+
+ // TODO: remove this hardcoding
+ if (resolved.endsWith(".json")) {
+ var fs = (globalThis[Symbol.for("_fs")] ||= Bun.fs());
+ var exports = JSON.parse(fs.readFileSync(resolved, "utf8"));
+ requireCache.@set(resolved, exports);
+ return exports;
+ } else if (resolved.endsWith(".node")) {
+ var module = { exports: {} };
+ globalThis.process.dlopen(module, resolved);
+ requireCache.@set(resolved, module);
+ return module.exports;
+ } else if (resolved.endsWith(".toml")) {
+ var fs = (globalThis[Symbol.for("_fs")] ||= Bun.fs());
+ var exports = Bun.TOML.parse(fs.readFileSync(resolved, "utf8"));
+ requireCache.@set(resolved, exports);
+ return exports;
+ }
+
+ @throwTypeError(`Dynamic require isn't supported for file type: ${resolved.subsring(resolved.lastIndexOf(".") + 1) || resolved}`);
+}
diff --git a/src/bun.js/builtins/js/ReadableByteStreamController.js b/src/bun.js/builtins/js/ReadableByteStreamController.js
new file mode 100644
index 000000000..0b47d730c
--- /dev/null
+++ b/src/bun.js/builtins/js/ReadableByteStreamController.js
@@ -0,0 +1,117 @@
+/*
+ * Copyright (C) 2016 Canon Inc.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+ * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+function initializeReadableByteStreamController(stream, underlyingByteSource, highWaterMark)
+{
+ "use strict";
+
+ if (arguments.length !== 4 && arguments[3] !== @isReadableStream)
+ @throwTypeError("ReadableByteStreamController constructor should not be called directly");
+
+ return @privateInitializeReadableByteStreamController.@call(this, stream, underlyingByteSource, highWaterMark);
+}
+
+function enqueue(chunk)
+{
+ "use strict";
+
+ if (!@isReadableByteStreamController(this))
+ throw @makeThisTypeError("ReadableByteStreamController", "enqueue");
+
+ if (@getByIdDirectPrivate(this, "closeRequested"))
+ @throwTypeError("ReadableByteStreamController is requested to close");
+
+ if (@getByIdDirectPrivate(@getByIdDirectPrivate(this, "controlledReadableStream"), "state") !== @streamReadable)
+ @throwTypeError("ReadableStream is not readable");
+
+ if (!@isObject(chunk) || !@ArrayBuffer.@isView(chunk))
+ @throwTypeError("Provided chunk is not a TypedArray");
+
+ return @readableByteStreamControllerEnqueue(this, chunk);
+}
+
+function error(error)
+{
+ "use strict";
+
+ if (!@isReadableByteStreamController(this))
+ throw @makeThisTypeError("ReadableByteStreamController", "error");
+
+ if (@getByIdDirectPrivate(@getByIdDirectPrivate(this, "controlledReadableStream"), "state") !== @streamReadable)
+ @throwTypeError("ReadableStream is not readable");
+
+ @readableByteStreamControllerError(this, error);
+}
+
+function close()
+{
+ "use strict";
+
+ if (!@isReadableByteStreamController(this))
+ throw @makeThisTypeError("ReadableByteStreamController", "close");
+
+ if (@getByIdDirectPrivate(this, "closeRequested"))
+ @throwTypeError("Close has already been requested");
+
+ if (@getByIdDirectPrivate(@getByIdDirectPrivate(this, "controlledReadableStream"), "state") !== @streamReadable)
+ @throwTypeError("ReadableStream is not readable");
+
+ @readableByteStreamControllerClose(this);
+}
+
+@getter
+function byobRequest()
+{
+ "use strict";
+
+ if (!@isReadableByteStreamController(this))
+ throw @makeGetterTypeError("ReadableByteStreamController", "byobRequest");
+
+
+ var request = @getByIdDirectPrivate(this, "byobRequest");
+ if (request === @undefined) {
+ var pending = @getByIdDirectPrivate(this, "pendingPullIntos");
+ const firstDescriptor = pending.peek();
+ if (firstDescriptor) {
+ const view = new @Uint8Array(firstDescriptor.buffer,
+ firstDescriptor.byteOffset + firstDescriptor.bytesFilled,
+ firstDescriptor.byteLength - firstDescriptor.bytesFilled);
+ @putByIdDirectPrivate(this, "byobRequest", new @ReadableStreamBYOBRequest(this, view, @isReadableStream));
+ }
+ }
+
+ return @getByIdDirectPrivate(this, "byobRequest");
+}
+
+@getter
+function desiredSize()
+{
+ "use strict";
+
+ if (!@isReadableByteStreamController(this))
+ throw @makeGetterTypeError("ReadableByteStreamController", "desiredSize");
+
+ return @readableByteStreamControllerGetDesiredSize(this);
+}
diff --git a/src/bun.js/builtins/js/ReadableByteStreamInternals.js b/src/bun.js/builtins/js/ReadableByteStreamInternals.js
new file mode 100644
index 000000000..01da62e1a
--- /dev/null
+++ b/src/bun.js/builtins/js/ReadableByteStreamInternals.js
@@ -0,0 +1,712 @@
+/*
+ * Copyright (C) 2016 Canon Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+ * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+// @internal
+
+function privateInitializeReadableByteStreamController(stream, underlyingByteSource, highWaterMark)
+{
+ "use strict";
+
+ if (!@isReadableStream(stream))
+ @throwTypeError("ReadableByteStreamController needs a ReadableStream");
+
+ // readableStreamController is initialized with null value.
+ if (@getByIdDirectPrivate(stream, "readableStreamController") !== null)
+ @throwTypeError("ReadableStream already has a controller");
+
+ @putByIdDirectPrivate(this, "controlledReadableStream", stream);
+ @putByIdDirectPrivate(this, "underlyingByteSource", underlyingByteSource);
+ @putByIdDirectPrivate(this, "pullAgain", false);
+ @putByIdDirectPrivate(this, "pulling", false);
+ @readableByteStreamControllerClearPendingPullIntos(this);
+ @putByIdDirectPrivate(this, "queue", @newQueue());
+ @putByIdDirectPrivate(this, "started", 0);
+ @putByIdDirectPrivate(this, "closeRequested", false);
+
+ let hwm = @toNumber(highWaterMark);
+ if (@isNaN(hwm) || hwm < 0)
+ @throwRangeError("highWaterMark value is negative or not a number");
+ @putByIdDirectPrivate(this, "strategyHWM", hwm);
+
+ let autoAllocateChunkSize = underlyingByteSource.autoAllocateChunkSize;
+ if (autoAllocateChunkSize !== @undefined) {
+ autoAllocateChunkSize = @toNumber(autoAllocateChunkSize);
+ if (autoAllocateChunkSize <= 0 || autoAllocateChunkSize === @Infinity || autoAllocateChunkSize === -@Infinity)
+ @throwRangeError("autoAllocateChunkSize value is negative or equal to positive or negative infinity");
+ }
+ @putByIdDirectPrivate(this, "autoAllocateChunkSize", autoAllocateChunkSize);
+ @putByIdDirectPrivate(this, "pendingPullIntos", @createFIFO());
+
+
+ const controller = this;
+ @promiseInvokeOrNoopNoCatch(@getByIdDirectPrivate(controller, "underlyingByteSource"), "start", [controller]).@then(() => {
+ @putByIdDirectPrivate(controller, "started", 1);
+ @assert(!@getByIdDirectPrivate(controller, "pulling"));
+ @assert(!@getByIdDirectPrivate(controller, "pullAgain"));
+ @readableByteStreamControllerCallPullIfNeeded(controller);
+ }, (error) => {
+ if (@getByIdDirectPrivate(stream, "state") === @streamReadable)
+ @readableByteStreamControllerError(controller, error);
+ });
+
+ @putByIdDirectPrivate(this, "cancel", @readableByteStreamControllerCancel);
+ @putByIdDirectPrivate(this, "pull", @readableByteStreamControllerPull);
+
+ return this;
+}
+
+function readableStreamByteStreamControllerStart(controller) {
+ "use strict";
+ @putByIdDirectPrivate(controller, "start", @undefined);
+
+}
+
+
+function privateInitializeReadableStreamBYOBRequest(controller, view)
+{
+ "use strict";
+
+ @putByIdDirectPrivate(this, "associatedReadableByteStreamController", controller);
+ @putByIdDirectPrivate(this, "view", view);
+}
+
+function isReadableByteStreamController(controller)
+{
+ "use strict";
+
+ // Same test mechanism as in isReadableStreamDefaultController (ReadableStreamInternals.js).
+ // See corresponding function for explanations.
+ return @isObject(controller) && !!@getByIdDirectPrivate(controller, "underlyingByteSource");
+}
+
+function isReadableStreamBYOBRequest(byobRequest)
+{
+ "use strict";
+
+ // Same test mechanism as in isReadableStreamDefaultController (ReadableStreamInternals.js).
+ // See corresponding function for explanations.
+ return @isObject(byobRequest) && !!@getByIdDirectPrivate(byobRequest, "associatedReadableByteStreamController");
+}
+
+function isReadableStreamBYOBReader(reader)
+{
+ "use strict";
+
+ // Spec tells to return true only if reader has a readIntoRequests internal slot.
+ // However, since it is a private slot, it cannot be checked using hasOwnProperty().
+ // Since readIntoRequests is initialized with an empty array, the following test is ok.
+ return @isObject(reader) && !!@getByIdDirectPrivate(reader, "readIntoRequests");
+}
+
+function readableByteStreamControllerCancel(controller, reason)
+{
+ "use strict";
+
+ var pendingPullIntos = @getByIdDirectPrivate(controller, "pendingPullIntos");
+ var first = pendingPullIntos.peek();
+ if (first)
+ first.bytesFilled = 0;
+
+ @putByIdDirectPrivate(controller, "queue", @newQueue());
+ return @promiseInvokeOrNoop(@getByIdDirectPrivate(controller, "underlyingByteSource"), "cancel", [reason]);
+}
+
+function readableByteStreamControllerError(controller, e)
+{
+ "use strict";
+
+ @assert(@getByIdDirectPrivate(@getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === @streamReadable);
+ @readableByteStreamControllerClearPendingPullIntos(controller);
+ @putByIdDirectPrivate(controller, "queue", @newQueue());
+ @readableStreamError(@getByIdDirectPrivate(controller, "controlledReadableStream"), e);
+}
+
+function readableByteStreamControllerClose(controller)
+{
+ "use strict";
+
+ @assert(!@getByIdDirectPrivate(controller, "closeRequested"));
+ @assert(@getByIdDirectPrivate(@getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === @streamReadable);
+
+ if (@getByIdDirectPrivate(controller, "queue").size > 0) {
+ @putByIdDirectPrivate(controller, "closeRequested", true);
+ return;
+ }
+
+ var first = @getByIdDirectPrivate(controller, "pendingPullIntos")?.peek();
+ if (first) {
+ if (first.bytesFilled > 0) {
+ const e = @makeTypeError("Close requested while there remain pending bytes");
+ @readableByteStreamControllerError(controller, e);
+ throw e;
+ }
+ }
+
+ @readableStreamClose(@getByIdDirectPrivate(controller, "controlledReadableStream"));
+}
+
+function readableByteStreamControllerClearPendingPullIntos(controller)
+{
+ "use strict";
+
+ @readableByteStreamControllerInvalidateBYOBRequest(controller);
+ var existing = @getByIdDirectPrivate(controller, "pendingPullIntos");
+ if (existing !== @undefined) {
+ existing.clear();
+ } else {
+ @putByIdDirectPrivate(controller, "pendingPullIntos", @createFIFO());
+ }
+}
+
+function readableByteStreamControllerGetDesiredSize(controller)
+{
+ "use strict";
+
+ const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
+ const state = @getByIdDirectPrivate(stream, "state");
+
+ if (state === @streamErrored)
+ return null;
+ if (state === @streamClosed)
+ return 0;
+
+ return @getByIdDirectPrivate(controller, "strategyHWM") - @getByIdDirectPrivate(controller, "queue").size;
+}
+
+function readableStreamHasBYOBReader(stream)
+{
+ "use strict";
+
+ const reader = @getByIdDirectPrivate(stream, "reader");
+ return reader !== @undefined && @isReadableStreamBYOBReader(reader);
+}
+
+function readableStreamHasDefaultReader(stream)
+{
+ "use strict";
+
+ const reader = @getByIdDirectPrivate(stream, "reader");
+ return reader !== @undefined && @isReadableStreamDefaultReader(reader);
+}
+
+function readableByteStreamControllerHandleQueueDrain(controller) {
+
+ "use strict";
+
+ @assert(@getByIdDirectPrivate(@getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === @streamReadable);
+ if (!@getByIdDirectPrivate(controller, "queue").size && @getByIdDirectPrivate(controller, "closeRequested"))
+ @readableStreamClose(@getByIdDirectPrivate(controller, "controlledReadableStream"));
+ else
+ @readableByteStreamControllerCallPullIfNeeded(controller);
+}
+
+function readableByteStreamControllerPull(controller)
+{
+ "use strict";
+
+ const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
+ @assert(@readableStreamHasDefaultReader(stream));
+ if (@getByIdDirectPrivate(controller, "queue").content?.isNotEmpty()) {
+ const entry = @getByIdDirectPrivate(controller, "queue").content.shift();
+ @getByIdDirectPrivate(controller, "queue").size -= entry.byteLength;
+ @readableByteStreamControllerHandleQueueDrain(controller);
+ let view;
+ try {
+ view = new @Uint8Array(entry.buffer, entry.byteOffset, entry.byteLength);
+ } catch (error) {
+ return @Promise.@reject(error);
+ }
+ return @createFulfilledPromise({ value: view, done: false });
+ }
+
+ if (@getByIdDirectPrivate(controller, "autoAllocateChunkSize") !== @undefined) {
+ let buffer;
+ try {
+ buffer = @createUninitializedArrayBuffer(@getByIdDirectPrivate(controller, "autoAllocateChunkSize"));
+ } catch (error) {
+ return @Promise.@reject(error);
+ }
+ const pullIntoDescriptor = {
+ buffer,
+ byteOffset: 0,
+ byteLength: @getByIdDirectPrivate(controller, "autoAllocateChunkSize"),
+ bytesFilled: 0,
+ elementSize: 1,
+ ctor: @Uint8Array,
+ readerType: 'default'
+ };
+ @getByIdDirectPrivate(controller, "pendingPullIntos").push(pullIntoDescriptor);
+ }
+
+ const promise = @readableStreamAddReadRequest(stream);
+ @readableByteStreamControllerCallPullIfNeeded(controller);
+ return promise;
+}
+
+function readableByteStreamControllerShouldCallPull(controller)
+{
+ "use strict";
+
+ const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
+
+ if (@getByIdDirectPrivate(stream, "state") !== @streamReadable)
+ return false;
+ if (@getByIdDirectPrivate(controller, "closeRequested"))
+ return false;
+ if (!(@getByIdDirectPrivate(controller, "started") > 0))
+ return false;
+ const reader = @getByIdDirectPrivate(stream, "reader");
+
+ if (reader && (@getByIdDirectPrivate(reader, "readRequests")?.isNotEmpty() || !!@getByIdDirectPrivate(reader, "bunNativePtr")))
+ return true;
+ if (@readableStreamHasBYOBReader(stream) && @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readIntoRequests")?.isNotEmpty())
+ return true;
+ if (@readableByteStreamControllerGetDesiredSize(controller) > 0)
+ return true;
+ return false;
+}
+
+function readableByteStreamControllerCallPullIfNeeded(controller)
+{
+ "use strict";
+
+ if (!@readableByteStreamControllerShouldCallPull(controller))
+ return;
+
+ if (@getByIdDirectPrivate(controller, "pulling")) {
+ @putByIdDirectPrivate(controller, "pullAgain", true);
+ return;
+ }
+
+ @assert(!@getByIdDirectPrivate(controller, "pullAgain"));
+ @putByIdDirectPrivate(controller, "pulling", true);
+ @promiseInvokeOrNoop(@getByIdDirectPrivate(controller, "underlyingByteSource"), "pull", [controller]).@then(() => {
+ @putByIdDirectPrivate(controller, "pulling", false);
+ if (@getByIdDirectPrivate(controller, "pullAgain")) {
+ @putByIdDirectPrivate(controller, "pullAgain", false);
+ @readableByteStreamControllerCallPullIfNeeded(controller);
+ }
+ }, (error) => {
+ if (@getByIdDirectPrivate(@getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === @streamReadable)
+ @readableByteStreamControllerError(controller, error);
+ });
+}
+
+function transferBufferToCurrentRealm(buffer)
+{
+ "use strict";
+
+ // FIXME: Determine what should be done here exactly (what is already existing in current
+ // codebase and what has to be added). According to spec, Transfer operation should be
+ // performed in order to transfer buffer to current realm. For the moment, simply return
+ // received buffer.
+ return buffer;
+}
+
+function readableStreamReaderKind(reader) {
+ "use strict";
+
+
+ if (!!@getByIdDirectPrivate(reader, "readRequests"))
+ return @getByIdDirectPrivate(reader, "bunNativePtr") ? 3 : 1;
+
+ if (!!@getByIdDirectPrivate(reader, "readIntoRequests"))
+ return 2;
+
+ return 0;
+}
+function readableByteStreamControllerEnqueue(controller, chunk)
+{
+ "use strict";
+
+ const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
+ @assert(!@getByIdDirectPrivate(controller, "closeRequested"));
+ @assert(@getByIdDirectPrivate(stream, "state") === @streamReadable);
+
+
+ switch (@getByIdDirectPrivate(stream, "reader") ? @readableStreamReaderKind(@getByIdDirectPrivate(stream, "reader")) : 0) {
+ /* default reader */
+ case 1: {
+ if (!@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty())
+ @readableByteStreamControllerEnqueueChunk(controller, @transferBufferToCurrentRealm(chunk.buffer), chunk.byteOffset, chunk.byteLength);
+ else {
+ @assert(!@getByIdDirectPrivate(controller, "queue").content.size());
+ const transferredView = chunk.constructor === @Uint8Array ? chunk : new @Uint8Array(chunk.buffer, chunk.byteOffset, chunk.byteLength);
+ @readableStreamFulfillReadRequest(stream, transferredView, false);
+ }
+ break;
+ }
+
+ /* BYOB */
+ case 2: {
+ @readableByteStreamControllerEnqueueChunk(controller, @transferBufferToCurrentRealm(chunk.buffer), chunk.byteOffset, chunk.byteLength);
+ @readableByteStreamControllerProcessPullDescriptors(controller);
+ break;
+ }
+
+ /* NativeReader */
+ case 3: {
+ // reader.@enqueueNative(@getByIdDirectPrivate(reader, "bunNativePtr"), chunk);
+
+ break;
+ }
+
+ default: {
+ @assert(!@isReadableStreamLocked(stream));
+ @readableByteStreamControllerEnqueueChunk(controller, @transferBufferToCurrentRealm(chunk.buffer), chunk.byteOffset, chunk.byteLength);
+ break;
+ }
+ }
+}
+
+// Spec name: readableByteStreamControllerEnqueueChunkToQueue.
+function readableByteStreamControllerEnqueueChunk(controller, buffer, byteOffset, byteLength)
+{
+ "use strict";
+
+ @getByIdDirectPrivate(controller, "queue").content.push({
+ buffer: buffer,
+ byteOffset: byteOffset,
+ byteLength: byteLength
+ });
+ @getByIdDirectPrivate(controller, "queue").size += byteLength;
+}
+
+function readableByteStreamControllerRespondWithNewView(controller, view)
+{
+ "use strict";
+
+ @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").isNotEmpty());
+
+ let firstDescriptor = @getByIdDirectPrivate(controller, "pendingPullIntos").peek();
+
+ if (firstDescriptor.byteOffset + firstDescriptor.bytesFilled !== view.byteOffset)
+ @throwRangeError("Invalid value for view.byteOffset");
+
+ if (firstDescriptor.byteLength !== view.byteLength)
+ @throwRangeError("Invalid value for view.byteLength");
+
+ firstDescriptor.buffer = view.buffer;
+ @readableByteStreamControllerRespondInternal(controller, view.byteLength);
+}
+
+function readableByteStreamControllerRespond(controller, bytesWritten)
+{
+ "use strict";
+
+ bytesWritten = @toNumber(bytesWritten);
+
+ if (@isNaN(bytesWritten) || bytesWritten === @Infinity || bytesWritten < 0 )
+ @throwRangeError("bytesWritten has an incorrect value");
+
+ @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").isNotEmpty());
+
+ @readableByteStreamControllerRespondInternal(controller, bytesWritten);
+}
+
+function readableByteStreamControllerRespondInternal(controller, bytesWritten)
+{
+ "use strict";
+
+ let firstDescriptor = @getByIdDirectPrivate(controller, "pendingPullIntos").peek();
+ let stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
+
+ if (@getByIdDirectPrivate(stream, "state") === @streamClosed) {
+ if (bytesWritten !== 0)
+ @throwTypeError("bytesWritten is different from 0 even though stream is closed");
+ @readableByteStreamControllerRespondInClosedState(controller, firstDescriptor);
+ } else {
+ @assert(@getByIdDirectPrivate(stream, "state") === @streamReadable);
+ @readableByteStreamControllerRespondInReadableState(controller, bytesWritten, firstDescriptor);
+ }
+}
+
+function readableByteStreamControllerRespondInReadableState(controller, bytesWritten, pullIntoDescriptor)
+{
+ "use strict";
+
+ if (pullIntoDescriptor.bytesFilled + bytesWritten > pullIntoDescriptor.byteLength)
+ @throwRangeError("bytesWritten value is too great");
+
+ @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").isEmpty() || @getByIdDirectPrivate(controller, "pendingPullIntos").peek() === pullIntoDescriptor);
+ @readableByteStreamControllerInvalidateBYOBRequest(controller);
+ pullIntoDescriptor.bytesFilled += bytesWritten;
+
+ if (pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize)
+ return;
+
+ @readableByteStreamControllerShiftPendingDescriptor(controller);
+ const remainderSize = pullIntoDescriptor.bytesFilled % pullIntoDescriptor.elementSize;
+
+ if (remainderSize > 0) {
+ const end = pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled;
+ const remainder = @cloneArrayBuffer(pullIntoDescriptor.buffer, end - remainderSize, remainderSize);
+ @readableByteStreamControllerEnqueueChunk(controller, remainder, 0, remainder.byteLength);
+ }
+
+ pullIntoDescriptor.buffer = @transferBufferToCurrentRealm(pullIntoDescriptor.buffer);
+ pullIntoDescriptor.bytesFilled -= remainderSize;
+ @readableByteStreamControllerCommitDescriptor(@getByIdDirectPrivate(controller, "controlledReadableStream"), pullIntoDescriptor);
+ @readableByteStreamControllerProcessPullDescriptors(controller);
+}
+
+function readableByteStreamControllerRespondInClosedState(controller, firstDescriptor)
+{
+ "use strict";
+
+ firstDescriptor.buffer = @transferBufferToCurrentRealm(firstDescriptor.buffer);
+ @assert(firstDescriptor.bytesFilled === 0);
+
+ if (@readableStreamHasBYOBReader(@getByIdDirectPrivate(controller, "controlledReadableStream"))) {
+ while (@getByIdDirectPrivate(@getByIdDirectPrivate(@getByIdDirectPrivate(controller, "controlledReadableStream"), "reader"), "readIntoRequests")?.isNotEmpty()) {
+ let pullIntoDescriptor = @readableByteStreamControllerShiftPendingDescriptor(controller);
+ @readableByteStreamControllerCommitDescriptor(@getByIdDirectPrivate(controller, "controlledReadableStream"), pullIntoDescriptor);
+ }
+ }
+}
+
+// Spec name: readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue (shortened for readability).
+function readableByteStreamControllerProcessPullDescriptors(controller)
+{
+ "use strict";
+
+ @assert(!@getByIdDirectPrivate(controller, "closeRequested"));
+ while (@getByIdDirectPrivate(controller, "pendingPullIntos").isNotEmpty()) {
+ if (@getByIdDirectPrivate(controller, "queue").size === 0)
+ return;
+ let pullIntoDescriptor = @getByIdDirectPrivate(controller, "pendingPullIntos").peek();
+ if (@readableByteStreamControllerFillDescriptorFromQueue(controller, pullIntoDescriptor)) {
+ @readableByteStreamControllerShiftPendingDescriptor(controller);
+ @readableByteStreamControllerCommitDescriptor(@getByIdDirectPrivate(controller, "controlledReadableStream"), pullIntoDescriptor);
+ }
+ }
+}
+
+// Spec name: readableByteStreamControllerFillPullIntoDescriptorFromQueue (shortened for readability).
+function readableByteStreamControllerFillDescriptorFromQueue(controller, pullIntoDescriptor)
+{
+ "use strict";
+
+ const currentAlignedBytes = pullIntoDescriptor.bytesFilled - (pullIntoDescriptor.bytesFilled % pullIntoDescriptor.elementSize);
+ const maxBytesToCopy = @getByIdDirectPrivate(controller, "queue").size < pullIntoDescriptor.byteLength - pullIntoDescriptor.bytesFilled ?
+ @getByIdDirectPrivate(controller, "queue").size : pullIntoDescriptor.byteLength - pullIntoDescriptor.bytesFilled;
+ const maxBytesFilled = pullIntoDescriptor.bytesFilled + maxBytesToCopy;
+ const maxAlignedBytes = maxBytesFilled - (maxBytesFilled % pullIntoDescriptor.elementSize);
+ let totalBytesToCopyRemaining = maxBytesToCopy;
+ let ready = false;
+
+ if (maxAlignedBytes > currentAlignedBytes) {
+ totalBytesToCopyRemaining = maxAlignedBytes - pullIntoDescriptor.bytesFilled;
+ ready = true;
+ }
+
+ while (totalBytesToCopyRemaining > 0) {
+ let headOfQueue = @getByIdDirectPrivate(controller, "queue").content.peek();
+ const bytesToCopy = totalBytesToCopyRemaining < headOfQueue.byteLength ? totalBytesToCopyRemaining : headOfQueue.byteLength;
+ // Copy appropriate part of pullIntoDescriptor.buffer to headOfQueue.buffer.
+ // Remark: this implementation is not completely aligned on the definition of CopyDataBlockBytes
+ // operation of ECMAScript (the case of Shared Data Block is not considered here, but it doesn't seem to be an issue).
+ const destStart = pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled;
+ // FIXME: As indicated in comments of bug 172717, access to set is not safe. However, using prototype.@set.@call does
+ // not work (@set is undefined). A safe way to do that is needed.
+ new @Uint8Array(pullIntoDescriptor.buffer).set(new @Uint8Array(headOfQueue.buffer, headOfQueue.byteOffset, bytesToCopy), destStart);
+
+ if (headOfQueue.byteLength === bytesToCopy)
+ @getByIdDirectPrivate(controller, "queue").content.shift();
+ else {
+ headOfQueue.byteOffset += bytesToCopy;
+ headOfQueue.byteLength -= bytesToCopy;
+ }
+
+ @getByIdDirectPrivate(controller, "queue").size -= bytesToCopy;
+ @assert(@getByIdDirectPrivate(controller, "pendingPullIntos").isEmpty() || @getByIdDirectPrivate(controller, "pendingPullIntos").peek() === pullIntoDescriptor);
+ @readableByteStreamControllerInvalidateBYOBRequest(controller);
+ pullIntoDescriptor.bytesFilled += bytesToCopy;
+ totalBytesToCopyRemaining -= bytesToCopy;
+ }
+
+ if (!ready) {
+ @assert(@getByIdDirectPrivate(controller, "queue").size === 0);
+ @assert(pullIntoDescriptor.bytesFilled > 0);
+ @assert(pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize);
+ }
+
+ return ready;
+}
+
+// Spec name: readableByteStreamControllerShiftPendingPullInto (renamed for consistency).
+function readableByteStreamControllerShiftPendingDescriptor(controller)
+{
+ "use strict";
+
+ let descriptor = @getByIdDirectPrivate(controller, "pendingPullIntos").shift();
+ @readableByteStreamControllerInvalidateBYOBRequest(controller);
+ return descriptor;
+}
+
+function readableByteStreamControllerInvalidateBYOBRequest(controller)
+{
+ "use strict";
+
+ if (@getByIdDirectPrivate(controller, "byobRequest") === @undefined)
+ return;
+ const byobRequest = @getByIdDirectPrivate(controller, "byobRequest");
+ @putByIdDirectPrivate(byobRequest, "associatedReadableByteStreamController", @undefined);
+ @putByIdDirectPrivate(byobRequest, "view", @undefined);
+ @putByIdDirectPrivate(controller, "byobRequest", @undefined);
+}
+
+// Spec name: readableByteStreamControllerCommitPullIntoDescriptor (shortened for readability).
+function readableByteStreamControllerCommitDescriptor(stream, pullIntoDescriptor)
+{
+ "use strict";
+
+ @assert(@getByIdDirectPrivate(stream, "state") !== @streamErrored);
+ let done = false;
+ if (@getByIdDirectPrivate(stream, "state") === @streamClosed) {
+ @assert(!pullIntoDescriptor.bytesFilled);
+ done = true;
+ }
+ let filledView = @readableByteStreamControllerConvertDescriptor(pullIntoDescriptor);
+ if (pullIntoDescriptor.readerType === "default")
+ @readableStreamFulfillReadRequest(stream, filledView, done);
+ else {
+ @assert(pullIntoDescriptor.readerType === "byob");
+ @readableStreamFulfillReadIntoRequest(stream, filledView, done);
+ }
+}
+
+// Spec name: readableByteStreamControllerConvertPullIntoDescriptor (shortened for readability).
+function readableByteStreamControllerConvertDescriptor(pullIntoDescriptor)
+{
+ "use strict";
+
+ @assert(pullIntoDescriptor.bytesFilled <= pullIntoDescriptor.byteLength);
+ @assert(pullIntoDescriptor.bytesFilled % pullIntoDescriptor.elementSize === 0);
+
+ return new pullIntoDescriptor.ctor(pullIntoDescriptor.buffer, pullIntoDescriptor.byteOffset, pullIntoDescriptor.bytesFilled / pullIntoDescriptor.elementSize);
+}
+
+function readableStreamFulfillReadIntoRequest(stream, chunk, done)
+{
+ "use strict";
+ const readIntoRequest = @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readIntoRequests").shift();
+ @fulfillPromise(readIntoRequest, { value: chunk, done: done });
+}
+
+function readableStreamBYOBReaderRead(reader, view)
+{
+ "use strict";
+
+ const stream = @getByIdDirectPrivate(reader, "ownerReadableStream");
+ @assert(!!stream);
+
+ @putByIdDirectPrivate(stream, "disturbed", true);
+ if (@getByIdDirectPrivate(stream, "state") === @streamErrored)
+ return @Promise.@reject(@getByIdDirectPrivate(stream, "storedError"));
+
+ return @readableByteStreamControllerPullInto(@getByIdDirectPrivate(stream, "readableStreamController"), view);
+}
+
+function readableByteStreamControllerPullInto(controller, view)
+{
+ "use strict";
+
+ const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
+ let elementSize = 1;
+ // Spec describes that in the case where view is a TypedArray, elementSize
+ // should be set to the size of an element (e.g. 2 for UInt16Array). For
+ // DataView, BYTES_PER_ELEMENT is undefined, contrary to the same property
+ // for TypedArrays.
+ // FIXME: Getting BYTES_PER_ELEMENT like this is not safe (property is read-only
+ // but can be modified if the prototype is redefined). A safe way of getting
+ // it would be to determine which type of ArrayBufferView view is an instance
+ // of based on typed arrays private variables. However, this is not possible due
+ // to bug 167697, which prevents access to typed arrays through their private
+ // names unless public name has already been met before.
+ if (view.BYTES_PER_ELEMENT !== @undefined)
+ elementSize = view.BYTES_PER_ELEMENT;
+
+ // FIXME: Getting constructor like this is not safe. A safe way of getting
+ // it would be to determine which type of ArrayBufferView view is an instance
+ // of, and to assign appropriate constructor based on this (e.g. ctor =
+ // @Uint8Array). However, this is not possible due to bug 167697, which
+ // prevents access to typed arrays through their private names unless public
+ // name has already been met before.
+ const ctor = view.constructor;
+
+ const pullIntoDescriptor = {
+ buffer: view.buffer,
+ byteOffset: view.byteOffset,
+ byteLength: view.byteLength,
+ bytesFilled: 0,
+ elementSize,
+ ctor,
+ readerType: 'byob'
+ };
+
+ var pending = @getByIdDirectPrivate(controller, "pendingPullIntos");
+ if (pending?.isNotEmpty()) {
+ pullIntoDescriptor.buffer = @transferBufferToCurrentRealm(pullIntoDescriptor.buffer);
+ pending.push(pullIntoDescriptor);
+ return @readableStreamAddReadIntoRequest(stream);
+ }
+
+ if (@getByIdDirectPrivate(stream, "state") === @streamClosed) {
+ const emptyView = new ctor(pullIntoDescriptor.buffer, pullIntoDescriptor.byteOffset, 0);
+ return @createFulfilledPromise({ value: emptyView, done: true });
+ }
+
+ if (@getByIdDirectPrivate(controller, "queue").size > 0) {
+ if (@readableByteStreamControllerFillDescriptorFromQueue(controller, pullIntoDescriptor)) {
+ const filledView = @readableByteStreamControllerConvertDescriptor(pullIntoDescriptor);
+ @readableByteStreamControllerHandleQueueDrain(controller);
+ return @createFulfilledPromise({ value: filledView, done: false });
+ }
+ if (@getByIdDirectPrivate(controller, "closeRequested")) {
+ const e = @makeTypeError("Closing stream has been requested");
+ @readableByteStreamControllerError(controller, e);
+ return @Promise.@reject(e);
+ }
+ }
+
+ pullIntoDescriptor.buffer = @transferBufferToCurrentRealm(pullIntoDescriptor.buffer);
+ @getByIdDirectPrivate(controller, "pendingPullIntos").push(pullIntoDescriptor);
+ const promise = @readableStreamAddReadIntoRequest(stream);
+ @readableByteStreamControllerCallPullIfNeeded(controller);
+ return promise;
+}
+
+function readableStreamAddReadIntoRequest(stream)
+{
+ "use strict";
+
+ @assert(@isReadableStreamBYOBReader(@getByIdDirectPrivate(stream, "reader")));
+ @assert(@getByIdDirectPrivate(stream, "state") === @streamReadable || @getByIdDirectPrivate(stream, "state") === @streamClosed);
+
+ const readRequest = @newPromise();
+ @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readIntoRequests").push(readRequest);
+
+ return readRequest;
+}
diff --git a/src/bun.js/builtins/js/ReadableStream.js b/src/bun.js/builtins/js/ReadableStream.js
new file mode 100644
index 000000000..db7cf85a8
--- /dev/null
+++ b/src/bun.js/builtins/js/ReadableStream.js
@@ -0,0 +1,506 @@
+/*
+ * Copyright (C) 2015 Canon Inc.
+ * Copyright (C) 2015 Igalia.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+ * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+function initializeReadableStream(underlyingSource, strategy)
+{
+ "use strict";
+
+ if (underlyingSource === @undefined)
+ underlyingSource = { @bunNativeType: 0, @bunNativePtr: 0, @lazy: false };
+ if (strategy === @undefined)
+ strategy = { };
+
+ if (!@isObject(underlyingSource))
+ @throwTypeError("ReadableStream constructor takes an object as first argument");
+
+ if (strategy !== @undefined && !@isObject(strategy))
+ @throwTypeError("ReadableStream constructor takes an object as second argument, if any");
+
+ @putByIdDirectPrivate(this, "state", @streamReadable);
+
+ @putByIdDirectPrivate(this, "reader", @undefined);
+
+ @putByIdDirectPrivate(this, "storedError", @undefined);
+
+ @putByIdDirectPrivate(this, "disturbed", false);
+
+ // Initialized with null value to enable distinction with undefined case.
+ @putByIdDirectPrivate(this, "readableStreamController", null);
+ @putByIdDirectPrivate(this, "bunNativeType", @getByIdDirectPrivate(underlyingSource, "bunNativeType") ?? 0);
+ @putByIdDirectPrivate(this, "bunNativePtr", @getByIdDirectPrivate(underlyingSource, "bunNativePtr") ?? 0);
+
+ const isDirect = underlyingSource.type === "direct";
+ // direct streams are always lazy
+ const isUnderlyingSourceLazy = !!underlyingSource.@lazy;
+ const isLazy = isDirect || isUnderlyingSourceLazy;
+
+ // // FIXME: We should introduce https://streams.spec.whatwg.org/#create-readable-stream.
+ // // For now, we emulate this with underlyingSource with private properties.
+ if (@getByIdDirectPrivate(underlyingSource, "pull") !== @undefined && !isLazy) {
+ const size = @getByIdDirectPrivate(strategy, "size");
+ const highWaterMark = @getByIdDirectPrivate(strategy, "highWaterMark");
+ @setupReadableStreamDefaultController(this, underlyingSource, size, highWaterMark !== @undefined ? highWaterMark : 1, @getByIdDirectPrivate(underlyingSource, "start"), @getByIdDirectPrivate(underlyingSource, "pull"), @getByIdDirectPrivate(underlyingSource, "cancel"));
+
+ return this;
+ }
+ if (isDirect) {
+ @putByIdDirectPrivate(this, "start", () => @createReadableStreamController(this, underlyingSource, strategy));
+ } else if (isLazy) {
+ const autoAllocateChunkSize = underlyingSource.autoAllocateChunkSize;
+
+
+ @putByIdDirectPrivate(this, "start", () => {
+ const instance = @lazyLoadStream(this, autoAllocateChunkSize);
+ if (instance) {
+ @createReadableStreamController(this, instance, strategy);
+ }
+ });
+ } else {
+ @putByIdDirectPrivate(this, "start", @undefined);
+ @createReadableStreamController(this, underlyingSource, strategy);
+ }
+
+
+ return this;
+}
+
+
+@globalPrivate
+function readableStreamToArray(stream) {
+ "use strict";
+
+ if (!stream || @getByIdDirectPrivate(stream, "state") === @streamClosed) {
+ return null;
+ }
+ var reader = stream.getReader();
+ var manyResult = reader.readMany();
+
+ async function processManyResult(result) {
+
+ if (result.done) {
+ return null;
+ }
+
+ var chunks = result.value || [];
+
+ while (true) {
+ var thisResult = await reader.read();
+
+ if (thisResult.done) {
+ return chunks;
+ }
+
+ chunks.push(thisResult.value);
+ }
+
+ return chunks;
+ };
+
+
+ if (manyResult && @isPromise(manyResult)) {
+ return manyResult.@then(processManyResult);
+ }
+
+ if (manyResult && manyResult.done) {
+ return null;
+ }
+
+ return processManyResult(manyResult);
+}
+
+@globalPrivate
+function readableStreamToText(stream) {
+ "use strict";
+
+ // TODO: optimize this to skip the extra ArrayBuffer
+ return globalThis.Bun.readableStreamToArrayBuffer(stream).@then(function(arrayBuffer) {
+ return new globalThis.TextDecoder().decode(arrayBuffer);
+ });
+}
+
+@globalPrivate
+function readableStreamToJSON(stream) {
+ "use strict";
+
+ // TODO: optimize this to skip the extra ArrayBuffer
+ return globalThis.Bun.readableStreamToArrayBuffer(stream).@then(function(arrayBuffer) {
+ return globalThis.JSON.parse(new globalThis.TextDecoder().decode(arrayBuffer));
+ });
+}
+
+@globalPrivate
+function readableStreamToBlob(stream) {
+ "use strict";
+
+
+ const array = @readableStreamToArray(stream);
+ if (array === null) {
+ return new globalThis.Blob();
+ }
+
+ return array.@then(function(chunks) {
+ if (chunks === null || chunks.length === 0) {
+ return new globalThis.Blob();
+ }
+
+ return new globalThis.Blob(chunks);
+ });
+}
+
+@globalPrivate
+function readableStreamToArrayPublic(stream) {
+ "use strict";
+
+ if (@getByIdDirectPrivate(stream, "state") === @streamClosed) {
+ return [];
+ }
+ var reader = stream.getReader();
+
+ var manyResult = reader.readMany();
+
+ var processManyResult = (0, (async function(result) {
+ if (result.done) {
+ return [];
+ }
+
+ var chunks = result.value || [];
+
+ while (true) {
+ var thisResult = await reader.read();
+ if (thisResult.done) {
+ return chunks;
+ }
+
+ chunks.push(thisResult.value);
+ }
+
+ return chunks;
+ }));
+
+
+ if (manyResult && @isPromise(manyResult)) {
+ return manyResult.then(processManyResult);
+ }
+
+ if (manyResult && manyResult.done) {
+ return [];
+ }
+
+ return processManyResult(manyResult);
+}
+
+
+
+@globalPrivate
+function consumeReadableStream(nativePtr, nativeType, inputStream) {
+ "use strict";
+ const symbol = globalThis.Symbol.for("Bun.consumeReadableStreamPrototype");
+ var cached = globalThis[symbol];
+ if (!cached) {
+ cached = globalThis[symbol] = [];
+ }
+ var Prototype = cached[nativeType];
+ if (Prototype === @undefined) {
+ var [doRead, doError, doReadMany, doClose, onClose, deinit] = globalThis[globalThis.Symbol.for("Bun.lazy")](nativeType);
+
+ Prototype = class NativeReadableStreamSink {
+ constructor(reader, ptr) {
+ this.#ptr = ptr;
+ this.#reader = reader;
+ this.#didClose = false;
+
+ this.handleError = this._handleError.bind(this);
+ this.handleClosed = this._handleClosed.bind(this);
+ this.processResult = this._processResult.bind(this);
+
+ reader.closed.then(this.handleClosed, this.handleError);
+ }
+
+ handleError;
+ handleClosed;
+ _handleClosed() {
+ if (this.#didClose) return;
+ this.#didClose = true;
+ var ptr = this.#ptr;
+ this.#ptr = 0;
+ doClose(ptr);
+ deinit(ptr);
+ }
+
+ _handleError(error) {
+ if (this.#didClose) return;
+ this.#didClose = true;
+ var ptr = this.#ptr;
+ this.#ptr = 0;
+ doError(ptr, error);
+ deinit(ptr);
+ }
+
+ #ptr;
+ #didClose = false;
+ #reader;
+
+ _handleReadMany({value, done, size}) {
+ if (done) {
+ this.handleClosed();
+ return;
+ }
+
+ if (this.#didClose) return;
+
+
+ doReadMany(this.#ptr, value, done, size);
+ }
+
+
+ read() {
+ if (!this.#ptr) return @throwTypeError("ReadableStreamSink is already closed");
+
+ return this.processResult(this.#reader.read());
+
+ }
+
+ _processResult(result) {
+ if (result && @isPromise(result)) {
+ const flags = @getPromiseInternalField(result, @promiseFieldFlags);
+ if (flags & @promiseStateFulfilled) {
+ const fulfilledValue = @getPromiseInternalField(result, @promiseFieldReactionsOrResult);
+ if (fulfilledValue) {
+ result = fulfilledValue;
+ }
+ }
+ }
+
+ if (result && @isPromise(result)) {
+ result.then(this.processResult, this.handleError);
+ return null;
+ }
+
+ if (result.done) {
+ this.handleClosed();
+ return 0;
+ } else if (result.value) {
+ return result.value;
+ } else {
+ return -1;
+ }
+ }
+
+ readMany() {
+ if (!this.#ptr) return @throwTypeError("ReadableStreamSink is already closed");
+ return this.processResult(this.#reader.readMany());
+ }
+
+
+ };
+
+ const minlength = nativeType + 1;
+ if (cached.length < minlength) {
+ cached.length = minlength;
+ }
+ @putByValDirect(cached, nativeType, Prototype);
+ }
+
+ if (@isReadableStreamLocked(inputStream)) {
+ @throwTypeError("Cannot start reading from a locked stream");
+ }
+
+ return new Prototype(inputStream.getReader(), nativePtr);
+}
+
+@globalPrivate
+function createEmptyReadableStream() {
+ "use strict";
+
+ var stream = new @ReadableStream({
+ pull() {},
+ });
+ @readableStreamClose(stream);
+ return stream;
+}
+
+@globalPrivate
+function createNativeReadableStream(nativePtr, nativeType, autoAllocateChunkSize) {
+ "use strict";
+ return new @ReadableStream({
+ @lazy: true,
+ @bunNativeType: nativeType,
+ @bunNativePtr: nativePtr,
+ autoAllocateChunkSize: autoAllocateChunkSize,
+ });
+}
+
+function cancel(reason)
+{
+ "use strict";
+
+ if (!@isReadableStream(this))
+ return @Promise.@reject(@makeThisTypeError("ReadableStream", "cancel"));
+
+ if (@isReadableStreamLocked(this))
+ return @Promise.@reject(@makeTypeError("ReadableStream is locked"));
+
+ return @readableStreamCancel(this, reason);
+}
+
+function getReader(options)
+{
+ "use strict";
+
+ if (!@isReadableStream(this))
+ throw @makeThisTypeError("ReadableStream", "getReader");
+
+ const mode = @toDictionary(options, { }, "ReadableStream.getReader takes an object as first argument").mode;
+ if (mode === @undefined) {
+ var start_ = @getByIdDirectPrivate(this, "start");
+ if (start_) {
+ @putByIdDirectPrivate(this, "start", @undefined);
+ start_();
+ }
+
+ return new @ReadableStreamDefaultReader(this);
+ }
+ // String conversion is required by spec, hence double equals.
+ if (mode == 'byob') {
+ return new @ReadableStreamBYOBReader(this);
+ }
+
+
+ @throwTypeError("Invalid mode is specified");
+}
+
+function pipeThrough(streams, options)
+{
+ "use strict";
+
+ const transforms = streams;
+
+ const readable = transforms["readable"];
+ if (!@isReadableStream(readable))
+ throw @makeTypeError("readable should be ReadableStream");
+
+ const writable = transforms["writable"];
+ const internalWritable = @getInternalWritableStream(writable);
+ if (!@isWritableStream(internalWritable))
+ throw @makeTypeError("writable should be WritableStream");
+
+ let preventClose = false;
+ let preventAbort = false;
+ let preventCancel = false;
+ let signal;
+ if (!@isUndefinedOrNull(options)) {
+ if (!@isObject(options))
+ throw @makeTypeError("options must be an object");
+
+ preventAbort = !!options["preventAbort"];
+ preventCancel = !!options["preventCancel"];
+ preventClose = !!options["preventClose"];
+
+ signal = options["signal"];
+ if (signal !== @undefined && !@isAbortSignal(signal))
+ throw @makeTypeError("options.signal must be AbortSignal");
+ }
+
+ if (!@isReadableStream(this))
+ throw @makeThisTypeError("ReadableStream", "pipeThrough");
+
+ if (@isReadableStreamLocked(this))
+ throw @makeTypeError("ReadableStream is locked");
+
+ if (@isWritableStreamLocked(internalWritable))
+ throw @makeTypeError("WritableStream is locked");
+
+ @readableStreamPipeToWritableStream(this, internalWritable, preventClose, preventAbort, preventCancel, signal);
+
+ return readable;
+}
+
+function pipeTo(destination)
+{
+ "use strict";
+
+ // FIXME: https://bugs.webkit.org/show_bug.cgi?id=159869.
+ // Built-in generator should be able to parse function signature to compute the function length correctly.
+ let options = arguments[1];
+
+ let preventClose = false;
+ let preventAbort = false;
+ let preventCancel = false;
+ let signal;
+ if (!@isUndefinedOrNull(options)) {
+ if (!@isObject(options))
+ return @Promise.@reject(@makeTypeError("options must be an object"));
+
+ try {
+ preventAbort = !!options["preventAbort"];
+ preventCancel = !!options["preventCancel"];
+ preventClose = !!options["preventClose"];
+
+ signal = options["signal"];
+ } catch(e) {
+ return @Promise.@reject(e);
+ }
+
+ if (signal !== @undefined && !@isAbortSignal(signal))
+ return @Promise.@reject(@makeTypeError("options.signal must be AbortSignal"));
+ }
+
+ const internalDestination = @getInternalWritableStream(destination);
+ if (!@isWritableStream(internalDestination))
+ return @Promise.@reject(@makeTypeError("ReadableStream pipeTo requires a WritableStream"));
+
+ if (!@isReadableStream(this))
+ return @Promise.@reject(@makeThisTypeError("ReadableStream", "pipeTo"));
+
+ if (@isReadableStreamLocked(this))
+ return @Promise.@reject(@makeTypeError("ReadableStream is locked"));
+
+ if (@isWritableStreamLocked(internalDestination))
+ return @Promise.@reject(@makeTypeError("WritableStream is locked"));
+
+ return @readableStreamPipeToWritableStream(this, internalDestination, preventClose, preventAbort, preventCancel, signal);
+}
+
+function tee()
+{
+ "use strict";
+
+ if (!@isReadableStream(this))
+ throw @makeThisTypeError("ReadableStream", "tee");
+
+ return @readableStreamTee(this, false);
+}
+
+@getter
+function locked()
+{
+ "use strict";
+
+ if (!@isReadableStream(this))
+ throw @makeGetterTypeError("ReadableStream", "locked");
+
+ return @isReadableStreamLocked(this);
+}
diff --git a/src/bun.js/builtins/js/ReadableStreamBYOBReader.js b/src/bun.js/builtins/js/ReadableStreamBYOBReader.js
new file mode 100644
index 000000000..16e4ebce5
--- /dev/null
+++ b/src/bun.js/builtins/js/ReadableStreamBYOBReader.js
@@ -0,0 +1,102 @@
+/*
+ * Copyright (C) 2017 Canon Inc.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY CANON INC. AND ITS CONTRIBUTORS "AS IS" AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL CANON INC. AND ITS CONTRIBUTORS BE LIABLE FOR
+ * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+ * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+function initializeReadableStreamBYOBReader(stream)
+{
+ "use strict";
+
+ if (!@isReadableStream(stream))
+ @throwTypeError("ReadableStreamBYOBReader needs a ReadableStream");
+ if (!@isReadableByteStreamController(@getByIdDirectPrivate(stream, "readableStreamController")))
+ @throwTypeError("ReadableStreamBYOBReader needs a ReadableByteStreamController");
+ if (@isReadableStreamLocked(stream))
+ @throwTypeError("ReadableStream is locked");
+
+ @readableStreamReaderGenericInitialize(this, stream);
+ @putByIdDirectPrivate(this, "readIntoRequests", @createFIFO());
+
+ return this;
+}
+
+function cancel(reason)
+{
+ "use strict";
+
+ if (!@isReadableStreamBYOBReader(this))
+ return @Promise.@reject(@makeThisTypeError("ReadableStreamBYOBReader", "cancel"));
+
+ if (!@getByIdDirectPrivate(this, "ownerReadableStream"))
+ return @Promise.@reject(@makeTypeError("cancel() called on a reader owned by no readable stream"));
+
+ return @readableStreamReaderGenericCancel(this, reason);
+}
+
+function read(view)
+{
+ "use strict";
+
+ if (!@isReadableStreamBYOBReader(this))
+ return @Promise.@reject(@makeThisTypeError("ReadableStreamBYOBReader", "read"));
+
+ if (!@getByIdDirectPrivate(this, "ownerReadableStream"))
+ return @Promise.@reject(@makeTypeError("read() called on a reader owned by no readable stream"));
+
+ if (!@isObject(view))
+ return @Promise.@reject(@makeTypeError("Provided view is not an object"));
+
+ if (!@ArrayBuffer.@isView(view))
+ return @Promise.@reject(@makeTypeError("Provided view is not an ArrayBufferView"));
+
+ if (view.byteLength === 0)
+ return @Promise.@reject(@makeTypeError("Provided view cannot have a 0 byteLength"));
+
+ return @readableStreamBYOBReaderRead(this, view);
+}
+
+function releaseLock()
+{
+ "use strict";
+
+ if (!@isReadableStreamBYOBReader(this))
+ throw @makeThisTypeError("ReadableStreamBYOBReader", "releaseLock");
+
+ if (!@getByIdDirectPrivate(this, "ownerReadableStream"))
+ return;
+
+ if (@getByIdDirectPrivate(this, "readIntoRequests")?.isNotEmpty())
+ @throwTypeError("There are still pending read requests, cannot release the lock");
+
+ @readableStreamReaderGenericRelease(this);
+}
+
+@getter
+function closed()
+{
+ "use strict";
+
+ if (!@isReadableStreamBYOBReader(this))
+ return @Promise.@reject(@makeGetterTypeError("ReadableStreamBYOBReader", "closed"));
+
+ return @getByIdDirectPrivate(this, "closedPromiseCapability").@promise;
+}
diff --git a/src/bun.js/builtins/js/ReadableStreamBYOBRequest.js b/src/bun.js/builtins/js/ReadableStreamBYOBRequest.js
new file mode 100644
index 000000000..d97667165
--- /dev/null
+++ b/src/bun.js/builtins/js/ReadableStreamBYOBRequest.js
@@ -0,0 +1,77 @@
+/*
+ * Copyright (C) 2017 Canon Inc.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+ * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+function initializeReadableStreamBYOBRequest(controller, view)
+{
+ "use strict";
+
+ if (arguments.length !== 3 && arguments[2] !== @isReadableStream)
+ @throwTypeError("ReadableStreamBYOBRequest constructor should not be called directly");
+
+ return @privateInitializeReadableStreamBYOBRequest.@call(this, controller, view);
+}
+
+function respond(bytesWritten)
+{
+ "use strict";
+
+ if (!@isReadableStreamBYOBRequest(this))
+ throw @makeThisTypeError("ReadableStreamBYOBRequest", "respond");
+
+ if (@getByIdDirectPrivate(this, "associatedReadableByteStreamController") === @undefined)
+ @throwTypeError("ReadableStreamBYOBRequest.associatedReadableByteStreamController is undefined");
+
+ return @readableByteStreamControllerRespond(@getByIdDirectPrivate(this, "associatedReadableByteStreamController"), bytesWritten);
+}
+
+function respondWithNewView(view)
+{
+ "use strict";
+
+ if (!@isReadableStreamBYOBRequest(this))
+ throw @makeThisTypeError("ReadableStreamBYOBRequest", "respond");
+
+ if (@getByIdDirectPrivate(this, "associatedReadableByteStreamController") === @undefined)
+ @throwTypeError("ReadableStreamBYOBRequest.associatedReadableByteStreamController is undefined");
+
+ if (!@isObject(view))
+ @throwTypeError("Provided view is not an object");
+
+ if (!@ArrayBuffer.@isView(view))
+ @throwTypeError("Provided view is not an ArrayBufferView");
+
+ return @readableByteStreamControllerRespondWithNewView(@getByIdDirectPrivate(this, "associatedReadableByteStreamController"), view);
+}
+
+@getter
+function view()
+{
+ "use strict";
+
+ if (!@isReadableStreamBYOBRequest(this))
+ throw @makeGetterTypeError("ReadableStreamBYOBRequest", "view");
+
+ return @getByIdDirectPrivate(this, "view");
+}
diff --git a/src/bun.js/builtins/js/ReadableStreamDefaultController.js b/src/bun.js/builtins/js/ReadableStreamDefaultController.js
new file mode 100644
index 000000000..07fc65cb1
--- /dev/null
+++ b/src/bun.js/builtins/js/ReadableStreamDefaultController.js
@@ -0,0 +1,82 @@
+/*
+ * Copyright (C) 2015 Canon Inc.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+ * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+function initializeReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark)
+{
+ "use strict";
+
+ if (arguments.length !== 5 && arguments[4] !== @isReadableStream)
+ @throwTypeError("ReadableStreamDefaultController constructor should not be called directly");
+
+ return @privateInitializeReadableStreamDefaultController.@call(this, stream, underlyingSource, size, highWaterMark);
+}
+
+function enqueue(chunk)
+{
+ "use strict";
+
+ if (!@isReadableStreamDefaultController(this))
+ throw @makeThisTypeError("ReadableStreamDefaultController", "enqueue");
+
+ if (!@readableStreamDefaultControllerCanCloseOrEnqueue(this))
+ @throwTypeError("ReadableStreamDefaultController is not in a state where chunk can be enqueued");
+
+ return @readableStreamDefaultControllerEnqueue(this, chunk);
+}
+
+function error(error)
+{
+ "use strict";
+
+ if (!@isReadableStreamDefaultController(this))
+ throw @makeThisTypeError("ReadableStreamDefaultController", "error");
+
+ @readableStreamDefaultControllerError(this, error);
+}
+
+function close()
+{
+ "use strict";
+
+ if (!@isReadableStreamDefaultController(this))
+ throw @makeThisTypeError("ReadableStreamDefaultController", "close");
+
+ if (!@readableStreamDefaultControllerCanCloseOrEnqueue(this))
+ @throwTypeError("ReadableStreamDefaultController is not in a state where it can be closed");
+
+ @readableStreamDefaultControllerClose(this);
+}
+
+@getter
+function desiredSize()
+{
+ "use strict";
+
+ if (!@isReadableStreamDefaultController(this))
+ throw @makeGetterTypeError("ReadableStreamDefaultController", "desiredSize");
+
+ return @readableStreamDefaultControllerGetDesiredSize(this);
+}
+
diff --git a/src/bun.js/builtins/js/ReadableStreamDefaultReader.js b/src/bun.js/builtins/js/ReadableStreamDefaultReader.js
new file mode 100644
index 000000000..774c7161e
--- /dev/null
+++ b/src/bun.js/builtins/js/ReadableStreamDefaultReader.js
@@ -0,0 +1,182 @@
+/*
+ * Copyright (C) 2015 Canon Inc.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+ * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+function initializeReadableStreamDefaultReader(stream)
+{
+ "use strict";
+
+ if (!@isReadableStream(stream))
+ @throwTypeError("ReadableStreamDefaultReader needs a ReadableStream");
+ if (@isReadableStreamLocked(stream))
+ @throwTypeError("ReadableStream is locked");
+
+ @readableStreamReaderGenericInitialize(this, stream);
+ @putByIdDirectPrivate(this, "readRequests", @createFIFO());
+
+ return this;
+}
+
+function cancel(reason)
+{
+ "use strict";
+
+ if (!@isReadableStreamDefaultReader(this))
+ return @Promise.@reject(@makeThisTypeError("ReadableStreamDefaultReader", "cancel"));
+
+ if (!@getByIdDirectPrivate(this, "ownerReadableStream"))
+ return @Promise.@reject(@makeTypeError("cancel() called on a reader owned by no readable stream"));
+
+ return @readableStreamReaderGenericCancel(this, reason);
+}
+
+function readMany()
+{
+ "use strict";
+
+ if (!@isReadableStreamDefaultReader(this))
+ @throwTypeError("ReadableStreamDefaultReader.readMany() should not be called directly");
+
+ const stream = @getByIdDirectPrivate(this, "ownerReadableStream");
+ if (!stream)
+ @throwTypeError("readMany() called on a reader owned by no readable stream");
+
+ const state = @getByIdDirectPrivate(stream, "state");
+ @putByIdDirectPrivate(stream, "disturbed", true);
+ if (state === @streamClosed)
+ return {value: [], size: 0, done: true};
+ else if (state === @streamErrored) {
+ throw @getByIdDirectPrivate(stream, "storedError");
+ }
+
+
+ var controller = @getByIdDirectPrivate(stream, "readableStreamController");
+
+ const content = @getByIdDirectPrivate(controller, "queue").content;
+ var size = @getByIdDirectPrivate(controller, "queue").size;
+ var values = content.toArray(false);
+ var length = values.length;
+
+ if (length > 0) {
+
+ if (@isReadableByteStreamController(controller)) {
+ for (var i = 0; i < value.length; i++) {
+ const buf = value[i];
+ if (!(@ArrayBuffer.@isView(buf) || buf instanceof @ArrayBuffer)) {
+ value[i] = new @Uint8Array(buf.buffer, buf.byteOffset, buf.byteLength);
+ }
+ }
+ }
+
+ @resetQueue(@getByIdDirectPrivate(controller, "queue"));
+
+ if (@getByIdDirectPrivate(controller, "closeRequested"))
+ @readableStreamClose(@getByIdDirectPrivate(controller, "controlledReadableStream"));
+ else if (@isReadableStreamDefaultController(controller))
+ @readableStreamDefaultControllerCallPullIfNeeded(controller);
+ else if (@isReadableByteStreamController(controller))
+ @readableByteStreamControllerCallPullIfNeeded(controller);
+
+ return {value: values, size, done: false};
+ }
+
+ var onPullMany = (result) => {
+ if (result.done) {
+ return {value: [], size: 0, done: true};
+ }
+ var controller = @getByIdDirectPrivate(stream, "readableStreamController");
+
+ var queue = @getByIdDirectPrivate(controller, "queue");
+ var value = [result.value].concat(queue.content.toArray(false));
+
+ if (@isReadableByteStreamController(controller)) {
+ for (var i = 0; i < value.length; i++) {
+ const buf = value[i];
+ if (!(@ArrayBuffer.@isView(buf) || buf instanceof @ArrayBuffer)) {
+ value[i] = new @Uint8Array(buf.buffer, buf.byteOffset, buf.byteLength);
+ }
+ }
+ }
+
+ var size = queue.size;
+ @resetQueue(queue);
+
+ if (@getByIdDirectPrivate(controller, "closeRequested"))
+ @readableStreamClose(@getByIdDirectPrivate(controller, "controlledReadableStream"));
+ else if (@isReadableStreamDefaultController(controller))
+ @readableStreamDefaultControllerCallPullIfNeeded(controller);
+ else if (@isReadableByteStreamController(controller))
+ @readableByteStreamControllerCallPullIfNeeded(controller);
+
+
+
+ return {value: value, size: size, done: false};
+ };
+
+ var pullResult = controller.@pull(controller);
+ if (pullResult && @isPromise(pullResult)) {
+ return pullResult.@then(onPullMany);
+ }
+
+ return onPullMany(pullResult);
+}
+
+function read()
+{
+ "use strict";
+
+ if (!@isReadableStreamDefaultReader(this))
+ return @Promise.@reject(@makeThisTypeError("ReadableStreamDefaultReader", "read"));
+ if (!@getByIdDirectPrivate(this, "ownerReadableStream"))
+ return @Promise.@reject(@makeTypeError("read() called on a reader owned by no readable stream"));
+
+ return @readableStreamDefaultReaderRead(this);
+}
+
+function releaseLock()
+{
+ "use strict";
+
+ if (!@isReadableStreamDefaultReader(this))
+ throw @makeThisTypeError("ReadableStreamDefaultReader", "releaseLock");
+
+ if (!@getByIdDirectPrivate(this, "ownerReadableStream"))
+ return;
+
+ if (@getByIdDirectPrivate(this, "readRequests")?.isNotEmpty())
+ @throwTypeError("There are still pending read requests, cannot release the lock");
+
+ @readableStreamReaderGenericRelease(this);
+}
+
+@getter
+function closed()
+{
+ "use strict";
+
+ if (!@isReadableStreamDefaultReader(this))
+ return @Promise.@reject(@makeGetterTypeError("ReadableStreamDefaultReader", "closed"));
+
+ return @getByIdDirectPrivate(this, "closedPromiseCapability").@promise;
+}
diff --git a/src/bun.js/builtins/js/ReadableStreamInternals.js b/src/bun.js/builtins/js/ReadableStreamInternals.js
new file mode 100644
index 000000000..3e6590f31
--- /dev/null
+++ b/src/bun.js/builtins/js/ReadableStreamInternals.js
@@ -0,0 +1,1255 @@
+/*
+ * Copyright (C) 2015 Canon Inc. All rights reserved.
+ * Copyright (C) 2015 Igalia.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+ * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+// @internal
+
+function readableStreamReaderGenericInitialize(reader, stream)
+{
+ "use strict";
+
+ @putByIdDirectPrivate(reader, "ownerReadableStream", stream);
+ @putByIdDirectPrivate(stream, "reader", reader);
+ if (@getByIdDirectPrivate(stream, "state") === @streamReadable)
+ @putByIdDirectPrivate(reader, "closedPromiseCapability", @newPromiseCapability(@Promise));
+ else if (@getByIdDirectPrivate(stream, "state") === @streamClosed)
+ @putByIdDirectPrivate(reader, "closedPromiseCapability", { @promise: @Promise.@resolve() });
+ else {
+ @assert(@getByIdDirectPrivate(stream, "state") === @streamErrored);
+ @putByIdDirectPrivate(reader, "closedPromiseCapability", { @promise: @newHandledRejectedPromise(@getByIdDirectPrivate(stream, "storedError")) });
+ }
+}
+
+function privateInitializeReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark)
+{
+ "use strict";
+
+ if (!@isReadableStream(stream))
+ @throwTypeError("ReadableStreamDefaultController needs a ReadableStream");
+
+ // readableStreamController is initialized with null value.
+ if (@getByIdDirectPrivate(stream, "readableStreamController") !== null)
+ @throwTypeError("ReadableStream already has a controller");
+
+
+
+ @putByIdDirectPrivate(this, "controlledReadableStream", stream);
+ @putByIdDirectPrivate(this, "underlyingSource", underlyingSource);
+ @putByIdDirectPrivate(this, "queue", @newQueue());
+ @putByIdDirectPrivate(this, "started", -1);
+ @putByIdDirectPrivate(this, "closeRequested", false);
+ @putByIdDirectPrivate(this, "pullAgain", false);
+ @putByIdDirectPrivate(this, "pulling", false);
+ @putByIdDirectPrivate(this, "strategy", @validateAndNormalizeQueuingStrategy(size, highWaterMark));
+
+ return this;
+}
+
+function readableStreamDefaultControllerError(controller, error)
+{
+ "use strict";
+
+ const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
+ if (@getByIdDirectPrivate(stream, "state") !== @streamReadable)
+ return;
+ @putByIdDirectPrivate(controller, "queue", @newQueue());
+
+ @readableStreamError(stream, error);
+}
+
+function readableStreamPipeTo(stream, sink)
+{
+ "use strict";
+ @assert(@isReadableStream(stream));
+
+ const reader = new @ReadableStreamDefaultReader(stream);
+
+ @getByIdDirectPrivate(reader, "closedPromiseCapability").@promise.@then(() => { }, (e) => { sink.error(e); });
+
+ function doPipe() {
+ @readableStreamDefaultReaderRead(reader).@then(function(result) {
+ if (result.done) {
+ sink.close();
+ return;
+ }
+ try {
+ sink.enqueue(result.value);
+ } catch (e) {
+ sink.error("ReadableStream chunk enqueueing in the sink failed");
+ return;
+ }
+ doPipe();
+ }, function(e) {
+ sink.error(e);
+ });
+ }
+ doPipe();
+}
+
+
+
+function acquireReadableStreamDefaultReader(stream)
+{
+ "use strict";
+ var start = @getByIdDirectPrivate(stream, "start");
+ if (start) {
+ start.@call(stream);
+ }
+
+ return new @ReadableStreamDefaultReader(stream);
+}
+
+// https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller, starting from step 6.
+// The other part is implemented in privateInitializeReadableStreamDefaultController.
+function setupReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark, startMethod, pullMethod, cancelMethod)
+{
+ "use strict";
+
+ const controller = new @ReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark, @isReadableStream);
+
+ const pullAlgorithm = () => @promiseInvokeOrNoopMethod(underlyingSource, pullMethod, [controller]);
+ const cancelAlgorithm = (reason) => @promiseInvokeOrNoopMethod(underlyingSource, cancelMethod, [reason]);
+
+ @putByIdDirectPrivate(controller, "pullAlgorithm", pullAlgorithm);
+ @putByIdDirectPrivate(controller, "cancelAlgorithm", cancelAlgorithm);
+ @putByIdDirectPrivate(controller, "pull", @readableStreamDefaultControllerPull);
+ @putByIdDirectPrivate(controller, "cancel", @readableStreamDefaultControllerCancel);
+ @putByIdDirectPrivate(stream, "readableStreamController", controller);
+
+ @readableStreamDefaultControllerStart(controller);
+
+}
+
+
+function createReadableStreamController(stream, underlyingSource, strategy) {
+ "use strict";
+
+ const type = underlyingSource.type;
+ const typeString = @toString(type);
+
+ if (typeString === "bytes") {
+ // if (!@readableByteStreamAPIEnabled())
+ // @throwTypeError("ReadableByteStreamController is not implemented");
+
+ if (strategy.highWaterMark === @undefined)
+ strategy.highWaterMark = 0;
+ if (strategy.size !== @undefined)
+ @throwRangeError("Strategy for a ReadableByteStreamController cannot have a size");
+
+ @putByIdDirectPrivate(stream, "readableStreamController", new @ReadableByteStreamController(stream, underlyingSource, strategy.highWaterMark, @isReadableStream));
+ } else if (typeString === "direct") {
+ var highWaterMark = strategy?.highWaterMark;
+ @initializeArrayBufferStream.@call(stream, underlyingSource, highWaterMark);
+ } else if (type === @undefined) {
+ if (strategy.highWaterMark === @undefined)
+ strategy.highWaterMark = 1;
+
+ @setupReadableStreamDefaultController(stream, underlyingSource, strategy.size, strategy.highWaterMark, underlyingSource.start, underlyingSource.pull, underlyingSource.cancel);
+ } else
+ @throwRangeError("Invalid type for underlying source");
+
+}
+
+function readableStreamDefaultControllerStart(controller) {
+ "use strict";
+
+
+
+ if (@getByIdDirectPrivate(controller, "started") !== -1)
+ return;
+
+ const underlyingSource = @getByIdDirectPrivate(controller, "underlyingSource");
+ const startMethod = underlyingSource.start;
+ @putByIdDirectPrivate(controller, "started", 0);
+
+ @promiseInvokeOrNoopMethodNoCatch(underlyingSource, startMethod, [controller]).@then(() => {
+ @putByIdDirectPrivate(controller, "started", 1);
+ @assert(!@getByIdDirectPrivate(controller, "pulling"));
+ @assert(!@getByIdDirectPrivate(controller, "pullAgain"));
+ @readableStreamDefaultControllerCallPullIfNeeded(controller);
+ }, (error) => {
+ @readableStreamDefaultControllerError(controller, error);
+ });
+}
+
+
+// FIXME: Replace readableStreamPipeTo by below function.
+// This method implements the latest https://streams.spec.whatwg.org/#readable-stream-pipe-to.
+function readableStreamPipeToWritableStream(source, destination, preventClose, preventAbort, preventCancel, signal)
+{
+ "use strict";
+
+ @assert(@isReadableStream(source));
+ @assert(@isWritableStream(destination));
+ @assert(!@isReadableStreamLocked(source));
+ @assert(!@isWritableStreamLocked(destination));
+ @assert(signal === @undefined || @isAbortSignal(signal));
+
+ if (@getByIdDirectPrivate(source, "underlyingByteSource") !== @undefined)
+ return @Promise.@reject("Piping to a readable bytestream is not supported");
+
+ let pipeState = { source : source, destination : destination, preventAbort : preventAbort, preventCancel : preventCancel, preventClose : preventClose, signal : signal };
+
+ pipeState.reader = @acquireReadableStreamDefaultReader(source);
+ pipeState.writer = @acquireWritableStreamDefaultWriter(destination);
+
+ @putByIdDirectPrivate(source, "disturbed", true);
+
+ pipeState.finalized = false;
+ pipeState.shuttingDown = false;
+ pipeState.promiseCapability = @newPromiseCapability(@Promise);
+ pipeState.pendingReadPromiseCapability = @newPromiseCapability(@Promise);
+ pipeState.pendingReadPromiseCapability.@resolve.@call();
+ pipeState.pendingWritePromise = @Promise.@resolve();
+
+ if (signal !== @undefined) {
+ const algorithm = () => {
+ if (pipeState.finalized)
+ return;
+
+ const error = @makeDOMException("AbortError", "abort pipeTo from signal");
+
+ @pipeToShutdownWithAction(pipeState, () => {
+ const shouldAbortDestination = !pipeState.preventAbort && @getByIdDirectPrivate(pipeState.destination, "state") === "writable";
+ const promiseDestination = shouldAbortDestination ? @writableStreamAbort(pipeState.destination, error) : @Promise.@resolve();
+
+ const shouldAbortSource = !pipeState.preventCancel && @getByIdDirectPrivate(pipeState.source, "state") === @streamReadable;
+ const promiseSource = shouldAbortSource ? @readableStreamCancel(pipeState.source, error) : @Promise.@resolve();
+
+ let promiseCapability = @newPromiseCapability(@Promise);
+ let shouldWait = true;
+ let handleResolvedPromise = () => {
+ if (shouldWait) {
+ shouldWait = false;
+ return;
+ }
+ promiseCapability.@resolve.@call();
+ }
+ let handleRejectedPromise = (e) => {
+ promiseCapability.@reject.@call(@undefined, e);
+ }
+ promiseDestination.@then(handleResolvedPromise, handleRejectedPromise);
+ promiseSource.@then(handleResolvedPromise, handleRejectedPromise);
+ return promiseCapability.@promise;
+ }, error);
+ };
+ if (@whenSignalAborted(signal, algorithm))
+ return pipeState.promiseCapability.@promise;
+ }
+
+ @pipeToErrorsMustBePropagatedForward(pipeState);
+ @pipeToErrorsMustBePropagatedBackward(pipeState);
+ @pipeToClosingMustBePropagatedForward(pipeState);
+ @pipeToClosingMustBePropagatedBackward(pipeState);
+
+ @pipeToLoop(pipeState);
+
+ return pipeState.promiseCapability.@promise;
+}
+
+function pipeToLoop(pipeState)
+{
+ "use strict";
+ if (pipeState.shuttingDown)
+ return;
+
+ @pipeToDoReadWrite(pipeState).@then((result) => {
+ if (result)
+ @pipeToLoop(pipeState);
+ });
+}
+
+function pipeToDoReadWrite(pipeState)
+{
+ "use strict";
+ @assert(!pipeState.shuttingDown);
+
+ pipeState.pendingReadPromiseCapability = @newPromiseCapability(@Promise);
+ @getByIdDirectPrivate(pipeState.writer, "readyPromise").@promise.@then(() => {
+ if (pipeState.shuttingDown) {
+ pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false);
+ return;
+ }
+
+ @readableStreamDefaultReaderRead(pipeState.reader).@then((result) => {
+ const canWrite = !result.done && @getByIdDirectPrivate(pipeState.writer, "stream") !== @undefined;
+ pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, canWrite);
+ if (!canWrite)
+ return;
+
+ pipeState.pendingWritePromise = @writableStreamDefaultWriterWrite(pipeState.writer, result.value);
+ }, (e) => {
+ pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false);
+ });
+ }, (e) => {
+ pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false);
+ });
+ return pipeState.pendingReadPromiseCapability.@promise;
+}
+
+function pipeToErrorsMustBePropagatedForward(pipeState)
+{
+ "use strict";
+
+ const action = () => {
+ pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false);
+ const error = @getByIdDirectPrivate(pipeState.source, "storedError");
+ if (!pipeState.preventAbort) {
+ @pipeToShutdownWithAction(pipeState, () => @writableStreamAbort(pipeState.destination, error), error);
+ return;
+ }
+ @pipeToShutdown(pipeState, error);
+ };
+
+ if (@getByIdDirectPrivate(pipeState.source, "state") === @streamErrored) {
+ action();
+ return;
+ }
+
+ @getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").@promise.@then(@undefined, action);
+}
+
+function pipeToErrorsMustBePropagatedBackward(pipeState)
+{
+ "use strict";
+ const action = () => {
+ const error = @getByIdDirectPrivate(pipeState.destination, "storedError");
+ if (!pipeState.preventCancel) {
+ @pipeToShutdownWithAction(pipeState, () => @readableStreamCancel(pipeState.source, error), error);
+ return;
+ }
+ @pipeToShutdown(pipeState, error);
+ };
+ if (@getByIdDirectPrivate(pipeState.destination, "state") === "errored") {
+ action();
+ return;
+ }
+ @getByIdDirectPrivate(pipeState.writer, "closedPromise").@promise.@then(@undefined, action);
+}
+
+function pipeToClosingMustBePropagatedForward(pipeState)
+{
+ "use strict";
+ const action = () => {
+ pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false);
+ const error = @getByIdDirectPrivate(pipeState.source, "storedError");
+ if (!pipeState.preventClose) {
+ @pipeToShutdownWithAction(pipeState, () => @writableStreamDefaultWriterCloseWithErrorPropagation(pipeState.writer));
+ return;
+ }
+ @pipeToShutdown(pipeState);
+ };
+ if (@getByIdDirectPrivate(pipeState.source, "state") === @streamClosed) {
+ action();
+ return;
+ }
+ @getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").@promise.@then(action, @undefined);
+}
+
+function pipeToClosingMustBePropagatedBackward(pipeState)
+{
+ "use strict";
+ if (!@writableStreamCloseQueuedOrInFlight(pipeState.destination) && @getByIdDirectPrivate(pipeState.destination, "state") !== "closed")
+ return;
+
+ // @assert no chunks have been read/written
+
+ const error = @makeTypeError("closing is propagated backward");
+ if (!pipeState.preventCancel) {
+ @pipeToShutdownWithAction(pipeState, () => @readableStreamCancel(pipeState.source, error), error);
+ return;
+ }
+ @pipeToShutdown(pipeState, error);
+}
+
+function pipeToShutdownWithAction(pipeState, action)
+{
+ "use strict";
+
+ if (pipeState.shuttingDown)
+ return;
+
+ pipeState.shuttingDown = true;
+
+ const hasError = arguments.length > 2;
+ const error = arguments[2];
+ const finalize = () => {
+ const promise = action();
+ promise.@then(() => {
+ if (hasError)
+ @pipeToFinalize(pipeState, error);
+ else
+ @pipeToFinalize(pipeState);
+ }, (e) => {
+ @pipeToFinalize(pipeState, e);
+ });
+ };
+
+ if (@getByIdDirectPrivate(pipeState.destination, "state") === "writable" && !@writableStreamCloseQueuedOrInFlight(pipeState.destination)) {
+ pipeState.pendingReadPromiseCapability.@promise.@then(() => {
+ pipeState.pendingWritePromise.@then(finalize, finalize);
+ }, (e) => @pipeToFinalize(pipeState, e));
+ return;
+ }
+
+ finalize();
+}
+
+function pipeToShutdown(pipeState)
+{
+ "use strict";
+
+ if (pipeState.shuttingDown)
+ return;
+
+ pipeState.shuttingDown = true;
+
+ const hasError = arguments.length > 1;
+ const error = arguments[1];
+ const finalize = () => {
+ if (hasError)
+ @pipeToFinalize(pipeState, error);
+ else
+ @pipeToFinalize(pipeState);
+ };
+
+ if (@getByIdDirectPrivate(pipeState.destination, "state") === "writable" && !@writableStreamCloseQueuedOrInFlight(pipeState.destination)) {
+ pipeState.pendingReadPromiseCapability.@promise.@then(() => {
+ pipeState.pendingWritePromise.@then(finalize, finalize);
+ }, (e) => @pipeToFinalize(pipeState, e));
+ return;
+ }
+ finalize();
+}
+
+function pipeToFinalize(pipeState)
+{
+ "use strict";
+
+ @writableStreamDefaultWriterRelease(pipeState.writer);
+ @readableStreamReaderGenericRelease(pipeState.reader);
+
+ // Instead of removing the abort algorithm as per spec, we make it a no-op which is equivalent.
+ pipeState.finalized = true;
+
+ if (arguments.length > 1)
+ pipeState.promiseCapability.@reject.@call(@undefined, arguments[1]);
+ else
+ pipeState.promiseCapability.@resolve.@call();
+}
+
+function readableStreamTee(stream, shouldClone)
+{
+ "use strict";
+
+ @assert(@isReadableStream(stream));
+ @assert(typeof(shouldClone) === "boolean");
+
+ const reader = new @ReadableStreamDefaultReader(stream);
+
+ const teeState = {
+ closedOrErrored: false,
+ canceled1: false,
+ canceled2: false,
+ reason1: @undefined,
+ reason2: @undefined,
+ };
+
+ teeState.cancelPromiseCapability = @newPromiseCapability(@Promise);
+
+ const pullFunction = @readableStreamTeePullFunction(teeState, reader, shouldClone);
+
+ const branch1Source = { };
+ @putByIdDirectPrivate(branch1Source, "pull", pullFunction);
+ @putByIdDirectPrivate(branch1Source, "cancel", @readableStreamTeeBranch1CancelFunction(teeState, stream));
+
+ const branch2Source = { };
+ @putByIdDirectPrivate(branch2Source, "pull", pullFunction);
+ @putByIdDirectPrivate(branch2Source, "cancel", @readableStreamTeeBranch2CancelFunction(teeState, stream));
+
+ const branch1 = new @ReadableStream(branch1Source);
+ const branch2 = new @ReadableStream(branch2Source);
+
+ @getByIdDirectPrivate(reader, "closedPromiseCapability").@promise.@then(@undefined, function(e) {
+ if (teeState.closedOrErrored)
+ return;
+ @readableStreamDefaultControllerError(branch1.@readableStreamController, e);
+ @readableStreamDefaultControllerError(branch2.@readableStreamController, e);
+ teeState.closedOrErrored = true;
+ if (!teeState.canceled1 || !teeState.canceled2)
+ teeState.cancelPromiseCapability.@resolve.@call();
+ });
+
+ // Additional fields compared to the spec, as they are needed within pull/cancel functions.
+ teeState.branch1 = branch1;
+ teeState.branch2 = branch2;
+
+ return [branch1, branch2];
+}
+
+function readableStreamTeePullFunction(teeState, reader, shouldClone)
+{
+ "use strict";
+
+ return function() {
+ @Promise.prototype.@then.@call(@readableStreamDefaultReaderRead(reader), function(result) {
+ @assert(@isObject(result));
+ @assert(typeof result.done === "boolean");
+ if (result.done && !teeState.closedOrErrored) {
+ if (!teeState.canceled1)
+ @readableStreamDefaultControllerClose(teeState.branch1.@readableStreamController);
+ if (!teeState.canceled2)
+ @readableStreamDefaultControllerClose(teeState.branch2.@readableStreamController);
+ teeState.closedOrErrored = true;
+ if (!teeState.canceled1 || !teeState.canceled2)
+ teeState.cancelPromiseCapability.@resolve.@call();
+ }
+ if (teeState.closedOrErrored)
+ return;
+ if (!teeState.canceled1)
+ @readableStreamDefaultControllerEnqueue(teeState.branch1.@readableStreamController, result.value);
+ if (!teeState.canceled2)
+ @readableStreamDefaultControllerEnqueue(teeState.branch2.@readableStreamController, shouldClone ? @structuredCloneForStream(result.value) : result.value);
+ });
+ }
+}
+
+function readableStreamTeeBranch1CancelFunction(teeState, stream)
+{
+ "use strict";
+
+ return function(r) {
+ teeState.canceled1 = true;
+ teeState.reason1 = r;
+ if (teeState.canceled2) {
+ @readableStreamCancel(stream, [teeState.reason1, teeState.reason2]).@then(
+ teeState.cancelPromiseCapability.@resolve,
+ teeState.cancelPromiseCapability.@reject);
+ }
+ return teeState.cancelPromiseCapability.@promise;
+ }
+}
+
+function readableStreamTeeBranch2CancelFunction(teeState, stream)
+{
+ "use strict";
+
+ return function(r) {
+ teeState.canceled2 = true;
+ teeState.reason2 = r;
+ if (teeState.canceled1) {
+ @readableStreamCancel(stream, [teeState.reason1, teeState.reason2]).@then(
+ teeState.cancelPromiseCapability.@resolve,
+ teeState.cancelPromiseCapability.@reject);
+ }
+ return teeState.cancelPromiseCapability.@promise;
+ }
+}
+
+function isReadableStream(stream)
+{
+ "use strict";
+
+ // Spec tells to return true only if stream has a readableStreamController internal slot.
+ // However, since it is a private slot, it cannot be checked using hasOwnProperty().
+ // Therefore, readableStreamController is initialized with null value.
+ return @isObject(stream) && @getByIdDirectPrivate(stream, "readableStreamController") !== @undefined;
+}
+
+function isReadableStreamDefaultReader(reader)
+{
+ "use strict";
+
+ // Spec tells to return true only if reader has a readRequests internal slot.
+ // However, since it is a private slot, it cannot be checked using hasOwnProperty().
+ // Since readRequests is initialized with an empty array, the following test is ok.
+ return @isObject(reader) && !!@getByIdDirectPrivate(reader, "readRequests");
+}
+
+function isReadableStreamDefaultController(controller)
+{
+ "use strict";
+
+ // Spec tells to return true only if controller has an underlyingSource internal slot.
+ // However, since it is a private slot, it cannot be checked using hasOwnProperty().
+ // underlyingSource is obtained in ReadableStream constructor: if undefined, it is set
+ // to an empty object. Therefore, following test is ok.
+ return @isObject(controller) && !!@getByIdDirectPrivate(controller, "underlyingSource");
+}
+
+
+@globalPrivate
+function assignDirectStream() {
+ "use strict";
+
+ var stream = this;
+}
+
+
+function handleDirectStreamError(e) {
+ "use strict";
+
+ var controller = this;
+ var sink = controller.@sink;
+ if (sink) {
+ @putByIdDirectPrivate(controller, "sink", @undefined);
+ try {
+ sink.close(e);
+ } catch (f) {}
+ }
+
+ this.error = this.drain = this.write = this.close = this.end = @onReadableStreamDirectControllerClosed;
+
+ if (typeof this.@underlyingSource.close === 'function') {
+ try {
+ this.@underlyingSource.close.@call(this.@underlyingSource, e);
+ } catch (e) {
+ }
+ }
+
+ try {
+ var pend = controller._pendingRead;
+ if (pend) {
+ controller._pendingRead = @undefined;
+ @rejectPromise(pend, e);
+ }
+ } catch (f) {}
+ var stream = controller.@controlledReadableStream;
+ if (stream) @readableStreamError(stream, e);
+}
+
+function handleDirectStreamErrorReject(e) {
+ @handleDirectStreamError.@call(this, e);
+ return @Promise.@reject(e);
+}
+
+function onPullDirectStream(controller)
+{
+
+ "use strict";
+
+ var stream = controller.@controlledReadableStream;
+ if (!stream || @getByIdDirectPrivate(stream, "state") !== @streamReadable)
+ return;
+
+ // pull is in progress
+ // this is a recursive call
+ // ignore it
+ if (controller._deferClose === -1) {
+ return;
+ }
+
+
+ controller._deferClose = -1;
+ controller._deferDrain = -1;
+ var deferClose;
+ var deferDrain;
+
+ // Direct streams allow @pull to be called multiple times, unlike the spec.
+ // Backpressure is handled by the destination, not by the underlying source.
+ // In this case, we rely on the heuristic that repeatedly draining in the same tick
+ // is bad for performance
+ // this code is only run when consuming a direct stream from JS
+ // without the HTTP server or anything else
+ try {
+ var result = controller.@underlyingSource.pull(
+ controller,
+ );
+
+ if (result && @isPromise(result)) {
+ if (controller._handleError === @undefined) {
+ controller._handleError = @handleDirectStreamErrorReject.bind(controller);
+ }
+
+ @Promise.prototype.catch.@call(result, controller._handleError);
+ }
+ } catch(e) {
+ return @handleDirectStreamErrorReject.@call(controller, e);
+ } finally {
+ deferClose = controller._deferClose;
+ deferDrain = controller._deferDrain;
+ controller._deferDrain = controller._deferClose = 0;
+ }
+
+
+ var promiseToReturn;
+
+
+ if (controller._pendingRead === @undefined) {
+ controller._pendingRead = promiseToReturn = @newPromise();
+ } else {
+ promiseToReturn = @readableStreamAddReadRequest(stream);
+ }
+
+
+ // they called close during @pull()
+ // we delay that
+ if (deferClose === 1) {
+ var reason = controller._deferCloseReason;
+ controller._deferCloseReason = @undefined;
+ @onCloseDirectStream.@call(controller, reason);
+ return promiseToReturn;
+ }
+
+ // not done, but they called drain()
+ if (deferDrain === 1) {
+ @onDrainDirectStream.@call(controller);
+ }
+
+
+ return promiseToReturn;
+}
+
+function noopDoneFunction() {
+ return @Promise.@resolve({value: @undefined, done: true});
+}
+
+function onReadableStreamDirectControllerClosed(reason)
+{
+ "use strict";
+ @throwTypeError("ReadableStreamDirectController is now closed");
+}
+
+function onCloseDirectStream(reason)
+{
+ "use strict";
+ var stream = this.@controlledReadableStream;
+ if (!stream || @getByIdDirectPrivate(stream, "state") !== @streamReadable)
+ return;
+
+ if (this._deferClose !== 0) {
+ this._deferClose = 1;
+ this._deferCloseReason = reason;
+ return;
+ }
+
+ @putByIdDirectPrivate(stream, "state", @streamClosing);
+ if (typeof this.@underlyingSource.close === 'function') {
+ try {
+ this.@underlyingSource.close.@call(this.@underlyingSource, reason);
+ } catch (e) {
+
+ }
+ }
+
+ var drained;
+ try {
+ drained = this.@sink.end();
+ @putByIdDirectPrivate(this, "sink", @undefined);
+ } catch (e) {
+ if (this._pendingRead) {
+ var read = this._pendingRead;
+ this._pendingRead = @undefined;
+ @rejectPromise(read, e);
+ }
+ @readableStreamError(stream, e);
+ return;
+ }
+
+ this.error = this.drain = this.write = this.close = this.end = @onReadableStreamDirectControllerClosed;
+
+ var reader = @getByIdDirectPrivate(stream, "reader");
+
+ if (reader && @isReadableStreamDefaultReader(reader)) {
+ var _pendingRead = this._pendingRead;
+ if (_pendingRead && @isPromise(_pendingRead) && drained?.byteLength) {
+ this._pendingRead = @undefined;
+ @fulfillPromise(_pendingRead, {value: drained, done: false});
+ @readableStreamClose(stream);
+ return;
+ }
+ }
+
+ if (drained?.byteLength) {
+ var requests = @getByIdDirectPrivate(reader, "readRequests");
+ if (requests?.isNotEmpty()) {
+ @readableStreamFulfillReadRequest(stream, drained, false);
+ @readableStreamClose(stream);
+ return;
+ }
+
+ @putByIdDirectPrivate(stream, "state", @streamReadable);
+ this.@pull = () => {
+ var thisResult = @createFulfilledPromise({value: drained, done: false});
+ drained = @undefined;
+ @readableStreamClose(stream);
+ stream = @undefined;
+ return thisResult;
+ };
+ } else if (this._pendingRead) {
+ var read = this._pendingRead;
+ this._pendingRead = @undefined;
+ @putByIdDirectPrivate(this, "pull", @noopDoneFunction);
+ @fulfillPromise(read, {value: @undefined, done: true});
+ }
+
+ @readableStreamClose(stream);
+}
+
+function onDrainDirectStream()
+{
+ "use strict";
+
+ var stream = this.@controlledReadableStream;
+ var reader = @getByIdDirectPrivate(stream, "reader");
+ if (!reader || !@isReadableStreamDefaultReader(reader)) {
+ return;
+ }
+
+ var _pendingRead = this._pendingRead;
+ this._pendingRead = @undefined;
+ if (_pendingRead && @isPromise(_pendingRead)) {
+ var drained = this.@sink.drain();
+ if (drained?.byteLength) {
+ this._pendingRead = @getByIdDirectPrivate(stream, "readRequests")?.shift();
+ @fulfillPromise(_pendingRead, {value: drained, done: false});
+ } else {
+ this._pendingRead = _pendingRead;
+ }
+ } else if (@getByIdDirectPrivate(stream, "readRequests")?.isNotEmpty()) {
+ var drained = this.@sink.drain();
+ if (drained?.byteLength) {
+ @readableStreamFulfillReadRequest(stream, drained, false);
+ }
+ } else if (this._deferDrain === -1) {
+ this._deferDrain = 1;
+ }
+
+}
+
+function initializeArrayBufferStream(underlyingSource, highWaterMark)
+{
+ "use strict";
+
+ // This is the fallback implementation for direct streams
+ // When we don't know what the destination type is
+ // We assume it is a Uint8Array.
+
+ var opts = highWaterMark ? {highWaterMark, stream: true, asUint8Array: true} : {stream: true, asUint8Array: true};
+ var sink = new globalThis.Bun.ArrayBufferSink();
+ sink.start(opts);
+
+ var controller = {
+ @underlyingSource: underlyingSource,
+ @pull: @onPullDirectStream,
+ @controlledReadableStream: this,
+ @sink: sink,
+ close: @onCloseDirectStream,
+ write: sink.write.bind(sink),
+ error: @handleDirectStreamError,
+ end: @onCloseDirectStream,
+ @close: @onCloseDirectStream,
+ drain: @onDrainDirectStream,
+ _pendingRead: @undefined,
+ _deferClose: 0,
+ _deferDrain: 0,
+ _deferCloseReason: @undefined,
+ _handleError: @undefined,
+ };
+
+
+ @putByIdDirectPrivate(this, "readableStreamController", controller);
+
+}
+
+function readableStreamError(stream, error)
+{
+ "use strict";
+
+ @assert(@isReadableStream(stream));
+ @assert(@getByIdDirectPrivate(stream, "state") === @streamReadable);
+ @putByIdDirectPrivate(stream, "state", @streamErrored);
+ @putByIdDirectPrivate(stream, "storedError", error);
+
+ const reader = @getByIdDirectPrivate(stream, "reader");
+
+ if (!reader)
+ return;
+
+ if (@isReadableStreamDefaultReader(reader)) {
+ const requests = @getByIdDirectPrivate(reader, "readRequests");
+ @putByIdDirectPrivate(reader, "readRequests", @createFIFO());
+ for (var request = requests.shift(); request; request = requests.shift())
+ @rejectPromise(request, error);
+ } else {
+ @assert(@isReadableStreamBYOBReader(reader));
+ const requests = @getByIdDirectPrivate(reader, "readIntoRequests");
+ @putByIdDirectPrivate(reader, "readIntoRequests", @createFIFO());
+ for (var request = requests.shift(); request; request = requests.shift())
+ @rejectPromise(request, error);
+ }
+
+ @getByIdDirectPrivate(reader, "closedPromiseCapability").@reject.@call(@undefined, error);
+ const promise = @getByIdDirectPrivate(reader, "closedPromiseCapability").@promise;
+ @markPromiseAsHandled(promise);
+}
+
+function readableStreamDefaultControllerShouldCallPull(controller)
+{
+ "use strict";
+
+ const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
+
+ if (!@readableStreamDefaultControllerCanCloseOrEnqueue(controller))
+ return false;
+ if (!(@getByIdDirectPrivate(controller, "started") === 1))
+ return false;
+ if ((!@isReadableStreamLocked(stream) || !@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) && @readableStreamDefaultControllerGetDesiredSize(controller) <= 0)
+ return false;
+ const desiredSize = @readableStreamDefaultControllerGetDesiredSize(controller);
+ @assert(desiredSize !== null);
+ return desiredSize > 0;
+}
+
+function readableStreamDefaultControllerCallPullIfNeeded(controller)
+{
+ "use strict";
+
+ // FIXME: use @readableStreamDefaultControllerShouldCallPull
+ const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
+
+ if (!@readableStreamDefaultControllerCanCloseOrEnqueue(controller))
+ return;
+ if (!(@getByIdDirectPrivate(controller, "started") === 1))
+ return;
+ if ((!@isReadableStreamLocked(stream) || !@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) && @readableStreamDefaultControllerGetDesiredSize(controller) <= 0)
+ return;
+
+ if (@getByIdDirectPrivate(controller, "pulling")) {
+ @putByIdDirectPrivate(controller, "pullAgain", true);
+ return;
+ }
+
+
+ @assert(!@getByIdDirectPrivate(controller, "pullAgain"));
+ @putByIdDirectPrivate(controller, "pulling", true);
+
+ @getByIdDirectPrivate(controller, "pullAlgorithm").@call(@undefined).@then(function() {
+ @putByIdDirectPrivate(controller, "pulling", false);
+ if (@getByIdDirectPrivate(controller, "pullAgain")) {
+ @putByIdDirectPrivate(controller, "pullAgain", false);
+
+ @readableStreamDefaultControllerCallPullIfNeeded(controller);
+ }
+ }, function(error) {
+ @readableStreamDefaultControllerError(controller, error);
+ });
+}
+
+function isReadableStreamLocked(stream)
+{
+ "use strict";
+
+ @assert(@isReadableStream(stream));
+ return !!@getByIdDirectPrivate(stream, "reader");
+}
+
+function readableStreamDefaultControllerGetDesiredSize(controller)
+{
+ "use strict";
+
+ const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
+ const state = @getByIdDirectPrivate(stream, "state");
+
+ if (state === @streamErrored)
+ return null;
+ if (state === @streamClosed)
+ return 0;
+
+ return @getByIdDirectPrivate(controller, "strategy").highWaterMark - @getByIdDirectPrivate(controller, "queue").size;
+}
+
+
+function readableStreamReaderGenericCancel(reader, reason)
+{
+ "use strict";
+
+ const stream = @getByIdDirectPrivate(reader, "ownerReadableStream");
+ @assert(!!stream);
+ return @readableStreamCancel(stream, reason);
+}
+
+function readableStreamCancel(stream, reason)
+{
+ "use strict";
+
+ @putByIdDirectPrivate(stream, "disturbed", true);
+ const state = @getByIdDirectPrivate(stream, "state");
+ if (state === @streamClosed)
+ return @Promise.@resolve();
+ if (state === @streamErrored)
+ return @Promise.@reject(@getByIdDirectPrivate(stream, "storedError"));
+ @readableStreamClose(stream);
+
+ var controller = @getByIdDirectPrivate(stream, "readableStreamController");
+ return controller.@cancel(controller, reason).@then(function() { });
+}
+
+function readableStreamDefaultControllerCancel(controller, reason)
+{
+ "use strict";
+
+ @putByIdDirectPrivate(controller, "queue", @newQueue());
+ return @getByIdDirectPrivate(controller, "cancelAlgorithm").@call(@undefined, reason);
+}
+
+function readableStreamDefaultControllerPull(controller)
+{
+ "use strict";
+
+ var queue = @getByIdDirectPrivate(controller, "queue");
+ if (queue.content.isNotEmpty()) {
+ const chunk = @dequeueValue(queue);
+ if (@getByIdDirectPrivate(controller, "closeRequested") && queue.content.isEmpty())
+ @readableStreamClose(@getByIdDirectPrivate(controller, "controlledReadableStream"));
+ else
+ @readableStreamDefaultControllerCallPullIfNeeded(controller);
+
+ return @createFulfilledPromise({ value: chunk, done: false });
+ }
+ const pendingPromise = @readableStreamAddReadRequest(@getByIdDirectPrivate(controller, "controlledReadableStream"));
+ @readableStreamDefaultControllerCallPullIfNeeded(controller);
+ return pendingPromise;
+}
+
+function readableStreamDefaultControllerClose(controller)
+{
+ "use strict";
+
+ @assert(@readableStreamDefaultControllerCanCloseOrEnqueue(controller));
+ @putByIdDirectPrivate(controller, "closeRequested", true);
+ if (@getByIdDirectPrivate(controller, "queue")?.content?.isEmpty())
+ @readableStreamClose(@getByIdDirectPrivate(controller, "controlledReadableStream"));
+}
+
+function readableStreamClose(stream)
+{
+ "use strict";
+
+ @assert(@getByIdDirectPrivate(stream, "state") === @streamReadable);
+ @putByIdDirectPrivate(stream, "state", @streamClosed);
+ if (!@getByIdDirectPrivate(stream, "reader"))
+ return;
+
+ if (@isReadableStreamDefaultReader(@getByIdDirectPrivate(stream, "reader"))) {
+ const requests = @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests");
+ if (requests.isNotEmpty()) {
+ @putByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests", @createFIFO());
+
+ for (var request = requests.shift(); request; request = requests.shift())
+ @fulfillPromise(request, { value: @undefined, done: true });
+ }
+ }
+
+ @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "closedPromiseCapability").@resolve.@call();
+}
+
+function readableStreamFulfillReadRequest(stream, chunk, done)
+{
+ "use strict";
+ const readRequest = @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").shift();
+ @fulfillPromise(readRequest, { value: chunk, done: done });
+}
+
+function readableStreamDefaultControllerEnqueue(controller, chunk)
+{
+ "use strict";
+
+ const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
+ // this is checked by callers
+ @assert(@readableStreamDefaultControllerCanCloseOrEnqueue(controller));
+
+ if (@isReadableStreamLocked(stream) && @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) {
+ @readableStreamFulfillReadRequest(stream, chunk, false);
+ @readableStreamDefaultControllerCallPullIfNeeded(controller);
+ return;
+ }
+
+ try {
+ let chunkSize = 1;
+ if (@getByIdDirectPrivate(controller, "strategy").size !== @undefined)
+ chunkSize = @getByIdDirectPrivate(controller, "strategy").size(chunk);
+ @enqueueValueWithSize(@getByIdDirectPrivate(controller, "queue"), chunk, chunkSize);
+ }
+ catch(error) {
+ @readableStreamDefaultControllerError(controller, error);
+ throw error;
+ }
+ @readableStreamDefaultControllerCallPullIfNeeded(controller);
+}
+
+function readableStreamDefaultReaderRead(reader)
+{
+ "use strict";
+
+ const stream = @getByIdDirectPrivate(reader, "ownerReadableStream");
+ @assert(!!stream);
+ const state = @getByIdDirectPrivate(stream, "state");
+
+ @putByIdDirectPrivate(stream, "disturbed", true);
+ if (state === @streamClosed)
+ return @createFulfilledPromise({ value: @undefined, done: true });
+ if (state === @streamErrored)
+ return @Promise.@reject(@getByIdDirectPrivate(stream, "storedError"));
+ @assert(state === @streamReadable);
+
+ return @getByIdDirectPrivate(stream, "readableStreamController").@pull(@getByIdDirectPrivate(stream, "readableStreamController"));
+}
+
+function readableStreamAddReadRequest(stream)
+{
+ "use strict";
+
+ @assert(@isReadableStreamDefaultReader(@getByIdDirectPrivate(stream, "reader")));
+ @assert(@getByIdDirectPrivate(stream, "state") == @streamReadable);
+
+ const readRequest = @newPromise();
+
+ @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").push(readRequest);
+
+ return readRequest;
+}
+
+function isReadableStreamDisturbed(stream)
+{
+ "use strict";
+
+ @assert(@isReadableStream(stream));
+ return @getByIdDirectPrivate(stream, "disturbed");
+}
+
+function readableStreamReaderGenericRelease(reader)
+{
+ "use strict";
+
+ @assert(!!@getByIdDirectPrivate(reader, "ownerReadableStream"));
+ @assert(@getByIdDirectPrivate(@getByIdDirectPrivate(reader, "ownerReadableStream"), "reader") === reader);
+
+ if (@getByIdDirectPrivate(@getByIdDirectPrivate(reader, "ownerReadableStream"), "state") === @streamReadable)
+ @getByIdDirectPrivate(reader, "closedPromiseCapability").@reject.@call(@undefined, @makeTypeError("releasing lock of reader whose stream is still in readable state"));
+ else
+ @putByIdDirectPrivate(reader, "closedPromiseCapability", { @promise: @newHandledRejectedPromise(@makeTypeError("reader released lock")) });
+
+ const promise = @getByIdDirectPrivate(reader, "closedPromiseCapability").@promise;
+ @markPromiseAsHandled(promise);
+ @putByIdDirectPrivate(@getByIdDirectPrivate(reader, "ownerReadableStream"), "reader", @undefined);
+ @putByIdDirectPrivate(reader, "ownerReadableStream", @undefined);
+}
+
+function readableStreamDefaultControllerCanCloseOrEnqueue(controller)
+{
+ "use strict";
+
+ return !@getByIdDirectPrivate(controller, "closeRequested") && @getByIdDirectPrivate(@getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === @streamReadable;
+}
+
+
+function lazyLoadStream(stream, autoAllocateChunkSize) {
+ "use strict";
+
+ var nativeType = @getByIdDirectPrivate(stream, "bunNativeType");
+ var nativePtr = @getByIdDirectPrivate(stream, "bunNativePtr");
+ var cached = @lazyStreamPrototypeMap;
+ var Prototype = cached.@get(nativeType);
+ if (Prototype === @undefined) {
+ var [pull, start, cancel, setClose, deinit] = @lazyLoad(nativeType);
+ var closer = [false];
+ var handleResult;
+ function handleNativeReadableStreamPromiseResult(val) {
+ "use strict";
+ var {c, v} = this;
+ this.c = @undefined;
+ this.v = @undefined;
+ handleResult(val, c, v);
+ }
+
+ handleResult = function handleResult(result, controller, view) {
+ "use strict";
+
+ if (result && @isPromise(result)) {
+ return result.then(handleNativeReadableStreamPromiseResult.bind({c: controller, v: view}), (err) => controller.error(err));
+ } else if (result !== false) {
+ if (view && view.byteLength === result) {
+ controller.byobRequest.respondWithNewView(view);
+ } else {
+ controller.byobRequest.respond(result);
+ }
+ }
+
+ if (closer[0] || result === false) {
+ @enqueueJob(() => controller.close());
+ closer[0] = false;
+ }
+ };
+
+ Prototype = class NativeReadableStreamSource {
+ constructor(tag, autoAllocateChunkSize) {
+ this.pull = this.pull_.bind(tag);
+ this.cancel = this.cancel_.bind(tag);
+ this.autoAllocateChunkSize = autoAllocateChunkSize;
+ }
+
+ pull;
+ cancel;
+
+ type = "bytes";
+ autoAllocateChunkSize = 0;
+
+ static startSync = start;
+
+ pull_(controller) {
+ closer[0] = false;
+ var result;
+
+ const view = controller.byobRequest.view;
+ try {
+ result = pull(this, view, closer);
+ } catch(err) {
+ return controller.error(err);
+ }
+
+ return handleResult(result, controller, view);
+ }
+
+ cancel_(reason) {
+ cancel(this, reason);
+ }
+ static deinit = deinit;
+ static registry = new FinalizationRegistry(deinit);
+ }
+ cached.@set(nativeType, Prototype);
+ }
+
+ const chunkSize = Prototype.startSync(nativePtr, autoAllocateChunkSize);
+
+ // empty file, no need for native back-and-forth on this
+ if (chunkSize === 0) {
+ @readableStreamClose(stream);
+ return null;
+ }
+ var instance = new Prototype(nativePtr, chunkSize);
+ Prototype.registry.register(instance, nativePtr);
+ return instance;
+} \ No newline at end of file
diff --git a/src/bun.js/builtins/js/StreamInternals.js b/src/bun.js/builtins/js/StreamInternals.js
new file mode 100644
index 000000000..c2ca3f5b5
--- /dev/null
+++ b/src/bun.js/builtins/js/StreamInternals.js
@@ -0,0 +1,317 @@
+/*
+ * Copyright (C) 2015 Canon Inc.
+ * Copyright (C) 2015 Igalia.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+ * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+// @internal
+
+function markPromiseAsHandled(promise)
+{
+ "use strict";
+
+ @assert(@isPromise(promise));
+ @putPromiseInternalField(promise, @promiseFieldFlags, @getPromiseInternalField(promise, @promiseFieldFlags) | @promiseFlagsIsHandled);
+}
+
+function shieldingPromiseResolve(result)
+{
+ "use strict";
+
+ const promise = @Promise.@resolve(result);
+ if (promise.@then === @undefined)
+ promise.@then = @Promise.prototype.@then;
+ return promise;
+}
+
+function promiseInvokeOrNoopMethodNoCatch(object, method, args)
+{
+ "use strict";
+
+ if (method === @undefined)
+ return @Promise.@resolve();
+ return @shieldingPromiseResolve(method.@apply(object, args));
+}
+
+function promiseInvokeOrNoopNoCatch(object, key, args)
+{
+ "use strict";
+
+ return @promiseInvokeOrNoopMethodNoCatch(object, object[key], args);
+}
+
+function promiseInvokeOrNoopMethod(object, method, args)
+{
+ "use strict";
+
+ try {
+ return @promiseInvokeOrNoopMethodNoCatch(object, method, args);
+ }
+ catch(error) {
+ return @Promise.@reject(error);
+ }
+}
+
+function promiseInvokeOrNoop(object, key, args)
+{
+ "use strict";
+
+ try {
+ return @promiseInvokeOrNoopNoCatch(object, key, args);
+ }
+ catch(error) {
+ return @Promise.@reject(error);
+ }
+}
+
+function promiseInvokeOrFallbackOrNoop(object, key1, args1, key2, args2)
+{
+ "use strict";
+
+ try {
+ const method = object[key1];
+ if (method === @undefined)
+ return @promiseInvokeOrNoopNoCatch(object, key2, args2);
+ return @shieldingPromiseResolve(method.@apply(object, args1));
+ }
+ catch(error) {
+ return @Promise.@reject(error);
+ }
+}
+
+function validateAndNormalizeQueuingStrategy(size, highWaterMark)
+{
+ "use strict";
+
+ if (size !== @undefined && typeof size !== "function")
+ @throwTypeError("size parameter must be a function");
+
+ const newHighWaterMark = @toNumber(highWaterMark);
+
+ if (@isNaN(newHighWaterMark) || newHighWaterMark < 0)
+ @throwRangeError("highWaterMark value is negative or not a number");
+
+ return { size: size, highWaterMark: newHighWaterMark };
+}
+
+@globalPrivate
+function createFIFO() {
+ "use strict";
+ class Denqueue {
+ constructor() {
+ this._head = 0;
+ this._tail = 0;
+ // this._capacity = 0;
+ this._capacityMask = 0x3;
+ this._list = @newArrayWithSize(4);
+ }
+
+ size() {
+ if (this._head === this._tail) return 0;
+ if (this._head < this._tail) return this._tail - this._head;
+ else return this._capacityMask + 1 - (this._head - this._tail);
+ }
+
+ isEmpty() {
+ return this.size() == 0;
+ }
+
+ isNotEmpty() {
+ return this.size() > 0;
+ }
+
+ shift() {
+ var head = this._head;
+ if (head === this._tail) return @undefined;
+ var item = this._list[head];
+ @putByValDirect(this._list, head, @undefined);
+ this._head = (head + 1) & this._capacityMask;
+ if (head < 2 && this._tail > 10000 && this._tail <= this._list.length >>> 2) this._shrinkArray();
+ return item;
+ }
+
+ peek() {
+ if (this._head === this._tail) return @undefined;
+ return this._list[this._head];
+ }
+
+ push(item) {
+ var tail = this._tail;
+ @putByValDirect(this._list, tail, item);
+ this._tail = (tail + 1) & this._capacityMask;
+ if (this._tail === this._head) {
+ this._growArray();
+ }
+ // if (this._capacity && this.size() > this._capacity) {
+ // this.shift();
+ // }
+ }
+
+ toArray(fullCopy) {
+ var list = this._list;
+ var len = @toLength(list.length);
+
+ if (fullCopy || this._head > this._tail) {
+ var _head = @toLength(this._head);
+ var _tail = @toLength(this._tail);
+ var total = @toLength((len - _head) + _tail);
+ var array = @newArrayWithSize(total);
+ var j = 0;
+ for (var i = _head; i < len; i++) @putByValDirect(array, j++, list[i]);
+ for (var i = 0; i < _tail; i++) @putByValDirect(array, j++, list[i]);
+ return array;
+ } else {
+ return @Array.prototype.slice.@call(list, this._head, this._tail);
+ }
+ }
+
+ clear() {
+ this._head = 0;
+ this._tail = 0;
+ this._list.fill(undefined);
+ }
+
+ _growArray() {
+ if (this._head) {
+ // copy existing data, head to end, then beginning to tail.
+ this._list = this.toArray(true);
+ this._head = 0;
+ }
+
+ // head is at 0 and array is now full, safe to extend
+ this._tail = @toLength(this._list.length);
+
+ this._list.length <<= 1;
+ this._capacityMask = (this._capacityMask << 1) | 1;
+ }
+
+ shrinkArray() {
+ this._list.length >>>= 1;
+ this._capacityMask >>>= 1;
+ }
+ }
+
+
+ return new Denqueue();
+}
+
+function newQueue()
+{
+ "use strict";
+
+ return { content: @createFIFO(), size: 0 };
+}
+
+function dequeueValue(queue)
+{
+ "use strict";
+
+ const record = queue.content.shift();
+ queue.size -= record.size;
+ // As described by spec, below case may occur due to rounding errors.
+ if (queue.size < 0)
+ queue.size = 0;
+ return record.value;
+}
+
+function enqueueValueWithSize(queue, value, size)
+{
+ "use strict";
+
+ size = @toNumber(size);
+ if (!@isFinite(size) || size < 0)
+ @throwRangeError("size has an incorrect value");
+
+ queue.content.push({ value, size });
+ queue.size += size;
+}
+
+function peekQueueValue(queue)
+{
+ "use strict";
+
+ @assert(queue.content.isNotEmpty());
+
+ return queue.peek().value;
+}
+
+function resetQueue(queue)
+{
+ "use strict";
+
+ @assert("content" in queue);
+ @assert("size" in queue);
+ queue.content.clear();
+ queue.size = 0;
+}
+
+function extractSizeAlgorithm(strategy)
+{
+ if (!("size" in strategy))
+ return () => 1;
+ const sizeAlgorithm = strategy["size"];
+ if (typeof sizeAlgorithm !== "function")
+ @throwTypeError("strategy.size must be a function");
+
+ return (chunk) => { return sizeAlgorithm(chunk); };
+}
+
+function extractHighWaterMark(strategy, defaultHWM)
+{
+ if (!("highWaterMark" in strategy))
+ return defaultHWM;
+ const highWaterMark = strategy["highWaterMark"];
+ if (@isNaN(highWaterMark) || highWaterMark < 0)
+ @throwRangeError("highWaterMark value is negative or not a number");
+
+ return @toNumber(highWaterMark);
+}
+
+function extractHighWaterMarkFromQueuingStrategyInit(init)
+{
+ "use strict";
+
+ if (!@isObject(init))
+ @throwTypeError("QueuingStrategyInit argument must be an object.");
+ const {highWaterMark} = init;
+ if (highWaterMark === @undefined)
+ @throwTypeError("QueuingStrategyInit.highWaterMark member is required.");
+
+ return @toNumber(highWaterMark);
+}
+
+function createFulfilledPromise(value)
+{
+ const promise = @newPromise();
+ @fulfillPromise(promise, value);
+ return promise;
+}
+
+function toDictionary(value, defaultValue, errorMessage)
+{
+ if (value === @undefined || value === null)
+ return defaultValue;
+ if (!@isObject(value))
+ @throwTypeError(errorMessage);
+ return value;
+}
diff --git a/src/bun.js/builtins/js/TransformStream.js b/src/bun.js/builtins/js/TransformStream.js
new file mode 100644
index 000000000..8d82d87d8
--- /dev/null
+++ b/src/bun.js/builtins/js/TransformStream.js
@@ -0,0 +1,116 @@
+/*
+ * Copyright (C) 2020 Apple Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
+ * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+function initializeTransformStream()
+{
+ "use strict";
+
+ let transformer = arguments[0];
+
+ // This is the path for CreateTransformStream.
+ if (@isObject(transformer) && @getByIdDirectPrivate(transformer, "TransformStream"))
+ return this;
+
+ let writableStrategy = arguments[1];
+ let readableStrategy = arguments[2];
+
+ if (transformer === @undefined)
+ transformer = null;
+
+ if (readableStrategy === @undefined)
+ readableStrategy = { };
+
+ if (writableStrategy === @undefined)
+ writableStrategy = { };
+
+ let transformerDict = { };
+ if (transformer !== null) {
+ if ("start" in transformer) {
+ transformerDict["start"] = transformer["start"];
+ if (typeof transformerDict["start"] !== "function")
+ @throwTypeError("transformer.start should be a function");
+ }
+ if ("transform" in transformer) {
+ transformerDict["transform"] = transformer["transform"];
+ if (typeof transformerDict["transform"] !== "function")
+ @throwTypeError("transformer.transform should be a function");
+ }
+ if ("flush" in transformer) {
+ transformerDict["flush"] = transformer["flush"];
+ if (typeof transformerDict["flush"] !== "function")
+ @throwTypeError("transformer.flush should be a function");
+ }
+
+ if ("readableType" in transformer)
+ @throwRangeError("TransformStream transformer has a readableType");
+ if ("writableType" in transformer)
+ @throwRangeError("TransformStream transformer has a writableType");
+ }
+
+ const readableHighWaterMark = @extractHighWaterMark(readableStrategy, 0);
+ const readableSizeAlgorithm = @extractSizeAlgorithm(readableStrategy);
+
+ const writableHighWaterMark = @extractHighWaterMark(writableStrategy, 1);
+ const writableSizeAlgorithm = @extractSizeAlgorithm(writableStrategy);
+
+ const startPromiseCapability = @newPromiseCapability(@Promise);
+ @initializeTransformStream(this, startPromiseCapability.@promise, writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm);
+ @setUpTransformStreamDefaultControllerFromTransformer(this, transformer, transformerDict);
+
+ if ("start" in transformerDict) {
+ const controller = @getByIdDirectPrivate(this, "controller");
+ const startAlgorithm = () => @promiseInvokeOrNoopMethodNoCatch(transformer, transformerDict["start"], [controller]);
+ startAlgorithm().@then(() => {
+ // FIXME: We probably need to resolve start promise with the result of the start algorithm.
+ startPromiseCapability.@resolve.@call();
+ }, (error) => {
+ startPromiseCapability.@reject.@call(@undefined, error);
+ });
+ } else
+ startPromiseCapability.@resolve.@call();
+
+ return this;
+}
+
+@getter
+function readable()
+{
+ "use strict";
+
+ if (!@isTransformStream(this))
+ throw @makeThisTypeError("TransformStream", "readable");
+
+ return @getByIdDirectPrivate(this, "readable");
+}
+
+function writable()
+{
+ "use strict";
+
+ if (!@isTransformStream(this))
+ throw @makeThisTypeError("TransformStream", "writable");
+
+ return @getByIdDirectPrivate(this, "writable");
+}
diff --git a/src/bun.js/builtins/js/TransformStreamDefaultController.js b/src/bun.js/builtins/js/TransformStreamDefaultController.js
new file mode 100644
index 000000000..5ed7d0dfa
--- /dev/null
+++ b/src/bun.js/builtins/js/TransformStreamDefaultController.js
@@ -0,0 +1,76 @@
+/*
+ * Copyright (C) 2020 Apple Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
+ * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+function initializeTransformStreamDefaultController()
+{
+ "use strict";
+
+ return this;
+}
+
+@getter
+function desiredSize()
+{
+ "use strict";
+
+ if (!@isTransformStreamDefaultController(this))
+ throw @makeThisTypeError("TransformStreamDefaultController", "enqueue");
+
+ const stream = @getByIdDirectPrivate(this, "stream");
+ const readable = @getByIdDirectPrivate(stream, "readable");
+ const readableController = @getByIdDirectPrivate(readable, "readableStreamController");
+
+ return @readableStreamDefaultControllerGetDesiredSize(readableController);
+}
+
+function enqueue(chunk)
+{
+ "use strict";
+
+ if (!@isTransformStreamDefaultController(this))
+ throw @makeThisTypeError("TransformStreamDefaultController", "enqueue");
+
+ @transformStreamDefaultControllerEnqueue(this, chunk);
+}
+
+function error(e)
+{
+ "use strict";
+
+ if (!@isTransformStreamDefaultController(this))
+ throw @makeThisTypeError("TransformStreamDefaultController", "error");
+
+ @transformStreamDefaultControllerError(this, e);
+}
+
+function terminate()
+{
+ "use strict";
+
+ if (!@isTransformStreamDefaultController(this))
+ throw @makeThisTypeError("TransformStreamDefaultController", "terminate");
+
+ @transformStreamDefaultControllerTerminate(this);
+}
diff --git a/src/bun.js/builtins/js/TransformStreamInternals.js b/src/bun.js/builtins/js/TransformStreamInternals.js
new file mode 100644
index 000000000..4263e3991
--- /dev/null
+++ b/src/bun.js/builtins/js/TransformStreamInternals.js
@@ -0,0 +1,350 @@
+/*
+ * Copyright (C) 2020 Apple Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
+ * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+// @internal
+
+function isTransformStream(stream)
+{
+ "use strict";
+
+ return @isObject(stream) && !!@getByIdDirectPrivate(stream, "readable");
+}
+
+function isTransformStreamDefaultController(controller)
+{
+ "use strict";
+
+ return @isObject(controller) && !!@getByIdDirectPrivate(controller, "transformAlgorithm");
+}
+
+function createTransformStream(startAlgorithm, transformAlgorithm, flushAlgorithm, writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm)
+{
+ if (writableHighWaterMark === @undefined)
+ writableHighWaterMark = 1;
+ if (writableSizeAlgorithm === @undefined)
+ writableSizeAlgorithm = () => 1;
+ if (readableHighWaterMark === @undefined)
+ readableHighWaterMark = 0;
+ if (readableSizeAlgorithm === @undefined)
+ readableSizeAlgorithm = () => 1;
+ @assert(writableHighWaterMark >= 0);
+ @assert(readableHighWaterMark >= 0);
+
+ const transform = {};
+ @putByIdDirectPrivate(transform, "TransformStream", true);
+
+ const stream = new @TransformStream(transform);
+ const startPromiseCapability = @newPromiseCapability(@Promise);
+ @initializeTransformStream(stream, startPromiseCapability.@promise, writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm);
+
+ const controller = new @TransformStreamDefaultController();
+ @setUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm);
+
+ startAlgorithm().@then(() => {
+ startPromiseCapability.@resolve.@call();
+ }, (error) => {
+ startPromiseCapability.@reject.@call(@undefined, error);
+ });
+
+ return stream;
+}
+
+function initializeTransformStream(stream, startPromise, writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm)
+{
+ "use strict";
+
+ const startAlgorithm = () => { return startPromise; };
+ const writeAlgorithm = (chunk) => { return @transformStreamDefaultSinkWriteAlgorithm(stream, chunk); }
+ const abortAlgorithm = (reason) => { return @transformStreamDefaultSinkAbortAlgorithm(stream, reason); }
+ const closeAlgorithm = () => { return @transformStreamDefaultSinkCloseAlgorithm(stream); }
+ const writable = @createWritableStream(startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, writableHighWaterMark, writableSizeAlgorithm);
+
+ const pullAlgorithm = () => { return @transformStreamDefaultSourcePullAlgorithm(stream); };
+ const cancelAlgorithm = (reason) => {
+ @transformStreamErrorWritableAndUnblockWrite(stream, reason);
+ return @Promise.@resolve();
+ };
+ const underlyingSource = { };
+ @putByIdDirectPrivate(underlyingSource, "start", startAlgorithm);
+ @putByIdDirectPrivate(underlyingSource, "pull", pullAlgorithm);
+ @putByIdDirectPrivate(underlyingSource, "cancel", cancelAlgorithm);
+ const options = { };
+ @putByIdDirectPrivate(options, "size", readableSizeAlgorithm);
+ @putByIdDirectPrivate(options, "highWaterMark", readableHighWaterMark);
+ const readable = new @ReadableStream(underlyingSource, options);
+
+ // The writable to expose to JS through writable getter.
+ @putByIdDirectPrivate(stream, "writable", writable);
+ // The writable to use for the actual transform algorithms.
+ @putByIdDirectPrivate(stream, "internalWritable", @getInternalWritableStream(writable));
+
+ @putByIdDirectPrivate(stream, "readable", readable);
+ @putByIdDirectPrivate(stream, "backpressure", @undefined);
+ @putByIdDirectPrivate(stream, "backpressureChangePromise", @undefined);
+
+ @transformStreamSetBackpressure(stream, true);
+ @putByIdDirectPrivate(stream, "controller", @undefined);
+}
+
+function transformStreamError(stream, e)
+{
+ "use strict";
+
+ const readable = @getByIdDirectPrivate(stream, "readable");
+ const readableController = @getByIdDirectPrivate(readable, "readableStreamController");
+ @readableStreamDefaultControllerError(readableController, e);
+
+ @transformStreamErrorWritableAndUnblockWrite(stream, e);
+}
+
+function transformStreamErrorWritableAndUnblockWrite(stream, e)
+{
+ "use strict";
+
+ @transformStreamDefaultControllerClearAlgorithms(@getByIdDirectPrivate(stream, "controller"));
+
+ const writable = @getByIdDirectPrivate(stream, "internalWritable");
+ @writableStreamDefaultControllerErrorIfNeeded(@getByIdDirectPrivate(writable, "controller"), e);
+
+ if (@getByIdDirectPrivate(stream, "backpressure"))
+ @transformStreamSetBackpressure(stream, false);
+}
+
+function transformStreamSetBackpressure(stream, backpressure)
+{
+ "use strict";
+
+ @assert(@getByIdDirectPrivate(stream, "backpressure") !== backpressure);
+
+ const backpressureChangePromise = @getByIdDirectPrivate(stream, "backpressureChangePromise");
+ if (backpressureChangePromise !== @undefined)
+ backpressureChangePromise.@resolve.@call();
+
+ @putByIdDirectPrivate(stream, "backpressureChangePromise", @newPromiseCapability(@Promise));
+ @putByIdDirectPrivate(stream, "backpressure", backpressure);
+}
+
+function setUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm)
+{
+ "use strict";
+
+ @assert(@isTransformStream(stream));
+ @assert(@getByIdDirectPrivate(stream, "controller") === @undefined);
+
+ @putByIdDirectPrivate(controller, "stream", stream);
+ @putByIdDirectPrivate(stream, "controller", controller);
+ @putByIdDirectPrivate(controller, "transformAlgorithm", transformAlgorithm);
+ @putByIdDirectPrivate(controller, "flushAlgorithm", flushAlgorithm);
+}
+
+
+function setUpTransformStreamDefaultControllerFromTransformer(stream, transformer, transformerDict)
+{
+ "use strict";
+
+ const controller = new @TransformStreamDefaultController();
+ let transformAlgorithm = (chunk) => {
+ try {
+ @transformStreamDefaultControllerEnqueue(controller, chunk);
+ } catch (e) {
+ return @Promise.@reject(e);
+ }
+ return @Promise.@resolve();
+ };
+ let flushAlgorithm = () => { return @Promise.@resolve(); };
+
+ if ("transform" in transformerDict)
+ transformAlgorithm = (chunk) => {
+ return @promiseInvokeOrNoopMethod(transformer, transformerDict["transform"], [chunk, controller]);
+ };
+
+ if ("flush" in transformerDict) {
+ flushAlgorithm = () => {
+ return @promiseInvokeOrNoopMethod(transformer, transformerDict["flush"], [controller]);
+ };
+ }
+
+ @setUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm);
+}
+
+function transformStreamDefaultControllerClearAlgorithms(controller)
+{
+ "use strict";
+
+ // We set transformAlgorithm to true to allow GC but keep the isTransformStreamDefaultController check.
+ @putByIdDirectPrivate(controller, "transformAlgorithm", true);
+ @putByIdDirectPrivate(controller, "flushAlgorithm", @undefined);
+}
+
+function transformStreamDefaultControllerEnqueue(controller, chunk)
+{
+ "use strict";
+
+ const stream = @getByIdDirectPrivate(controller, "stream");
+ const readable = @getByIdDirectPrivate(stream, "readable");
+ const readableController = @getByIdDirectPrivate(readable, "readableStreamController");
+
+ @assert(readableController !== @undefined);
+ if (!@readableStreamDefaultControllerCanCloseOrEnqueue(readableController))
+ @throwTypeError("TransformStream.readable cannot close or enqueue");
+
+ try {
+ @readableStreamDefaultControllerEnqueue(readableController, chunk);
+ } catch (e) {
+ @transformStreamErrorWritableAndUnblockWrite(stream, e);
+ throw @getByIdDirectPrivate(readable, "storedError");
+ }
+
+ const backpressure = !@readableStreamDefaultControllerShouldCallPull(readableController);
+ if (backpressure !== @getByIdDirectPrivate(stream, "backpressure")) {
+ @assert(backpressure);
+ @transformStreamSetBackpressure(stream, true);
+ }
+}
+
+function transformStreamDefaultControllerError(controller, e)
+{
+ "use strict";
+
+ @transformStreamError(@getByIdDirectPrivate(controller, "stream"), e);
+}
+
+function transformStreamDefaultControllerPerformTransform(controller, chunk)
+{
+ "use strict";
+
+ const promiseCapability = @newPromiseCapability(@Promise);
+
+ const transformPromise = @getByIdDirectPrivate(controller, "transformAlgorithm").@call(@undefined, chunk);
+ transformPromise.@then(() => {
+ promiseCapability.@resolve();
+ }, (r) => {
+ @transformStreamError(@getByIdDirectPrivate(controller, "stream"), r);
+ promiseCapability.@reject.@call(@undefined, r);
+ });
+ return promiseCapability.@promise;
+}
+
+function transformStreamDefaultControllerTerminate(controller)
+{
+ "use strict";
+
+ const stream = @getByIdDirectPrivate(controller, "stream");
+ const readable = @getByIdDirectPrivate(stream, "readable");
+ const readableController = @getByIdDirectPrivate(readable, "readableStreamController");
+
+ // FIXME: Update readableStreamDefaultControllerClose to make this check.
+ if (@readableStreamDefaultControllerCanCloseOrEnqueue(readableController))
+ @readableStreamDefaultControllerClose(readableController);
+ const error = @makeTypeError("the stream has been terminated");
+ @transformStreamErrorWritableAndUnblockWrite(stream, error);
+}
+
+function transformStreamDefaultSinkWriteAlgorithm(stream, chunk)
+{
+ "use strict";
+
+ const writable = @getByIdDirectPrivate(stream, "internalWritable");
+
+ @assert(@getByIdDirectPrivate(writable, "state") === "writable");
+
+ const controller = @getByIdDirectPrivate(stream, "controller");
+
+ if (@getByIdDirectPrivate(stream, "backpressure")) {
+ const promiseCapability = @newPromiseCapability(@Promise);
+
+ const backpressureChangePromise = @getByIdDirectPrivate(stream, "backpressureChangePromise");
+ @assert(backpressureChangePromise !== @undefined);
+ backpressureChangePromise.@promise.@then(() => {
+ const state = @getByIdDirectPrivate(writable, "state");
+ if (state === "erroring") {
+ promiseCapability.@reject.@call(@undefined, @getByIdDirectPrivate(writable, "storedError"));
+ return;
+ }
+
+ @assert(state === "writable");
+ @transformStreamDefaultControllerPerformTransform(controller, chunk).@then(() => {
+ promiseCapability.@resolve();
+ }, (e) => {
+ promiseCapability.@reject.@call(@undefined, e);
+ });
+ }, (e) => {
+ promiseCapability.@reject.@call(@undefined, e);
+ });
+
+ return promiseCapability.@promise;
+ }
+ return @transformStreamDefaultControllerPerformTransform(controller, chunk);
+}
+
+function transformStreamDefaultSinkAbortAlgorithm(stream, reason)
+{
+ "use strict";
+
+ @transformStreamError(stream, reason);
+ return @Promise.@resolve();
+}
+
+function transformStreamDefaultSinkCloseAlgorithm(stream)
+{
+ "use strict";
+ const readable = @getByIdDirectPrivate(stream, "readable");
+ const controller = @getByIdDirectPrivate(stream, "controller");
+ const readableController = @getByIdDirectPrivate(readable, "readableStreamController");
+
+ const flushAlgorithm = @getByIdDirectPrivate(controller, "flushAlgorithm");
+ @assert(flushAlgorithm !== @undefined);
+ const flushPromise = @getByIdDirectPrivate(controller, "flushAlgorithm").@call();
+ @transformStreamDefaultControllerClearAlgorithms(controller);
+
+ const promiseCapability = @newPromiseCapability(@Promise);
+ flushPromise.@then(() => {
+ if (@getByIdDirectPrivate(readable, "state") === @streamErrored) {
+ promiseCapability.@reject.@call(@undefined, @getByIdDirectPrivate(readable, "storedError"));
+ return;
+ }
+
+ // FIXME: Update readableStreamDefaultControllerClose to make this check.
+ if (@readableStreamDefaultControllerCanCloseOrEnqueue(readableController))
+ @readableStreamDefaultControllerClose(readableController);
+ promiseCapability.@resolve();
+ }, (r) => {
+ @transformStreamError(@getByIdDirectPrivate(controller, "stream"), r);
+ promiseCapability.@reject.@call(@undefined, @getByIdDirectPrivate(readable, "storedError"));
+ });
+ return promiseCapability.@promise;
+}
+
+function transformStreamDefaultSourcePullAlgorithm(stream)
+{
+ "use strict";
+
+ @assert(@getByIdDirectPrivate(stream, "backpressure"));
+ @assert(@getByIdDirectPrivate(stream, "backpressureChangePromise") !== @undefined);
+
+ @transformStreamSetBackpressure(stream, false);
+
+ return @getByIdDirectPrivate(stream, "backpressureChangePromise").@promise;
+}
diff --git a/src/bun.js/builtins/js/WritableStreamDefaultController.js b/src/bun.js/builtins/js/WritableStreamDefaultController.js
new file mode 100644
index 000000000..8c42212e0
--- /dev/null
+++ b/src/bun.js/builtins/js/WritableStreamDefaultController.js
@@ -0,0 +1,56 @@
+/*
+ * Copyright (C) 2020 Apple Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
+ * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+function initializeWritableStreamDefaultController()
+{
+ "use strict";
+
+ @putByIdDirectPrivate(this, "queue", @newQueue());
+ @putByIdDirectPrivate(this, "abortSteps", (reason) => {
+ const result = @getByIdDirectPrivate(this, "abortAlgorithm").@call(@undefined, reason);
+ @writableStreamDefaultControllerClearAlgorithms(this);
+ return result;
+ });
+
+ @putByIdDirectPrivate(this, "errorSteps", () => {
+ @resetQueue(@getByIdDirectPrivate(this, "queue"));
+ });
+
+ return this;
+}
+
+function error(e)
+{
+ "use strict";
+
+ if (@getByIdDirectPrivate(this, "abortSteps") === @undefined)
+ throw @makeThisTypeError("WritableStreamDefaultController", "error");
+
+ const stream = @getByIdDirectPrivate(this, "stream");
+ if (@getByIdDirectPrivate(stream, "state") !== "writable")
+ return;
+ @writableStreamDefaultControllerError(this, e);
+}
diff --git a/src/bun.js/builtins/js/WritableStreamDefaultWriter.js b/src/bun.js/builtins/js/WritableStreamDefaultWriter.js
new file mode 100644
index 000000000..69a953fc3
--- /dev/null
+++ b/src/bun.js/builtins/js/WritableStreamDefaultWriter.js
@@ -0,0 +1,135 @@
+/*
+ * Copyright (C) 2020 Apple Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
+ * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+function initializeWritableStreamDefaultWriter(stream)
+{
+ "use strict";
+
+ // stream can be a WritableStream if WritableStreamDefaultWriter constructor is called directly from JS
+ // or an InternalWritableStream in other code paths.
+ const internalStream = @getInternalWritableStream(stream);
+ if (internalStream)
+ stream = internalStream;
+
+ if (!@isWritableStream(stream))
+ @throwTypeError("WritableStreamDefaultWriter constructor takes a WritableStream");
+
+ @setUpWritableStreamDefaultWriter(this, stream);
+ return this;
+}
+
+@getter
+function closed()
+{
+ "use strict";
+
+ if (!@isWritableStreamDefaultWriter(this))
+ return @Promise.@reject(@makeGetterTypeError("WritableStreamDefaultWriter", "closed"));
+
+ return @getByIdDirectPrivate(this, "closedPromise").@promise;
+}
+
+@getter
+function desiredSize()
+{
+ "use strict";
+
+ if (!@isWritableStreamDefaultWriter(this))
+ throw @makeThisTypeError("WritableStreamDefaultWriter", "desiredSize");
+
+ if (@getByIdDirectPrivate(this, "stream") === @undefined)
+ @throwTypeError("WritableStreamDefaultWriter has no stream");
+
+ return @writableStreamDefaultWriterGetDesiredSize(this);
+}
+
+@getter
+function ready()
+{
+ "use strict";
+
+ if (!@isWritableStreamDefaultWriter(this))
+ return @Promise.@reject(@makeThisTypeError("WritableStreamDefaultWriter", "ready"));
+
+ return @getByIdDirectPrivate(this, "readyPromise").@promise;
+}
+
+function abort(reason)
+{
+ "use strict";
+
+ if (!@isWritableStreamDefaultWriter(this))
+ return @Promise.@reject(@makeThisTypeError("WritableStreamDefaultWriter", "abort"));
+
+ if (@getByIdDirectPrivate(this, "stream") === @undefined)
+ return @Promise.@reject(@makeTypeError("WritableStreamDefaultWriter has no stream"));
+
+ return @writableStreamDefaultWriterAbort(this, reason);
+}
+
+function close()
+{
+ "use strict";
+
+ if (!@isWritableStreamDefaultWriter(this))
+ return @Promise.@reject(@makeThisTypeError("WritableStreamDefaultWriter", "close"));
+
+ const stream = @getByIdDirectPrivate(this, "stream");
+ if (stream === @undefined)
+ return @Promise.@reject(@makeTypeError("WritableStreamDefaultWriter has no stream"));
+
+ if (@writableStreamCloseQueuedOrInFlight(stream))
+ return @Promise.@reject(@makeTypeError("WritableStreamDefaultWriter is being closed"));
+
+ return @writableStreamDefaultWriterClose(this);
+}
+
+function releaseLock()
+{
+ "use strict";
+
+ if (!@isWritableStreamDefaultWriter(this))
+ throw @makeThisTypeError("WritableStreamDefaultWriter", "releaseLock");
+
+ const stream = @getByIdDirectPrivate(this, "stream");
+ if (stream === @undefined)
+ return;
+
+ @assert(@getByIdDirectPrivate(stream, "writer") !== @undefined);
+ @writableStreamDefaultWriterRelease(this);
+}
+
+function write(chunk)
+{
+ "use strict";
+
+ if (!@isWritableStreamDefaultWriter(this))
+ return @Promise.@reject(@makeThisTypeError("WritableStreamDefaultWriter", "write"));
+
+ if (@getByIdDirectPrivate(this, "stream") === @undefined)
+ return @Promise.@reject(@makeTypeError("WritableStreamDefaultWriter has no stream"));
+
+ return @writableStreamDefaultWriterWrite(this, chunk);
+}
diff --git a/src/bun.js/builtins/js/WritableStreamInternals.js b/src/bun.js/builtins/js/WritableStreamInternals.js
new file mode 100644
index 000000000..5a97155f2
--- /dev/null
+++ b/src/bun.js/builtins/js/WritableStreamInternals.js
@@ -0,0 +1,856 @@
+/*
+ * Copyright (C) 2015 Canon Inc.
+ * Copyright (C) 2015 Igalia
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+ * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+// @internal
+
+function isWritableStream(stream)
+{
+ "use strict";
+
+ return @isObject(stream) && !!@getByIdDirectPrivate(stream, "underlyingSink");
+}
+
+function isWritableStreamDefaultWriter(writer)
+{
+ "use strict";
+
+ return @isObject(writer) && !!@getByIdDirectPrivate(writer, "closedPromise");
+}
+
+function acquireWritableStreamDefaultWriter(stream)
+{
+ return new @WritableStreamDefaultWriter(stream);
+}
+
+// https://streams.spec.whatwg.org/#create-writable-stream
+function createWritableStream(startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm)
+{
+ @assert(typeof highWaterMark === "number" && !@isNaN(highWaterMark) && highWaterMark >= 0);
+
+ const internalStream = { };
+ @initializeWritableStreamSlots(internalStream, { });
+ const controller = new @WritableStreamDefaultController();
+
+ @setUpWritableStreamDefaultController(internalStream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm);
+
+ return @createWritableStreamFromInternal(internalStream);
+}
+
+function createInternalWritableStreamFromUnderlyingSink(underlyingSink, strategy)
+{
+ "use strict";
+
+ const stream = { };
+
+ if (underlyingSink === @undefined)
+ underlyingSink = { };
+
+ if (strategy === @undefined)
+ strategy = { };
+
+ if (!@isObject(underlyingSink))
+ @throwTypeError("WritableStream constructor takes an object as first argument");
+
+ if ("type" in underlyingSink)
+ @throwRangeError("Invalid type is specified");
+
+ const sizeAlgorithm = @extractSizeAlgorithm(strategy);
+ const highWaterMark = @extractHighWaterMark(strategy, 1);
+
+ const underlyingSinkDict = { };
+ if ("start" in underlyingSink) {
+ underlyingSinkDict["start"] = underlyingSink["start"];
+ if (typeof underlyingSinkDict["start"] !== "function")
+ @throwTypeError("underlyingSink.start should be a function");
+ }
+ if ("write" in underlyingSink) {
+ underlyingSinkDict["write"] = underlyingSink["write"];
+ if (typeof underlyingSinkDict["write"] !== "function")
+ @throwTypeError("underlyingSink.write should be a function");
+ }
+ if ("close" in underlyingSink) {
+ underlyingSinkDict["close"] = underlyingSink["close"];
+ if (typeof underlyingSinkDict["close"] !== "function")
+ @throwTypeError("underlyingSink.close should be a function");
+ }
+ if ("abort" in underlyingSink) {
+ underlyingSinkDict["abort"] = underlyingSink["abort"];
+ if (typeof underlyingSinkDict["abort"] !== "function")
+ @throwTypeError("underlyingSink.abort should be a function");
+ }
+
+ @initializeWritableStreamSlots(stream, underlyingSink);
+ @setUpWritableStreamDefaultControllerFromUnderlyingSink(stream, underlyingSink, underlyingSinkDict, highWaterMark, sizeAlgorithm);
+
+ return stream;
+}
+
+function initializeWritableStreamSlots(stream, underlyingSink)
+{ "use strict";
+
+ @putByIdDirectPrivate(stream, "state", "writable");
+ @putByIdDirectPrivate(stream, "storedError", @undefined);
+ @putByIdDirectPrivate(stream, "writer", @undefined);
+ @putByIdDirectPrivate(stream, "controller", @undefined);
+ @putByIdDirectPrivate(stream, "inFlightWriteRequest", @undefined);
+ @putByIdDirectPrivate(stream, "closeRequest", @undefined);
+ @putByIdDirectPrivate(stream, "inFlightCloseRequest", @undefined);
+ @putByIdDirectPrivate(stream, "pendingAbortRequest", @undefined);
+ @putByIdDirectPrivate(stream, "writeRequests", @createFIFO());
+ @putByIdDirectPrivate(stream, "backpressure", false);
+ @putByIdDirectPrivate(stream, "underlyingSink", underlyingSink);
+}
+
+function writableStreamCloseForBindings(stream)
+{ "use strict";
+
+ if (@isWritableStreamLocked(stream))
+ return @Promise.@reject(@makeTypeError("WritableStream.close method can only be used on non locked WritableStream"));
+
+ if (@writableStreamCloseQueuedOrInFlight(stream))
+ return @Promise.@reject(@makeTypeError("WritableStream.close method can only be used on a being close WritableStream"));
+
+ return @writableStreamClose(stream);
+}
+
+function writableStreamAbortForBindings(stream, reason)
+{ "use strict";
+
+ if (@isWritableStreamLocked(stream))
+ return @Promise.@reject(@makeTypeError("WritableStream.abort method can only be used on non locked WritableStream"));
+
+ return @writableStreamAbort(stream, reason);
+}
+
+function isWritableStreamLocked(stream)
+{ "use strict";
+
+ return @getByIdDirectPrivate(stream, "writer") !== @undefined;
+}
+
+function setUpWritableStreamDefaultWriter(writer, stream)
+{ "use strict";
+
+ if (@isWritableStreamLocked(stream))
+ @throwTypeError("WritableStream is locked");
+
+ @putByIdDirectPrivate(writer, "stream", stream);
+ @putByIdDirectPrivate(stream, "writer", writer);
+
+ const readyPromiseCapability = @newPromiseCapability(@Promise);
+ const closedPromiseCapability = @newPromiseCapability(@Promise);
+ @putByIdDirectPrivate(writer, "readyPromise", readyPromiseCapability);
+ @putByIdDirectPrivate(writer, "closedPromise", closedPromiseCapability);
+
+ const state = @getByIdDirectPrivate(stream, "state");
+ if (state === "writable") {
+ if (@writableStreamCloseQueuedOrInFlight(stream) || !@getByIdDirectPrivate(stream, "backpressure"))
+ readyPromiseCapability.@resolve.@call();
+ } else if (state === "erroring") {
+ readyPromiseCapability.@reject.@call(@undefined, @getByIdDirectPrivate(stream, "storedError"));
+ @markPromiseAsHandled(readyPromiseCapability.@promise);
+ } else if (state === "closed") {
+ readyPromiseCapability.@resolve.@call();
+ closedPromiseCapability.@resolve.@call();
+ } else {
+ @assert(state === "errored");
+ const storedError = @getByIdDirectPrivate(stream, "storedError");
+ readyPromiseCapability.@reject.@call(@undefined, storedError);
+ @markPromiseAsHandled(readyPromiseCapability.@promise);
+ closedPromiseCapability.@reject.@call(@undefined, storedError);
+ @markPromiseAsHandled(closedPromiseCapability.@promise);
+ }
+}
+
+function writableStreamAbort(stream, reason)
+{
+ "use strict";
+ const state = @getByIdDirectPrivate(stream, "state");
+ if (state === "closed" || state === "errored")
+ return @Promise.@resolve();
+
+ const pendingAbortRequest = @getByIdDirectPrivate(stream, "pendingAbortRequest");
+ if (pendingAbortRequest !== @undefined)
+ return pendingAbortRequest.promise.@promise;
+
+ @assert(state === "writable" || state === "erroring");
+ let wasAlreadyErroring = false;
+ if (state === "erroring") {
+ wasAlreadyErroring = true;
+ reason = @undefined;
+ }
+
+ const abortPromiseCapability = @newPromiseCapability(@Promise);
+ @putByIdDirectPrivate(stream, "pendingAbortRequest", { promise : abortPromiseCapability, reason : reason, wasAlreadyErroring : wasAlreadyErroring });
+
+ if (!wasAlreadyErroring)
+ @writableStreamStartErroring(stream, reason);
+ return abortPromiseCapability.@promise;
+}
+
+function writableStreamClose(stream)
+{
+ "use strict";
+
+ const state = @getByIdDirectPrivate(stream, "state");
+ if (state === "closed" || state === "errored")
+ return @Promise.@reject(@makeTypeError("Cannot close a writable stream that is closed or errored"));
+
+ @assert(state === "writable" || state === "erroring");
+ @assert(!@writableStreamCloseQueuedOrInFlight(stream));
+
+ const closePromiseCapability = @newPromiseCapability(@Promise);
+ @putByIdDirectPrivate(stream, "closeRequest", closePromiseCapability);
+
+ const writer = @getByIdDirectPrivate(stream, "writer");
+ if (writer !== @undefined && @getByIdDirectPrivate(stream, "backpressure") && state === "writable")
+ @getByIdDirectPrivate(writer, "readyPromise").@resolve.@call();
+
+ @writableStreamDefaultControllerClose(@getByIdDirectPrivate(stream, "controller"));
+
+ return closePromiseCapability.@promise;
+}
+
+function writableStreamAddWriteRequest(stream)
+{
+ "use strict";
+
+ @assert(@isWritableStreamLocked(stream))
+ @assert(@getByIdDirectPrivate(stream, "state") === "writable");
+
+ const writePromiseCapability = @newPromiseCapability(@Promise);
+ const writeRequests = @getByIdDirectPrivate(stream, "writeRequests");
+ writeRequests.push(writePromiseCapability);
+ return writePromiseCapability.@promise;
+}
+
+function writableStreamCloseQueuedOrInFlight(stream)
+{
+ "use strict";
+
+ return @getByIdDirectPrivate(stream, "closeRequest") !== @undefined || @getByIdDirectPrivate(stream, "inFlightCloseRequest") !== @undefined;
+}
+
+function writableStreamDealWithRejection(stream, error)
+{
+ "use strict";
+
+ const state = @getByIdDirectPrivate(stream, "state");
+ if (state === "writable") {
+ @writableStreamStartErroring(stream, error);
+ return;
+ }
+
+ @assert(state === "erroring");
+ @writableStreamFinishErroring(stream);
+}
+
+function writableStreamFinishErroring(stream)
+{
+ "use strict";
+
+ @assert(@getByIdDirectPrivate(stream, "state") === "erroring");
+ @assert(!@writableStreamHasOperationMarkedInFlight(stream));
+
+ @putByIdDirectPrivate(stream, "state", "errored");
+
+ const controller = @getByIdDirectPrivate(stream, "controller");
+ @getByIdDirectPrivate(controller, "errorSteps").@call();
+
+ const storedError = @getByIdDirectPrivate(stream, "storedError");
+ const requests = @getByIdDirectPrivate(stream, "writeRequests");
+ for (var request = requests.shift(); request; request = requests.shift())
+ request.@reject.@call(@undefined, storedError);
+
+ // TODO: is this still necessary?
+ @putByIdDirectPrivate(stream, "writeRequests", @createFIFO());
+
+ const abortRequest = @getByIdDirectPrivate(stream, "pendingAbortRequest");
+ if (abortRequest === @undefined) {
+ @writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
+ return;
+ }
+
+ @putByIdDirectPrivate(stream, "pendingAbortRequest", @undefined);
+ if (abortRequest.wasAlreadyErroring) {
+ abortRequest.promise.@reject.@call(@undefined, storedError);
+ @writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
+ return;
+ }
+
+ @getByIdDirectPrivate(controller, "abortSteps").@call(@undefined, abortRequest.reason).@then(() => {
+ abortRequest.promise.@resolve.@call();
+ @writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
+ }, (reason) => {
+ abortRequest.promise.@reject.@call(@undefined, reason);
+ @writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
+ });
+}
+
+function writableStreamFinishInFlightClose(stream)
+{
+ "use strict";
+
+ const inFlightCloseRequest = @getByIdDirectPrivate(stream, "inFlightCloseRequest");
+ inFlightCloseRequest.@resolve.@call();
+
+ @putByIdDirectPrivate(stream, "inFlightCloseRequest", @undefined);
+
+ const state = @getByIdDirectPrivate(stream, "state");
+ @assert(state === "writable" || state === "erroring");
+
+ if (state === "erroring") {
+ @putByIdDirectPrivate(stream, "storedError", @undefined);
+ const abortRequest = @getByIdDirectPrivate(stream, "pendingAbortRequest");
+ if (abortRequest !== @undefined) {
+ abortRequest.promise.@resolve.@call();
+ @putByIdDirectPrivate(stream, "pendingAbortRequest", @undefined);
+ }
+ }
+
+ @putByIdDirectPrivate(stream, "state", "closed");
+
+ const writer = @getByIdDirectPrivate(stream, "writer");
+ if (writer !== @undefined)
+ @getByIdDirectPrivate(writer, "closedPromise").@resolve.@call();
+
+ @assert(@getByIdDirectPrivate(stream, "pendingAbortRequest") === @undefined);
+ @assert(@getByIdDirectPrivate(stream, "storedError") === @undefined);
+}
+
+function writableStreamFinishInFlightCloseWithError(stream, error)
+{
+ "use strict";
+
+ const inFlightCloseRequest = @getByIdDirectPrivate(stream, "inFlightCloseRequest");
+ @assert(inFlightCloseRequest !== @undefined);
+ inFlightCloseRequest.@reject.@call(@undefined, error);
+
+ @putByIdDirectPrivate(stream, "inFlightCloseRequest", @undefined);
+
+ const state = @getByIdDirectPrivate(stream, "state");
+ @assert(state === "writable" || state === "erroring");
+
+ const abortRequest = @getByIdDirectPrivate(stream, "pendingAbortRequest");
+ if (abortRequest !== @undefined) {
+ abortRequest.promise.@reject.@call(@undefined, error);
+ @putByIdDirectPrivate(stream, "pendingAbortRequest", @undefined);
+ }
+
+ @writableStreamDealWithRejection(stream, error);
+}
+
+function writableStreamFinishInFlightWrite(stream)
+{
+ "use strict";
+
+ const inFlightWriteRequest = @getByIdDirectPrivate(stream, "inFlightWriteRequest");
+ @assert(inFlightWriteRequest !== @undefined);
+ inFlightWriteRequest.@resolve.@call();
+
+ @putByIdDirectPrivate(stream, "inFlightWriteRequest", @undefined);
+}
+
+function writableStreamFinishInFlightWriteWithError(stream, error)
+{
+ "use strict";
+
+ const inFlightWriteRequest = @getByIdDirectPrivate(stream, "inFlightWriteRequest");
+ @assert(inFlightWriteRequest !== @undefined);
+ inFlightWriteRequest.@reject.@call(@undefined, error);
+
+ @putByIdDirectPrivate(stream, "inFlightWriteRequest", @undefined);
+
+ const state = @getByIdDirectPrivate(stream, "state");
+ @assert(state === "writable" || state === "erroring");
+
+ @writableStreamDealWithRejection(stream, error);
+}
+
+function writableStreamHasOperationMarkedInFlight(stream)
+{
+ "use strict";
+
+ return @getByIdDirectPrivate(stream, "inFlightWriteRequest") !== @undefined || @getByIdDirectPrivate(stream, "inFlightCloseRequest") !== @undefined;
+}
+
+function writableStreamMarkCloseRequestInFlight(stream)
+{
+ "use strict";
+
+ const closeRequest = @getByIdDirectPrivate(stream, "closeRequest");
+ @assert(@getByIdDirectPrivate(stream, "inFlightCloseRequest") === @undefined);
+ @assert(closeRequest !== @undefined);
+
+ @putByIdDirectPrivate(stream, "inFlightCloseRequest", closeRequest);
+ @putByIdDirectPrivate(stream, "closeRequest", @undefined);
+}
+
+function writableStreamMarkFirstWriteRequestInFlight(stream)
+{
+ "use strict";
+
+ const writeRequests = @getByIdDirectPrivate(stream, "writeRequests");
+ @assert(@getByIdDirectPrivate(stream, "inFlightWriteRequest") === @undefined);
+ @assert(writeRequests.isNotEmpty());
+
+ const writeRequest = writeRequests.shift();
+ @putByIdDirectPrivate(stream, "inFlightWriteRequest", writeRequest);
+}
+
+function writableStreamRejectCloseAndClosedPromiseIfNeeded(stream)
+{
+ "use strict";
+
+ @assert(@getByIdDirectPrivate(stream, "state") === "errored");
+
+ const storedError = @getByIdDirectPrivate(stream, "storedError");
+
+ const closeRequest = @getByIdDirectPrivate(stream, "closeRequest");
+ if (closeRequest !== @undefined) {
+ @assert(@getByIdDirectPrivate(stream, "inFlightCloseRequest") === @undefined);
+ closeRequest.@reject.@call(@undefined, storedError);
+ @putByIdDirectPrivate(stream, "closeRequest", @undefined);
+ }
+
+ const writer = @getByIdDirectPrivate(stream, "writer");
+ if (writer !== @undefined) {
+ const closedPromise = @getByIdDirectPrivate(writer, "closedPromise");
+ closedPromise.@reject.@call(@undefined, storedError);
+ @markPromiseAsHandled(closedPromise.@promise);
+ }
+}
+
+function writableStreamStartErroring(stream, reason)
+{
+ "use strict";
+
+ @assert(@getByIdDirectPrivate(stream, "storedError") === @undefined);
+ @assert(@getByIdDirectPrivate(stream, "state") === "writable");
+
+ const controller = @getByIdDirectPrivate(stream, "controller");
+ @assert(controller !== @undefined);
+
+ @putByIdDirectPrivate(stream, "state", "erroring");
+ @putByIdDirectPrivate(stream, "storedError", reason);
+
+ const writer = @getByIdDirectPrivate(stream, "writer");
+ if (writer !== @undefined)
+ @writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason);
+
+ if (!@writableStreamHasOperationMarkedInFlight(stream) && @getByIdDirectPrivate(controller, "started") === 1)
+ @writableStreamFinishErroring(stream);
+}
+
+function writableStreamUpdateBackpressure(stream, backpressure)
+{
+ "use strict";
+ @assert(@getByIdDirectPrivate(stream, "state") === "writable");
+ @assert(!@writableStreamCloseQueuedOrInFlight(stream));
+
+ const writer = @getByIdDirectPrivate(stream, "writer");
+ if (writer !== @undefined && backpressure !== @getByIdDirectPrivate(stream, "backpressure")) {
+ if (backpressure)
+ @putByIdDirectPrivate(writer, "readyPromise", @newPromiseCapability(@Promise));
+ else
+ @getByIdDirectPrivate(writer, "readyPromise").@resolve.@call();
+ }
+ @putByIdDirectPrivate(stream, "backpressure", backpressure);
+}
+
+function writableStreamDefaultWriterAbort(writer, reason)
+{
+ "use strict";
+ const stream = @getByIdDirectPrivate(writer, "stream");
+ @assert(stream !== @undefined);
+ return @writableStreamAbort(stream, reason);
+}
+
+function writableStreamDefaultWriterClose(writer)
+{
+ "use strict";
+ const stream = @getByIdDirectPrivate(writer, "stream");
+ @assert(stream !== @undefined);
+ return @writableStreamClose(stream);
+}
+
+function writableStreamDefaultWriterCloseWithErrorPropagation(writer)
+{
+ "use strict";
+ const stream = @getByIdDirectPrivate(writer, "stream");
+ @assert(stream !== @undefined);
+
+ const state = @getByIdDirectPrivate(stream, "state");
+
+ if (@writableStreamCloseQueuedOrInFlight(stream) || state === "closed")
+ return @Promise.@resolve();
+
+ if (state === "errored")
+ return @Promise.@reject(@getByIdDirectPrivate(stream, "storedError"));
+
+ @assert(state === "writable" || state === "erroring");
+ return @writableStreamDefaultWriterClose(writer);
+}
+
+function writableStreamDefaultWriterEnsureClosedPromiseRejected(writer, error)
+{
+ "use strict";
+ let closedPromiseCapability = @getByIdDirectPrivate(writer, "closedPromise");
+ let closedPromise = closedPromiseCapability.@promise;
+
+ if ((@getPromiseInternalField(closedPromise, @promiseFieldFlags) & @promiseStateMask) !== @promiseStatePending) {
+ closedPromiseCapability = @newPromiseCapability(@Promise);
+ closedPromise = closedPromiseCapability.@promise;
+ @putByIdDirectPrivate(writer, "closedPromise", closedPromiseCapability);
+ }
+
+ closedPromiseCapability.@reject.@call(@undefined, error);
+ @markPromiseAsHandled(closedPromise);
+}
+
+function writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, error)
+{
+ "use strict";
+ let readyPromiseCapability = @getByIdDirectPrivate(writer, "readyPromise");
+ let readyPromise = readyPromiseCapability.@promise;
+
+ if ((@getPromiseInternalField(readyPromise, @promiseFieldFlags) & @promiseStateMask) !== @promiseStatePending) {
+ readyPromiseCapability = @newPromiseCapability(@Promise);
+ readyPromise = readyPromiseCapability.@promise;
+ @putByIdDirectPrivate(writer, "readyPromise", readyPromiseCapability);
+ }
+
+ readyPromiseCapability.@reject.@call(@undefined, error);
+ @markPromiseAsHandled(readyPromise);
+}
+
+function writableStreamDefaultWriterGetDesiredSize(writer)
+{
+ "use strict";
+ const stream = @getByIdDirectPrivate(writer, "stream");
+ @assert(stream !== @undefined);
+
+ const state = @getByIdDirectPrivate(stream, "state");
+
+ if (state === "errored" || state === "erroring")
+ return null;
+
+ if (state === "closed")
+ return 0;
+
+ return @writableStreamDefaultControllerGetDesiredSize(@getByIdDirectPrivate(stream, "controller"));
+}
+
+function writableStreamDefaultWriterRelease(writer)
+{
+ "use strict";
+
+ const stream = @getByIdDirectPrivate(writer, "stream");
+ @assert(stream !== @undefined);
+ @assert(@getByIdDirectPrivate(stream, "writer") === writer);
+
+ const releasedError = @makeTypeError("writableStreamDefaultWriterRelease");
+
+ @writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError);
+ @writableStreamDefaultWriterEnsureClosedPromiseRejected(writer, releasedError);
+
+ @putByIdDirectPrivate(stream, "writer", @undefined);
+ @putByIdDirectPrivate(writer, "stream", @undefined);
+}
+
+function writableStreamDefaultWriterWrite(writer, chunk)
+{
+ "use strict";
+
+ const stream = @getByIdDirectPrivate(writer, "stream");
+ @assert(stream !== @undefined);
+
+ const controller = @getByIdDirectPrivate(stream, "controller");
+ @assert(controller !== @undefined);
+ const chunkSize = @writableStreamDefaultControllerGetChunkSize(controller, chunk);
+
+ if (stream !== @getByIdDirectPrivate(writer, "stream"))
+ return @Promise.@reject(@makeTypeError("writer is not stream's writer"));
+
+ const state = @getByIdDirectPrivate(stream, "state");
+ if (state === "errored")
+ return @Promise.@reject(@getByIdDirectPrivate(stream, "storedError"));
+
+ if (@writableStreamCloseQueuedOrInFlight(stream) || state === "closed")
+ return @Promise.@reject(@makeTypeError("stream is closing or closed"));
+
+ if (@writableStreamCloseQueuedOrInFlight(stream) || state === "closed")
+ return @Promise.@reject(@makeTypeError("stream is closing or closed"));
+
+ if (state === "erroring")
+ return @Promise.@reject(@getByIdDirectPrivate(stream, "storedError"));
+
+ @assert(state === "writable");
+
+ const promise = @writableStreamAddWriteRequest(stream);
+ @writableStreamDefaultControllerWrite(controller, chunk, chunkSize);
+ return promise;
+}
+
+function setUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm)
+{
+ "use strict";
+
+ @assert(@isWritableStream(stream));
+ @assert(@getByIdDirectPrivate(stream, "controller") === @undefined);
+
+ @putByIdDirectPrivate(controller, "stream", stream);
+ @putByIdDirectPrivate(stream, "controller", controller);
+
+ @resetQueue(@getByIdDirectPrivate(controller, "queue"));
+
+ @putByIdDirectPrivate(controller, "started", -1);
+ @putByIdDirectPrivate(controller, "startAlgorithm", startAlgorithm);
+ @putByIdDirectPrivate(controller, "strategySizeAlgorithm", sizeAlgorithm);
+ @putByIdDirectPrivate(controller, "strategyHWM", highWaterMark);
+ @putByIdDirectPrivate(controller, "writeAlgorithm", writeAlgorithm);
+ @putByIdDirectPrivate(controller, "closeAlgorithm", closeAlgorithm);
+ @putByIdDirectPrivate(controller, "abortAlgorithm", abortAlgorithm);
+
+ const backpressure = @writableStreamDefaultControllerGetBackpressure(controller);
+ @writableStreamUpdateBackpressure(stream, backpressure);
+
+ @writableStreamDefaultControllerStart(controller);
+}
+
+function writableStreamDefaultControllerStart(controller) {
+ "use strict";
+
+ if (@getByIdDirectPrivate(controller, "started") !== -1)
+ return;
+
+ @putByIdDirectPrivate(controller, "started", 0);
+
+ const startAlgorithm = @getByIdDirectPrivate(controller, "startAlgorithm");
+ @putByIdDirectPrivate(controller, "startAlgorithm", @undefined);
+ const stream = @getByIdDirectPrivate(controller, "stream");
+ return @Promise.@resolve(startAlgorithm.@call()).@then(() => {
+ const state = @getByIdDirectPrivate(stream, "state");
+ @assert(state === "writable" || state === "erroring");
+ @putByIdDirectPrivate(controller, "started", 1);
+ @writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
+ }, (error) => {
+ const state = @getByIdDirectPrivate(stream, "state");
+ @assert(state === "writable" || state === "erroring");
+ @putByIdDirectPrivate(controller, "started", 1);
+ @writableStreamDealWithRejection(stream, error);
+ });
+}
+
+function setUpWritableStreamDefaultControllerFromUnderlyingSink(stream, underlyingSink, underlyingSinkDict, highWaterMark, sizeAlgorithm)
+{
+ "use strict";
+ const controller = new @WritableStreamDefaultController();
+
+ let startAlgorithm = () => { };
+ let writeAlgorithm = () => { return @Promise.@resolve(); };
+ let closeAlgorithm = () => { return @Promise.@resolve(); };
+ let abortAlgorithm = () => { return @Promise.@resolve(); };
+
+ if ("start" in underlyingSinkDict) {
+ const startMethod = underlyingSinkDict["start"];
+ startAlgorithm = () => @promiseInvokeOrNoopMethodNoCatch(underlyingSink, startMethod, [controller]);
+ }
+ if ("write" in underlyingSinkDict) {
+ const writeMethod = underlyingSinkDict["write"];
+ writeAlgorithm = (chunk) => @promiseInvokeOrNoopMethod(underlyingSink, writeMethod, [chunk, controller]);
+ }
+ if ("close" in underlyingSinkDict) {
+ const closeMethod = underlyingSinkDict["close"];
+ closeAlgorithm = () => @promiseInvokeOrNoopMethod(underlyingSink, closeMethod, []);
+ }
+ if ("abort" in underlyingSinkDict) {
+ const abortMethod = underlyingSinkDict["abort"];
+ abortAlgorithm = (reason) => @promiseInvokeOrNoopMethod(underlyingSink, abortMethod, [reason]);
+ }
+
+ @setUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm);
+}
+
+function writableStreamDefaultControllerAdvanceQueueIfNeeded(controller)
+{
+ "use strict";
+ const stream = @getByIdDirectPrivate(controller, "stream");
+
+ if (@getByIdDirectPrivate(controller, "started") !== 1)
+ return;
+
+ @assert(stream !== @undefined);
+ if (@getByIdDirectPrivate(stream, "inFlightWriteRequest") !== @undefined)
+ return;
+
+ const state = @getByIdDirectPrivate(stream, "state");
+ @assert(state !== "closed" || state !== "errored");
+ if (state === "erroring") {
+ @writableStreamFinishErroring(stream);
+ return;
+ }
+
+ if (@getByIdDirectPrivate(controller, "queue").content?.isEmpty() ?? false)
+ return;
+
+ const value = @peekQueueValue(@getByIdDirectPrivate(controller, "queue"));
+ if (value === @isCloseSentinel)
+ @writableStreamDefaultControllerProcessClose(controller);
+ else
+ @writableStreamDefaultControllerProcessWrite(controller, value);
+}
+
+function isCloseSentinel()
+{
+}
+
+function writableStreamDefaultControllerClearAlgorithms(controller)
+{
+ "use strict";
+ @putByIdDirectPrivate(controller, "writeAlgorithm", @undefined);
+ @putByIdDirectPrivate(controller, "closeAlgorithm", @undefined);
+ @putByIdDirectPrivate(controller, "abortAlgorithm", @undefined);
+ @putByIdDirectPrivate(controller, "strategySizeAlgorithm", @undefined);
+}
+
+function writableStreamDefaultControllerClose(controller)
+{
+ "use strict";
+ @enqueueValueWithSize(@getByIdDirectPrivate(controller, "queue"), @isCloseSentinel, 0);
+ @writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
+}
+
+function writableStreamDefaultControllerError(controller, error)
+{
+ "use strict";
+ const stream = @getByIdDirectPrivate(controller, "stream");
+ @assert(stream !== @undefined);
+ @assert(@getByIdDirectPrivate(stream, "state") === "writable");
+
+ @writableStreamDefaultControllerClearAlgorithms(controller);
+ @writableStreamStartErroring(stream, error);
+}
+
+function writableStreamDefaultControllerErrorIfNeeded(controller, error)
+{
+ "use strict";
+ const stream = @getByIdDirectPrivate(controller, "stream");
+ if (@getByIdDirectPrivate(stream, "state") === "writable")
+ @writableStreamDefaultControllerError(controller, error);
+}
+
+function writableStreamDefaultControllerGetBackpressure(controller)
+{
+ "use strict";
+ const desiredSize = @writableStreamDefaultControllerGetDesiredSize(controller);
+ return desiredSize <= 0;
+}
+
+function writableStreamDefaultControllerGetChunkSize(controller, chunk)
+{
+ "use strict";
+ try {
+ return @getByIdDirectPrivate(controller, "strategySizeAlgorithm").@call(@undefined, chunk);
+ } catch (e) {
+ @writableStreamDefaultControllerErrorIfNeeded(controller, e);
+ return 1;
+ }
+}
+
+function writableStreamDefaultControllerGetDesiredSize(controller)
+{
+ "use strict";
+ return @getByIdDirectPrivate(controller, "strategyHWM") - @getByIdDirectPrivate(controller, "queue").size;
+}
+
+function writableStreamDefaultControllerProcessClose(controller)
+{
+ "use strict";
+ const stream = @getByIdDirectPrivate(controller, "stream");
+
+ @writableStreamMarkCloseRequestInFlight(stream);
+ @dequeueValue(@getByIdDirectPrivate(controller, "queue"));
+
+ @assert(@getByIdDirectPrivate(controller, "queue").content?.isEmpty());
+
+ const sinkClosePromise = @getByIdDirectPrivate(controller, "closeAlgorithm").@call();
+ @writableStreamDefaultControllerClearAlgorithms(controller);
+
+ sinkClosePromise.@then(() => {
+ @writableStreamFinishInFlightClose(stream);
+ }, (reason) => {
+ @writableStreamFinishInFlightCloseWithError(stream, reason);
+ });
+}
+
+function writableStreamDefaultControllerProcessWrite(controller, chunk)
+{
+ "use strict";
+ const stream = @getByIdDirectPrivate(controller, "stream");
+
+ @writableStreamMarkFirstWriteRequestInFlight(stream);
+
+ const sinkWritePromise = @getByIdDirectPrivate(controller, "writeAlgorithm").@call(@undefined, chunk);
+
+ sinkWritePromise.@then(() => {
+ @writableStreamFinishInFlightWrite(stream);
+ const state = @getByIdDirectPrivate(stream, "state");
+ @assert(state === "writable" || state === "erroring");
+
+ @dequeueValue(@getByIdDirectPrivate(controller, "queue"));
+ if (!@writableStreamCloseQueuedOrInFlight(stream) && state === "writable") {
+ const backpressure = @writableStreamDefaultControllerGetBackpressure(controller);
+ @writableStreamUpdateBackpressure(stream, backpressure);
+ }
+ @writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
+ }, (reason) => {
+ const state = @getByIdDirectPrivate(stream, "state");
+ if (state === "writable")
+ @writableStreamDefaultControllerClearAlgorithms(controller);
+
+ @writableStreamFinishInFlightWriteWithError(stream, reason);
+ });
+}
+
+function writableStreamDefaultControllerWrite(controller, chunk, chunkSize)
+{
+ "use strict";
+ try {
+ @enqueueValueWithSize(@getByIdDirectPrivate(controller, "queue"), chunk, chunkSize);
+
+ const stream = @getByIdDirectPrivate(controller, "stream");
+
+ const state = @getByIdDirectPrivate(stream, "state");
+ if (!@writableStreamCloseQueuedOrInFlight(stream) && state === "writable") {
+ const backpressure = @writableStreamDefaultControllerGetBackpressure(controller);
+ @writableStreamUpdateBackpressure(stream, backpressure);
+ }
+ @writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
+ } catch (e) {
+ @writableStreamDefaultControllerErrorIfNeeded(controller, e);
+ }
+}