aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/builtins/ts
diff options
context:
space:
mode:
authorGravatar dave caruso <me@paperdave.net> 2023-05-22 18:51:05 -0700
committerGravatar GitHub <noreply@github.com> 2023-05-22 18:51:05 -0700
commitfc40c690ea30a632a8d0d9490321c50ec898d8a5 (patch)
tree6e3ca0bb2c02347006a6b2a09c4aa156b86bd770 /src/bun.js/builtins/ts
parent23d42dc2377440dedc9d8e423f1ea077507d62c8 (diff)
downloadbun-fc40c690ea30a632a8d0d9490321c50ec898d8a5.tar.gz
bun-fc40c690ea30a632a8d0d9490321c50ec898d8a5.tar.zst
bun-fc40c690ea30a632a8d0d9490321c50ec898d8a5.zip
Write out builtins with TypeScript + Minify them (#2999)
* start work drafting how builtins will work * work on ts builtin * builtins stuff so far * builtins * done for today * continue work * working on it * bindings so far * well, it builds. doesnt run * IT RUNS * still lots of ts errors but it is functional * sloppy mode
Diffstat (limited to 'src/bun.js/builtins/ts')
-rw-r--r--src/bun.js/builtins/ts/BundlerPlugin.ts370
-rw-r--r--src/bun.js/builtins/ts/ByteLengthQueuingStrategy.ts42
-rw-r--r--src/bun.js/builtins/ts/ConsoleObject.ts84
-rw-r--r--src/bun.js/builtins/ts/CountQueuingStrategy.ts42
-rw-r--r--src/bun.js/builtins/ts/ImportMetaObject.ts152
-rw-r--r--src/bun.js/builtins/ts/JSBufferConstructor.ts67
-rw-r--r--src/bun.js/builtins/ts/JSBufferPrototype.ts495
-rw-r--r--src/bun.js/builtins/ts/ProcessObjectInternals.ts676
-rw-r--r--src/bun.js/builtins/ts/ReadableByteStreamController.ts93
-rw-r--r--src/bun.js/builtins/ts/ReadableByteStreamInternals.ts656
-rw-r--r--src/bun.js/builtins/ts/ReadableStream.ts416
-rw-r--r--src/bun.js/builtins/ts/ReadableStreamBYOBReader.ts80
-rw-r--r--src/bun.js/builtins/ts/ReadableStreamBYOBRequest.ts66
-rw-r--r--src/bun.js/builtins/ts/ReadableStreamDefaultController.ts63
-rw-r--r--src/bun.js/builtins/ts/ReadableStreamDefaultReader.ts185
-rw-r--r--src/bun.js/builtins/ts/ReadableStreamInternals.ts1801
-rw-r--r--src/bun.js/builtins/ts/StreamInternals.ts268
-rw-r--r--src/bun.js/builtins/ts/TransformStream.ts106
-rw-r--r--src/bun.js/builtins/ts/TransformStreamDefaultController.ts60
-rw-r--r--src/bun.js/builtins/ts/TransformStreamInternals.ts348
-rw-r--r--src/bun.js/builtins/ts/WritableStreamDefaultController.ts48
-rw-r--r--src/bun.js/builtins/ts/WritableStreamDefaultWriter.ts104
-rw-r--r--src/bun.js/builtins/ts/WritableStreamInternals.ts790
23 files changed, 7012 insertions, 0 deletions
diff --git a/src/bun.js/builtins/ts/BundlerPlugin.ts b/src/bun.js/builtins/ts/BundlerPlugin.ts
new file mode 100644
index 000000000..831a6614e
--- /dev/null
+++ b/src/bun.js/builtins/ts/BundlerPlugin.ts
@@ -0,0 +1,370 @@
+import type {
+ AnyFunction,
+ BuildConfig,
+ BunPlugin,
+ OnLoadCallback,
+ OnLoadResult,
+ OnLoadResultObject,
+ OnLoadResultSourceCode,
+ OnResolveCallback,
+ PluginBuilder,
+ PluginConstraints,
+} from "bun";
+
+// This API expects 4 functions:
+// It should be generic enough to reuse for Bun.plugin() eventually, too.
+interface BundlerPlugin {
+ onLoad: Map<string, [RegExp, OnLoadCallback][]>;
+ onResolve: Map<string, [RegExp, OnResolveCallback][]>;
+ onLoadAsync(
+ internalID,
+ sourceCode: string | Uint8Array | ArrayBuffer | DataView | null,
+ loaderKey: number | null,
+ ): void;
+ onResolveAsync(internalID, a, b, c): void;
+ addError(internalID, error, number): void;
+ addFilter(filter, namespace, number): void;
+}
+
+// Extra types
+type Setup = BunPlugin["setup"];
+type MinifyObj = Exclude<BuildConfig["minify"], boolean>;
+interface BuildConfigExt extends BuildConfig {
+ // we support esbuild-style entryPoints
+ entryPoints?: string[];
+ // plugins is guaranteed to not be null
+ plugins: BunPlugin[];
+}
+interface PluginBuilderExt extends PluginBuilder {
+ // these functions aren't implemented yet, so we dont publicly expose them
+ resolve: AnyFunction;
+ onStart: AnyFunction;
+ onEnd: AnyFunction;
+ onDispose: AnyFunction;
+ // we partially support initialOptions. it's read-only and a subset of
+ // all options mapped to their esbuild names
+ initialOptions: any;
+ // we set this to an empty object
+ esbuild: any;
+}
+
+export function runSetupFunction(this: BundlerPlugin, setup: Setup, config: BuildConfigExt) {
+ var onLoadPlugins = new Map<string, [RegExp, AnyFunction][]>();
+ var onResolvePlugins = new Map<string, [RegExp, AnyFunction][]>();
+
+ function validate(filterObject: PluginConstraints, callback, map) {
+ if (!filterObject || !$isObject(filterObject)) {
+ throw new TypeError('Expected an object with "filter" RegExp');
+ }
+
+ if (!callback || !$isCallable(callback)) {
+ throw new TypeError("callback must be a function");
+ }
+
+ var { filter, namespace = "file" } = filterObject;
+
+ if (!filter) {
+ throw new TypeError('Expected an object with "filter" RegExp');
+ }
+
+ if (!$isRegExpObject(filter)) {
+ throw new TypeError("filter must be a RegExp");
+ }
+
+ if (namespace && !(typeof namespace === "string")) {
+ throw new TypeError("namespace must be a string");
+ }
+
+ if ((namespace?.length ?? 0) === 0) {
+ namespace = "file";
+ }
+
+ if (!/^([/$a-zA-Z0-9_\\-]+)$/.test(namespace)) {
+ throw new TypeError("namespace can only contain $a-zA-Z0-9_\\-");
+ }
+
+ var callbacks = map.$get(namespace);
+
+ if (!callbacks) {
+ map.$set(namespace, [[filter, callback]]);
+ } else {
+ $arrayPush(callbacks, [filter, callback]);
+ }
+ }
+
+ function onLoad(filterObject, callback) {
+ validate(filterObject, callback, onLoadPlugins);
+ }
+
+ function onResolve(filterObject, callback) {
+ validate(filterObject, callback, onResolvePlugins);
+ }
+
+ const processSetupResult = () => {
+ var anyOnLoad = false,
+ anyOnResolve = false;
+
+ for (var [namespace, callbacks] of onLoadPlugins.entries()) {
+ for (var [filter] of callbacks) {
+ this.addFilter(filter, namespace, 1);
+ anyOnLoad = true;
+ }
+ }
+
+ for (var [namespace, callbacks] of onResolvePlugins.entries()) {
+ for (var [filter] of callbacks) {
+ this.addFilter(filter, namespace, 0);
+ anyOnResolve = true;
+ }
+ }
+
+ if (anyOnResolve) {
+ var onResolveObject = this.onResolve;
+ if (!onResolveObject) {
+ this.onResolve = onResolvePlugins;
+ } else {
+ for (var [namespace, callbacks] of onResolvePlugins.entries()) {
+ var existing = onResolveObject.$get(namespace) as [RegExp, AnyFunction][];
+
+ if (!existing) {
+ onResolveObject.$set(namespace, callbacks);
+ } else {
+ onResolveObject.$set(namespace, existing.concat(callbacks));
+ }
+ }
+ }
+ }
+
+ if (anyOnLoad) {
+ var onLoadObject = this.onLoad;
+ if (!onLoadObject) {
+ this.onLoad = onLoadPlugins;
+ } else {
+ for (var [namespace, callbacks] of onLoadPlugins.entries()) {
+ var existing = onLoadObject.$get(namespace) as [RegExp, AnyFunction][];
+
+ if (!existing) {
+ onLoadObject.$set(namespace, callbacks);
+ } else {
+ onLoadObject.$set(namespace, existing.concat(callbacks));
+ }
+ }
+ }
+ }
+
+ return anyOnLoad || anyOnResolve;
+ };
+
+ var setupResult = setup({
+ config: config,
+ onDispose: notImplementedIssueFn(2771, "On-dispose callbacks"),
+ onEnd: notImplementedIssueFn(2771, "On-end callbacks"),
+ onLoad,
+ onResolve,
+ onStart: notImplementedIssueFn(2771, "On-start callbacks"),
+ resolve: notImplementedIssueFn(2771, "build.resolve()"),
+ // esbuild's options argument is different, we provide some interop
+ initialOptions: {
+ ...config,
+ bundle: true,
+ entryPoints: config.entrypoints ?? config.entryPoints ?? [],
+ minify: typeof config.minify === "boolean" ? config.minify : false,
+ minifyIdentifiers: config.minify === true || (config.minify as MinifyObj)?.identifiers,
+ minifyWhitespace: config.minify === true || (config.minify as MinifyObj)?.whitespace,
+ minifySyntax: config.minify === true || (config.minify as MinifyObj)?.syntax,
+ outbase: config.root,
+ platform: config.target === "bun" ? "node" : config.target,
+ },
+ esbuild: {},
+ } satisfies PluginBuilderExt as PluginBuilder);
+
+ if (setupResult && $isPromise(setupResult)) {
+ if ($getPromiseInternalField(setupResult, $promiseFieldFlags) & $promiseStateFulfilled) {
+ setupResult = $getPromiseInternalField(setupResult, $promiseFieldReactionsOrResult);
+ } else {
+ return setupResult.$then(processSetupResult);
+ }
+ }
+
+ return processSetupResult();
+}
+
+export function runOnResolvePlugins(this: BundlerPlugin, specifier, inputNamespace, importer, internalID, kindId) {
+ // Must be kept in sync with ImportRecord.label
+ const kind = $ImportKindIdToLabel[kindId];
+
+ var promiseResult: any = (async (inputPath, inputNamespace, importer, kind) => {
+ var { onResolve, onLoad } = this;
+ var results = onResolve.$get(inputNamespace);
+ if (!results) {
+ this.onResolveAsync(internalID, null, null, null);
+ return null;
+ }
+
+ for (let [filter, callback] of results) {
+ if (filter.test(inputPath)) {
+ var result = callback({
+ path: inputPath,
+ importer,
+ namespace: inputNamespace,
+ // resolveDir
+ kind,
+ // pluginData
+ });
+
+ while (
+ result &&
+ $isPromise(result) &&
+ ($getPromiseInternalField(result, $promiseFieldFlags) & $promiseStateMask) === $promiseStateFulfilled
+ ) {
+ result = $getPromiseInternalField(result, $promiseFieldReactionsOrResult);
+ }
+
+ if (result && $isPromise(result)) {
+ result = await result;
+ }
+
+ if (!result || !$isObject(result)) {
+ continue;
+ }
+
+ var { path, namespace: userNamespace = inputNamespace, external } = result;
+ if (!(typeof path === "string") || !(typeof userNamespace === "string")) {
+ throw new TypeError("onResolve plugins must return an object with a string 'path' and string 'loader' field");
+ }
+
+ if (!path) {
+ continue;
+ }
+
+ if (!userNamespace) {
+ userNamespace = inputNamespace;
+ }
+ if (typeof external !== "boolean" && !$isUndefinedOrNull(external)) {
+ throw new TypeError('onResolve plugins "external" field must be boolean or unspecified');
+ }
+
+ if (!external) {
+ if (userNamespace === "file") {
+ if (process.platform !== "win32") {
+ if (path[0] !== "/" || path.includes("..")) {
+ throw new TypeError('onResolve plugin "path" must be absolute when the namespace is "file"');
+ }
+ } else {
+ // TODO: Windows
+ }
+ }
+ if (userNamespace === "dataurl") {
+ if (!path.startsWith("data:")) {
+ throw new TypeError('onResolve plugin "path" must start with "data:" when the namespace is "dataurl"');
+ }
+ }
+
+ if (userNamespace && userNamespace !== "file" && (!onLoad || !onLoad.$has(userNamespace))) {
+ throw new TypeError(`Expected onLoad plugin for namespace ${userNamespace} to exist`);
+ }
+ }
+ this.onResolveAsync(internalID, path, userNamespace, external);
+ return null;
+ }
+ }
+
+ this.onResolveAsync(internalID, null, null, null);
+ return null;
+ })(specifier, inputNamespace, importer, kind);
+
+ while (
+ promiseResult &&
+ $isPromise(promiseResult) &&
+ ($getPromiseInternalField(promiseResult, $promiseFieldFlags) & $promiseStateMask) === $promiseStateFulfilled
+ ) {
+ promiseResult = $getPromiseInternalField(promiseResult, $promiseFieldReactionsOrResult);
+ }
+
+ if (promiseResult && $isPromise(promiseResult)) {
+ promiseResult.then(
+ () => {},
+ e => {
+ this.addError(internalID, e, 0);
+ },
+ );
+ }
+}
+
+export function runOnLoadPlugins(this: BundlerPlugin, internalID, path, namespace, defaultLoaderId) {
+ const LOADERS_MAP = $LoaderLabelToId;
+ const loaderName = $LoaderIdToLabel[defaultLoaderId];
+
+ var promiseResult = (async (internalID, path, namespace, defaultLoader) => {
+ var results = this.onLoad.$get(namespace);
+ if (!results) {
+ this.onLoadAsync(internalID, null, null);
+ return null;
+ }
+
+ for (let [filter, callback] of results) {
+ if (filter.test(path)) {
+ var result = callback({
+ path,
+ namespace,
+ // suffix
+ // pluginData
+ loader: defaultLoader,
+ });
+
+ while (
+ result &&
+ $isPromise(result) &&
+ ($getPromiseInternalField(result, $promiseFieldFlags) & $promiseStateMask) === $promiseStateFulfilled
+ ) {
+ result = $getPromiseInternalField(result, $promiseFieldReactionsOrResult);
+ }
+
+ if (result && $isPromise(result)) {
+ result = await result;
+ }
+
+ if (!result || !$isObject(result)) {
+ continue;
+ }
+
+ var { contents, loader = defaultLoader } = result as OnLoadResultSourceCode & OnLoadResultObject;
+ if (!(typeof contents === "string") && !$isTypedArrayView(contents)) {
+ throw new TypeError('onLoad plugins must return an object with "contents" as a string or Uint8Array');
+ }
+
+ if (!(typeof loader === "string")) {
+ throw new TypeError('onLoad plugins must return an object with "loader" as a string');
+ }
+
+ const chosenLoader = LOADERS_MAP[loader];
+ if (chosenLoader === undefined) {
+ throw new TypeError(`Loader ${loader} is not supported.`);
+ }
+
+ this.onLoadAsync(internalID, contents, chosenLoader);
+ return null;
+ }
+ }
+
+ this.onLoadAsync(internalID, null, null);
+ return null;
+ })(internalID, path, namespace, loaderName);
+
+ while (
+ promiseResult &&
+ $isPromise(promiseResult) &&
+ ($getPromiseInternalField(promiseResult, $promiseFieldFlags) & $promiseStateMask) === $promiseStateFulfilled
+ ) {
+ promiseResult = $getPromiseInternalField(promiseResult, $promiseFieldReactionsOrResult);
+ }
+
+ if (promiseResult && $isPromise(promiseResult)) {
+ promiseResult.then(
+ () => {},
+ e => {
+ this.addError(internalID, e, 1);
+ },
+ );
+ }
+}
diff --git a/src/bun.js/builtins/ts/ByteLengthQueuingStrategy.ts b/src/bun.js/builtins/ts/ByteLengthQueuingStrategy.ts
new file mode 100644
index 000000000..fc3f3d998
--- /dev/null
+++ b/src/bun.js/builtins/ts/ByteLengthQueuingStrategy.ts
@@ -0,0 +1,42 @@
+/*
+ * Copyright (C) 2015 Canon Inc.
+ * Copyright (C) 2015 Igalia S.L.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+ * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+$getter;
+export function highWaterMark(this: any) {
+ const highWaterMark = $getByIdDirectPrivate(this, "highWaterMark");
+ if (highWaterMark === undefined)
+ throw new TypeError("ByteLengthQueuingStrategy.highWaterMark getter called on incompatible |this| value.");
+
+ return highWaterMark;
+}
+
+export function size(chunk) {
+ return chunk.byteLength;
+}
+
+export function initializeByteLengthQueuingStrategy(this: any, parameters: any) {
+ $putByIdDirectPrivate(this, "highWaterMark", $extractHighWaterMarkFromQueuingStrategyInit(parameters));
+}
diff --git a/src/bun.js/builtins/ts/ConsoleObject.ts b/src/bun.js/builtins/ts/ConsoleObject.ts
new file mode 100644
index 000000000..45746459a
--- /dev/null
+++ b/src/bun.js/builtins/ts/ConsoleObject.ts
@@ -0,0 +1,84 @@
+$overriddenName = "[Symbol.asyncIterator]";
+export function asyncIterator(this: Console) {
+ const Iterator = async function* ConsoleAsyncIterator() {
+ const stream = Bun.stdin.stream();
+ var reader = stream.getReader();
+
+ // TODO: use builtin
+ var decoder = new (globalThis as any).TextDecoder("utf-8", { fatal: false }) as TextDecoder;
+ var deferredError;
+ var indexOf = Bun.indexOfLine;
+
+ try {
+ while (true) {
+ var done, value;
+ var pendingChunk;
+ const firstResult = reader.readMany();
+ if ($isPromise(firstResult)) {
+ ({ done, value } = await firstResult);
+ } else {
+ ({ done, value } = firstResult);
+ }
+
+ if (done) {
+ if (pendingChunk) {
+ yield decoder.decode(pendingChunk);
+ }
+ return;
+ }
+
+ var actualChunk;
+ // we assume it was given line-by-line
+ for (const chunk of value) {
+ actualChunk = chunk;
+ if (pendingChunk) {
+ actualChunk = Buffer.concat([pendingChunk, chunk]);
+ pendingChunk = null;
+ }
+
+ var last = 0;
+ // TODO: "\r", 0x4048, 0x4049, 0x404A, 0x404B, 0x404C, 0x404D, 0x404E, 0x404F
+ var i = indexOf(actualChunk, last);
+ while (i !== -1) {
+ yield decoder.decode(actualChunk.subarray(last, i));
+ last = i + 1;
+ i = indexOf(actualChunk, last);
+ }
+
+ pendingChunk = actualChunk.subarray(last);
+ }
+ }
+ } catch (e) {
+ deferredError = e;
+ } finally {
+ reader.releaseLock();
+
+ if (deferredError) {
+ throw deferredError;
+ }
+ }
+ };
+
+ const symbol = globalThis.Symbol.asyncIterator;
+ this[symbol] = Iterator;
+ return Iterator();
+}
+
+export function write(this: Console, input) {
+ var writer = $getByIdDirectPrivate(this, "writer");
+ if (!writer) {
+ var length = $toLength(input?.length ?? 0);
+ writer = Bun.stdout.writer({ highWaterMark: length > 65536 ? length : 65536 });
+ $putByIdDirectPrivate(this, "writer", writer);
+ }
+
+ var wrote = writer.write(input);
+
+ const count = $argumentCount();
+ for (var i = 1; i < count; i++) {
+ wrote += writer.write($argument(i));
+ }
+
+ writer.flush(true);
+ return wrote;
+}
diff --git a/src/bun.js/builtins/ts/CountQueuingStrategy.ts b/src/bun.js/builtins/ts/CountQueuingStrategy.ts
new file mode 100644
index 000000000..a72dca1ca
--- /dev/null
+++ b/src/bun.js/builtins/ts/CountQueuingStrategy.ts
@@ -0,0 +1,42 @@
+/*
+ * Copyright (C) 2015 Canon Inc.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+ * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+$getter;
+export function highWaterMark(this: any) {
+ const highWaterMark = $getByIdDirectPrivate(this, "highWaterMark");
+
+ if (highWaterMark === undefined)
+ throw new TypeError("CountQueuingStrategy.highWaterMark getter called on incompatible |this| value.");
+
+ return highWaterMark;
+}
+
+export function size() {
+ return 1;
+}
+
+export function initializeCountQueuingStrategy(this: any, parameters: any) {
+ $putByIdDirectPrivate(this, "highWaterMark", $extractHighWaterMarkFromQueuingStrategyInit(parameters));
+}
diff --git a/src/bun.js/builtins/ts/ImportMetaObject.ts b/src/bun.js/builtins/ts/ImportMetaObject.ts
new file mode 100644
index 000000000..6c66075c6
--- /dev/null
+++ b/src/bun.js/builtins/ts/ImportMetaObject.ts
@@ -0,0 +1,152 @@
+type ImportMetaObject = Partial<ImportMeta>;
+
+export function loadCJS2ESM(this: ImportMetaObject, resolvedSpecifier: string) {
+ var loader = Loader;
+ var queue = $createFIFO();
+ var key = resolvedSpecifier;
+ while (key) {
+ // we need to explicitly check because state could be $ModuleFetch
+ // it will throw this error if we do not:
+ // $throwTypeError("Requested module is already fetched.");
+ var entry = loader.registry.$get(key);
+
+ if (!entry || !entry.state || entry.state <= $ModuleFetch) {
+ $fulfillModuleSync(key);
+ entry = loader.registry.$get(key)!;
+ }
+
+ // entry.fetch is a Promise<SourceCode>
+ // SourceCode is not a string, it's a JSC::SourceCode object
+ // this pulls it out of the promise without delaying by a tick
+ // the promise is already fullfilled by $fullfillModuleSync
+ var sourceCodeObject = $getPromiseInternalField(entry.fetch, $promiseFieldReactionsOrResult);
+ // parseModule() returns a Promise, but the value is already fulfilled
+ // so we just pull it out of the promise here once again
+ // But, this time we do it a little more carefully because this is a JSC function call and not bun source code
+ var moduleRecordPromise = loader.parseModule(key, sourceCodeObject);
+ var module = entry.module;
+ if (!module && moduleRecordPromise && $isPromise(moduleRecordPromise)) {
+ var reactionsOrResult = $getPromiseInternalField(moduleRecordPromise, $promiseFieldReactionsOrResult);
+ var flags = $getPromiseInternalField(moduleRecordPromise, $promiseFieldFlags);
+ var state = flags & $promiseStateMask;
+ // this branch should never happen, but just to be safe
+ if (state === $promiseStatePending || (reactionsOrResult && $isPromise(reactionsOrResult))) {
+ throw new TypeError(`require() async module "${key}" is unsupported`);
+ } else if (state === $promiseStateRejected) {
+ // TODO: use SyntaxError but preserve the specifier
+ throw new TypeError(`${reactionsOrResult?.message ?? "An error occurred"} while parsing module \"${key}\"`);
+ }
+ entry.module = module = reactionsOrResult;
+ } else if (moduleRecordPromise && !module) {
+ entry.module = module = moduleRecordPromise as LoaderModule;
+ }
+
+ // This is very similar to "requestInstantiate" in ModuleLoader.js in JavaScriptCore.
+ $setStateToMax(entry, $ModuleLink);
+ var dependenciesMap = module.dependenciesMap;
+ var requestedModules = loader.requestedModules(module);
+ var dependencies = $newArrayWithSize<string>(requestedModules.length);
+ for (var i = 0, length = requestedModules.length; i < length; ++i) {
+ var depName = requestedModules[i];
+ // optimization: if it starts with a slash then it's an absolute path
+ // we don't need to run the resolver a 2nd time
+ var depKey = depName[0] === "/" ? depName : loader.resolve(depName, key);
+ var depEntry = loader.ensureRegistered(depKey);
+ if (depEntry.state < $ModuleLink) {
+ queue.push(depKey);
+ }
+
+ $putByValDirect(dependencies, i, depEntry);
+ dependenciesMap.$set(depName, depEntry);
+ }
+
+ entry.dependencies = dependencies;
+ // All dependencies resolved, set instantiate and satisfy field directly.
+ entry.instantiate = Promise.resolve(entry);
+ entry.satisfy = Promise.resolve(entry);
+ key = queue.shift();
+ while (key && (loader.registry.$get(key)?.state ?? $ModuleFetch) >= $ModuleLink) {
+ key = queue.shift();
+ }
+ }
+
+ var linkAndEvaluateResult = loader.linkAndEvaluateModule(resolvedSpecifier, undefined);
+ if (linkAndEvaluateResult && $isPromise(linkAndEvaluateResult)) {
+ // if you use top-level await, or any dependencies use top-level await, then we throw here
+ // this means the module will still actually load eventually, but that's okay.
+ throw new TypeError(`require() async module \"${resolvedSpecifier}\" is unsupported`);
+ }
+
+ return loader.registry.$get(resolvedSpecifier);
+}
+
+export function requireESM(this: ImportMetaObject, resolved) {
+ var entry = Loader.registry.$get(resolved);
+
+ if (!entry || !entry.evaluated) {
+ entry = $loadCJS2ESM(resolved);
+ }
+
+ if (!entry || !entry.evaluated || !entry.module) {
+ throw new TypeError(`require() failed to evaluate module "${resolved}". This is an internal consistentency error.`);
+ }
+ var exports = Loader.getModuleNamespaceObject(entry.module);
+ var commonJS = exports.default;
+ var cjs = commonJS?.[$commonJSSymbol];
+ if (cjs === 0) {
+ return commonJS;
+ } else if (cjs && $isCallable(commonJS)) {
+ return commonJS();
+ }
+
+ return exports;
+}
+
+export function internalRequire(this: ImportMetaObject, resolved) {
+ var cached = $requireMap.$get(resolved);
+ const last5 = resolved.substring(resolved.length - 5);
+ if (cached) {
+ if (last5 === ".node") {
+ return cached.exports;
+ }
+ return cached;
+ }
+
+ // TODO: remove this hardcoding
+ if (last5 === ".json") {
+ var fs = (globalThis[Symbol.for("_fs")] ||= Bun.fs());
+ var exports = JSON.parse(fs.readFileSync(resolved, "utf8"));
+ $requireMap.$set(resolved, exports);
+ return exports;
+ } else if (last5 === ".node") {
+ var module = { exports: {} };
+ process.dlopen(module, resolved);
+ $requireMap.$set(resolved, module);
+ return module.exports;
+ } else if (last5 === ".toml") {
+ var fs = (globalThis[Symbol.for("_fs")] ||= Bun.fs());
+ var exports = Bun.TOML.parse(fs.readFileSync(resolved, "utf8"));
+ $requireMap.$set(resolved, exports);
+ return exports;
+ } else {
+ var exports = $requireESM(resolved);
+ $requireMap.$set(resolved, exports);
+ return exports;
+ }
+}
+
+$sloppy;
+export function require(this: ImportMetaObject, name) {
+ var from = this?.path ?? arguments.callee.path;
+
+ if (typeof name !== "string") {
+ throw new TypeError("require(name) must be a string");
+ }
+
+ return $internalRequire($resolveSync(name, from));
+}
+
+$getter;
+export function main(this: ImportMetaObject) {
+ return this.path === Bun.main;
+}
diff --git a/src/bun.js/builtins/ts/JSBufferConstructor.ts b/src/bun.js/builtins/ts/JSBufferConstructor.ts
new file mode 100644
index 000000000..debc62d51
--- /dev/null
+++ b/src/bun.js/builtins/ts/JSBufferConstructor.ts
@@ -0,0 +1,67 @@
+export function from(items) {
+ if ($isUndefinedOrNull(items)) {
+ throw new TypeError(
+ "The first argument must be one of type string, Buffer, ArrayBuffer, Array, or Array-like Object.",
+ );
+ }
+
+ // TODO: figure out why private symbol not found
+ if (
+ typeof items === "string" ||
+ (typeof items === "object" &&
+ ($isTypedArrayView(items) ||
+ items instanceof ArrayBuffer ||
+ items instanceof SharedArrayBuffer ||
+ items instanceof String))
+ ) {
+ switch ($argumentCount()) {
+ case 1: {
+ return new $Buffer(items);
+ }
+ case 2: {
+ return new $Buffer(items, $argument(1));
+ }
+ default: {
+ return new $Buffer(items, $argument(1), $argument(2));
+ }
+ }
+ }
+
+ var arrayLike = $toObject(
+ items,
+ "The first argument must be of type string or an instance of Buffer, ArrayBuffer, or Array or an Array-like Object.",
+ ) as ArrayLike<any>;
+
+ if (!$isJSArray(arrayLike)) {
+ const toPrimitive = $tryGetByIdWithWellKnownSymbol(items, "toPrimitive");
+
+ if (toPrimitive) {
+ const primitive = toPrimitive.$call(items, "string");
+
+ if (typeof primitive === "string") {
+ switch ($argumentCount()) {
+ case 1: {
+ return new $Buffer(primitive);
+ }
+ case 2: {
+ return new $Buffer(primitive, $argument(1));
+ }
+ default: {
+ return new $Buffer(primitive, $argument(1), $argument(2));
+ }
+ }
+ }
+ }
+
+ if (!("length" in arrayLike) || $isCallable(arrayLike)) {
+ throw new TypeError(
+ "The first argument must be of type string or an instance of Buffer, ArrayBuffer, or Array or an Array-like Object.",
+ );
+ }
+ }
+
+ // Don't pass the second argument because Node's Buffer.from doesn't accept
+ // a function and Uint8Array.from requires it if it exists
+ // That means we cannot use $tailCallFowrardArguments here, sadly
+ return new $Buffer(Uint8Array.from(arrayLike).buffer);
+}
diff --git a/src/bun.js/builtins/ts/JSBufferPrototype.ts b/src/bun.js/builtins/ts/JSBufferPrototype.ts
new file mode 100644
index 000000000..97b25b9b2
--- /dev/null
+++ b/src/bun.js/builtins/ts/JSBufferPrototype.ts
@@ -0,0 +1,495 @@
+// The fastest way as of April 2022 is to use DataView.
+// DataView has intrinsics that cause inlining
+
+interface BufferExt extends Buffer {
+ $dataView?: DataView;
+
+ toString(encoding?: BufferEncoding, start?: number, end?: number): string;
+ toString(offset: number, length: number, encoding?: BufferEncoding): string;
+}
+
+export function setBigUint64(this: BufferExt, offset, value, le) {
+ return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setBigUint64(
+ offset,
+ value,
+ le,
+ );
+}
+export function readInt8(this: BufferExt, offset) {
+ return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).getInt8(offset);
+}
+export function readUInt8(this: BufferExt, offset) {
+ return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).getUint8(offset);
+}
+export function readInt16LE(this: BufferExt, offset) {
+ return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).getInt16(offset, true);
+}
+export function readInt16BE(this: BufferExt, offset) {
+ return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).getInt16(offset, false);
+}
+export function readUInt16LE(this: BufferExt, offset) {
+ return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).getUint16(offset, true);
+}
+export function readUInt16BE(this: BufferExt, offset) {
+ return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).getUint16(offset, false);
+}
+export function readInt32LE(this: BufferExt, offset) {
+ return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).getInt32(offset, true);
+}
+export function readInt32BE(this: BufferExt, offset) {
+ return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).getInt32(offset, false);
+}
+export function readUInt32LE(this: BufferExt, offset) {
+ return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).getUint32(offset, true);
+}
+export function readUInt32BE(this: BufferExt, offset) {
+ return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).getUint32(offset, false);
+}
+
+export function readIntLE(this: BufferExt, offset, byteLength) {
+ const view = (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength));
+ switch (byteLength) {
+ case 1: {
+ return view.getInt8(offset);
+ }
+ case 2: {
+ return view.getInt16(offset, true);
+ }
+ case 3: {
+ const val = view.getUint16(offset, true) + view.getUint8(offset + 2) * 2 ** 16;
+ return val | ((val & (2 ** 23)) * 0x1fe);
+ }
+ case 4: {
+ return view.getInt32(offset, true);
+ }
+ case 5: {
+ const last = view.getUint8(offset + 4);
+ return (last | ((last & (2 ** 7)) * 0x1fffffe)) * 2 ** 32 + view.getUint32(offset, true);
+ }
+ case 6: {
+ const last = view.getUint16(offset + 4, true);
+ return (last | ((last & (2 ** 15)) * 0x1fffe)) * 2 ** 32 + view.getUint32(offset, true);
+ }
+ }
+ throw new RangeError("byteLength must be >= 1 and <= 6");
+}
+export function readIntBE(this: BufferExt, offset, byteLength) {
+ const view = (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength));
+ switch (byteLength) {
+ case 1: {
+ return view.getInt8(offset);
+ }
+ case 2: {
+ return view.getInt16(offset, false);
+ }
+ case 3: {
+ const val = view.getUint16(offset + 1, false) + view.getUint8(offset) * 2 ** 16;
+ return val | ((val & (2 ** 23)) * 0x1fe);
+ }
+ case 4: {
+ return view.getInt32(offset, false);
+ }
+ case 5: {
+ const last = view.getUint8(offset);
+ return (last | ((last & (2 ** 7)) * 0x1fffffe)) * 2 ** 32 + view.getUint32(offset + 1, false);
+ }
+ case 6: {
+ const last = view.getUint16(offset, false);
+ return (last | ((last & (2 ** 15)) * 0x1fffe)) * 2 ** 32 + view.getUint32(offset + 2, false);
+ }
+ }
+ throw new RangeError("byteLength must be >= 1 and <= 6");
+}
+export function readUIntLE(this: BufferExt, offset, byteLength) {
+ const view = (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength));
+ switch (byteLength) {
+ case 1: {
+ return view.getUint8(offset);
+ }
+ case 2: {
+ return view.getUint16(offset, true);
+ }
+ case 3: {
+ return view.getUint16(offset, true) + view.getUint8(offset + 2) * 2 ** 16;
+ }
+ case 4: {
+ return view.getUint32(offset, true);
+ }
+ case 5: {
+ return view.getUint8(offset + 4) * 2 ** 32 + view.getUint32(offset, true);
+ }
+ case 6: {
+ return view.getUint16(offset + 4, true) * 2 ** 32 + view.getUint32(offset, true);
+ }
+ }
+ throw new RangeError("byteLength must be >= 1 and <= 6");
+}
+export function readUIntBE(this: BufferExt, offset, byteLength) {
+ const view = (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength));
+ switch (byteLength) {
+ case 1: {
+ return view.getUint8(offset);
+ }
+ case 2: {
+ return view.getUint16(offset, false);
+ }
+ case 3: {
+ return view.getUint16(offset + 1, false) + view.getUint8(offset) * 2 ** 16;
+ }
+ case 4: {
+ return view.getUint32(offset, false);
+ }
+ case 5: {
+ const last = view.getUint8(offset);
+ return (last | ((last & (2 ** 7)) * 0x1fffffe)) * 2 ** 32 + view.getUint32(offset + 1, false);
+ }
+ case 6: {
+ const last = view.getUint16(offset, false);
+ return (last | ((last & (2 ** 15)) * 0x1fffe)) * 2 ** 32 + view.getUint32(offset + 2, false);
+ }
+ }
+ throw new RangeError("byteLength must be >= 1 and <= 6");
+}
+
+export function readFloatLE(this: BufferExt, offset) {
+ return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).getFloat32(offset, true);
+}
+export function readFloatBE(this: BufferExt, offset) {
+ return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).getFloat32(offset, false);
+}
+export function readDoubleLE(this: BufferExt, offset) {
+ return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).getFloat64(offset, true);
+}
+export function readDoubleBE(this: BufferExt, offset) {
+ return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).getFloat64(offset, false);
+}
+export function readBigInt64LE(this: BufferExt, offset) {
+ return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).getBigInt64(offset, true);
+}
+export function readBigInt64BE(this: BufferExt, offset) {
+ return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).getBigInt64(offset, false);
+}
+export function readBigUInt64LE(this: BufferExt, offset) {
+ return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).getBigUint64(offset, true);
+}
+export function readBigUInt64BE(this: BufferExt, offset) {
+ return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).getBigUint64(offset, false);
+}
+
+export function writeInt8(this: BufferExt, value, offset) {
+ (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setInt8(offset, value);
+ return offset + 1;
+}
+export function writeUInt8(this: BufferExt, value, offset) {
+ (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setUint8(offset, value);
+ return offset + 1;
+}
+export function writeInt16LE(this: BufferExt, value, offset) {
+ (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setInt16(offset, value, true);
+ return offset + 2;
+}
+export function writeInt16BE(this: BufferExt, value, offset) {
+ (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setInt16(offset, value, false);
+ return offset + 2;
+}
+export function writeUInt16LE(this: BufferExt, value, offset) {
+ (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setUint16(offset, value, true);
+ return offset + 2;
+}
+export function writeUInt16BE(this: BufferExt, value, offset) {
+ (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setUint16(offset, value, false);
+ return offset + 2;
+}
+export function writeInt32LE(this: BufferExt, value, offset) {
+ (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setInt32(offset, value, true);
+ return offset + 4;
+}
+export function writeInt32BE(this: BufferExt, value, offset) {
+ (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setInt32(offset, value, false);
+ return offset + 4;
+}
+export function writeUInt32LE(this: BufferExt, value, offset) {
+ (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setUint32(offset, value, true);
+ return offset + 4;
+}
+export function writeUInt32BE(this: BufferExt, value, offset) {
+ (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setUint32(offset, value, false);
+ return offset + 4;
+}
+
+export function writeIntLE(this: BufferExt, value, offset, byteLength) {
+ const view = (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength));
+ switch (byteLength) {
+ case 1: {
+ view.setInt8(offset, value);
+ break;
+ }
+ case 2: {
+ view.setInt16(offset, value, true);
+ break;
+ }
+ case 3: {
+ view.setUint16(offset, value & 0xffff, true);
+ view.setInt8(offset + 2, Math.floor(value * 2 ** -16));
+ break;
+ }
+ case 4: {
+ view.setInt32(offset, value, true);
+ break;
+ }
+ case 5: {
+ view.setUint32(offset, value | 0, true);
+ view.setInt8(offset + 4, Math.floor(value * 2 ** -32));
+ break;
+ }
+ case 6: {
+ view.setUint32(offset, value | 0, true);
+ view.setInt16(offset + 4, Math.floor(value * 2 ** -32), true);
+ break;
+ }
+ default: {
+ throw new RangeError("byteLength must be >= 1 and <= 6");
+ }
+ }
+ return offset + byteLength;
+}
+export function writeIntBE(this: BufferExt, value, offset, byteLength) {
+ const view = (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength));
+ switch (byteLength) {
+ case 1: {
+ view.setInt8(offset, value);
+ break;
+ }
+ case 2: {
+ view.setInt16(offset, value, false);
+ break;
+ }
+ case 3: {
+ view.setUint16(offset + 1, value & 0xffff, false);
+ view.setInt8(offset, Math.floor(value * 2 ** -16));
+ break;
+ }
+ case 4: {
+ view.setInt32(offset, value, false);
+ break;
+ }
+ case 5: {
+ view.setUint32(offset + 1, value | 0, false);
+ view.setInt8(offset, Math.floor(value * 2 ** -32));
+ break;
+ }
+ case 6: {
+ view.setUint32(offset + 2, value | 0, false);
+ view.setInt16(offset, Math.floor(value * 2 ** -32), false);
+ break;
+ }
+ default: {
+ throw new RangeError("byteLength must be >= 1 and <= 6");
+ }
+ }
+ return offset + byteLength;
+}
+export function writeUIntLE(this: BufferExt, value, offset, byteLength) {
+ const view = (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength));
+ switch (byteLength) {
+ case 1: {
+ view.setUint8(offset, value);
+ break;
+ }
+ case 2: {
+ view.setUint16(offset, value, true);
+ break;
+ }
+ case 3: {
+ view.setUint16(offset, value & 0xffff, true);
+ view.setUint8(offset + 2, Math.floor(value * 2 ** -16));
+ break;
+ }
+ case 4: {
+ view.setUint32(offset, value, true);
+ break;
+ }
+ case 5: {
+ view.setUint32(offset, value | 0, true);
+ view.setUint8(offset + 4, Math.floor(value * 2 ** -32));
+ break;
+ }
+ case 6: {
+ view.setUint32(offset, value | 0, true);
+ view.setUint16(offset + 4, Math.floor(value * 2 ** -32), true);
+ break;
+ }
+ default: {
+ throw new RangeError("byteLength must be >= 1 and <= 6");
+ }
+ }
+ return offset + byteLength;
+}
+export function writeUIntBE(this: BufferExt, value, offset, byteLength) {
+ const view = (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength));
+ switch (byteLength) {
+ case 1: {
+ view.setUint8(offset, value);
+ break;
+ }
+ case 2: {
+ view.setUint16(offset, value, false);
+ break;
+ }
+ case 3: {
+ view.setUint16(offset + 1, value & 0xffff, false);
+ view.setUint8(offset, Math.floor(value * 2 ** -16));
+ break;
+ }
+ case 4: {
+ view.setUint32(offset, value, false);
+ break;
+ }
+ case 5: {
+ view.setUint32(offset + 1, value | 0, false);
+ view.setUint8(offset, Math.floor(value * 2 ** -32));
+ break;
+ }
+ case 6: {
+ view.setUint32(offset + 2, value | 0, false);
+ view.setUint16(offset, Math.floor(value * 2 ** -32), false);
+ break;
+ }
+ default: {
+ throw new RangeError("byteLength must be >= 1 and <= 6");
+ }
+ }
+ return offset + byteLength;
+}
+
+export function writeFloatLE(this: BufferExt, value, offset) {
+ (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setFloat32(offset, value, true);
+ return offset + 4;
+}
+
+export function writeFloatBE(this: BufferExt, value, offset) {
+ (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setFloat32(offset, value, false);
+ return offset + 4;
+}
+
+export function writeDoubleLE(this: BufferExt, value, offset) {
+ (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setFloat64(offset, value, true);
+ return offset + 8;
+}
+
+export function writeDoubleBE(this: BufferExt, value, offset) {
+ (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setFloat64(offset, value, false);
+ return offset + 8;
+}
+
+export function writeBigInt64LE(this: BufferExt, value, offset) {
+ (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setBigInt64(offset, value, true);
+ return offset + 8;
+}
+
+export function writeBigInt64BE(this: BufferExt, value, offset) {
+ (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setBigInt64(offset, value, false);
+ return offset + 8;
+}
+
+export function writeBigUInt64LE(this: BufferExt, value, offset) {
+ (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setBigUint64(offset, value, true);
+ return offset + 8;
+}
+
+export function writeBigUInt64BE(this: BufferExt, value, offset) {
+ (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setBigUint64(offset, value, false);
+ return offset + 8;
+}
+
+export function utf8Write(this: BufferExt, text, offset, length) {
+ return this.write(text, offset, length, "utf8");
+}
+export function ucs2Write(this: BufferExt, text, offset, length) {
+ return this.write(text, offset, length, "ucs2");
+}
+export function utf16leWrite(this: BufferExt, text, offset, length) {
+ return this.write(text, offset, length, "utf16le");
+}
+export function latin1Write(this: BufferExt, text, offset, length) {
+ return this.write(text, offset, length, "latin1");
+}
+export function asciiWrite(this: BufferExt, text, offset, length) {
+ return this.write(text, offset, length, "ascii");
+}
+export function base64Write(this: BufferExt, text, offset, length) {
+ return this.write(text, offset, length, "base64");
+}
+export function base64urlWrite(this: BufferExt, text, offset, length) {
+ return this.write(text, offset, length, "base64url");
+}
+export function hexWrite(this: BufferExt, text, offset, length) {
+ return this.write(text, offset, length, "hex");
+}
+
+export function utf8Slice(this: BufferExt, offset, length) {
+ return this.toString(offset, length, "utf8");
+}
+export function ucs2Slice(this: BufferExt, offset, length) {
+ return this.toString(offset, length, "ucs2");
+}
+export function utf16leSlice(this: BufferExt, offset, length) {
+ return this.toString(offset, length, "utf16le");
+}
+export function latin1Slice(this: BufferExt, offset, length) {
+ return this.toString(offset, length, "latin1");
+}
+export function asciiSlice(this: BufferExt, offset, length) {
+ return this.toString(offset, length, "ascii");
+}
+export function base64Slice(this: BufferExt, offset, length) {
+ return this.toString(offset, length, "base64");
+}
+export function base64urlSlice(this: BufferExt, offset, length) {
+ return this.toString(offset, length, "base64url");
+}
+export function hexSlice(this: BufferExt, offset, length) {
+ return this.toString(offset, length, "hex");
+}
+
+export function toJSON(this: BufferExt) {
+ const type = "Buffer";
+ const data = Array.from(this);
+ return { type, data };
+}
+
+export function slice(this: BufferExt, start, end) {
+ var { buffer, byteOffset, byteLength } = this;
+
+ function adjustOffset(offset, length) {
+ // Use Math.trunc() to convert offset to an integer value that can be larger
+ // than an Int32. Hence, don't use offset | 0 or similar techniques.
+ offset = $trunc(offset);
+ if (offset === 0 || isNaN(offset)) {
+ return 0;
+ } else if (offset < 0) {
+ offset += length;
+ return offset > 0 ? offset : 0;
+ } else {
+ return offset < length ? offset : length;
+ }
+ }
+
+ var start_ = adjustOffset(start, byteLength);
+ var end_ = end !== undefined ? adjustOffset(end, byteLength) : byteLength;
+ return new $Buffer(buffer, byteOffset + start_, end_ > start_ ? end_ - start_ : 0);
+}
+
+$getter;
+export function parent(this: BufferExt) {
+ return $isObject(this) && this instanceof $Buffer ? this.buffer : undefined;
+}
+
+$getter;
+export function offset(this: BufferExt) {
+ return $isObject(this) && this instanceof $Buffer ? this.byteOffset : undefined;
+}
+
+export function inspect(this: BufferExt, recurseTimes, ctx) {
+ return Bun.inspect(this);
+}
diff --git a/src/bun.js/builtins/ts/ProcessObjectInternals.ts b/src/bun.js/builtins/ts/ProcessObjectInternals.ts
new file mode 100644
index 000000000..8b24e68ba
--- /dev/null
+++ b/src/bun.js/builtins/ts/ProcessObjectInternals.ts
@@ -0,0 +1,676 @@
+/*
+ * Copyright 2023 Codeblog Corp. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+ * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+export function binding(bindingName) {
+ if (bindingName !== "constants")
+ throw new TypeError(
+ "process.binding() is not supported in Bun. If that breaks something, please file an issue and include a reproducible code sample.",
+ );
+
+ var cache = globalThis.Symbol.for("process.bindings.constants");
+ var constants = globalThis[cache];
+ if (!constants) {
+ // TODO: make this less hacky.
+ // This calls require("node:fs").constants
+ // except, outside an ESM module.
+ const { constants: fs } = globalThis[globalThis.Symbol.for("Bun.lazy")]("createImportMeta", "node:process").require(
+ "node:fs",
+ );
+ constants = {
+ fs,
+ zlib: {},
+ crypto: {},
+ os: Bun._Os().constants,
+ };
+ globalThis[cache] = constants;
+ }
+ return constants;
+}
+
+export function getStdioWriteStream(fd_, rawRequire) {
+ var module = { path: "node:process", require: rawRequire };
+ var require = path => module.require(path);
+
+ function createStdioWriteStream(fd_) {
+ var { Duplex, eos, destroy } = require("node:stream");
+ var StdioWriteStream = class StdioWriteStream extends Duplex {
+ #writeStream;
+ #readStream;
+
+ #readable = true;
+ #writable = true;
+ #fdPath;
+
+ #onClose;
+ #onDrain;
+ #onFinish;
+ #onReadable;
+ #isTTY;
+
+ get isTTY() {
+ return (this.#isTTY ??= require("node:tty").isatty(fd_));
+ }
+
+ get fd() {
+ return fd_;
+ }
+
+ constructor(fd) {
+ super({ readable: true, writable: true });
+ this.#fdPath = `/dev/fd/${fd}`;
+ }
+
+ #onFinished(err) {
+ const cb = this.#onClose;
+ this.#onClose = null;
+
+ if (cb) {
+ cb(err);
+ } else if (err) {
+ this.destroy(err);
+ } else if (!this.#readable && !this.#writable) {
+ this.destroy();
+ }
+ }
+
+ _destroy(err, callback) {
+ if (!err && this.#onClose !== null) {
+ var AbortError = class AbortError extends Error {
+ code: string;
+ name: string;
+ constructor(message = "The operation was aborted", options = void 0) {
+ if (options !== void 0 && typeof options !== "object") {
+ throw new Error(`Invalid AbortError options:\n\n${JSON.stringify(options, null, 2)}`);
+ }
+ super(message, options);
+ this.code = "ABORT_ERR";
+ this.name = "AbortError";
+ }
+ };
+ err = new AbortError();
+ }
+
+ this.#onDrain = null;
+ this.#onFinish = null;
+ if (this.#onClose === null) {
+ callback(err);
+ } else {
+ this.#onClose = callback;
+ if (this.#writeStream) destroy(this.#writeStream, err);
+ if (this.#readStream) destroy(this.#readStream, err);
+ }
+ }
+
+ _write(chunk, encoding, callback) {
+ if (!this.#writeStream) {
+ var { createWriteStream } = require("node:fs");
+ var stream = (this.#writeStream = createWriteStream(this.#fdPath));
+
+ stream.on("finish", () => {
+ if (this.#onFinish) {
+ const cb = this.#onFinish;
+ this.#onFinish = null;
+ cb();
+ }
+ });
+
+ stream.on("drain", () => {
+ if (this.#onDrain) {
+ const cb = this.#onDrain;
+ this.#onDrain = null;
+ cb();
+ }
+ });
+
+ eos(stream, err => {
+ this.#writable = false;
+ if (err) {
+ destroy(stream, err);
+ }
+ this.#onFinished(err);
+ });
+ }
+ if (stream.write(chunk, encoding)) {
+ callback();
+ } else {
+ this.#onDrain = callback;
+ }
+ }
+
+ _final(callback) {
+ this.#writeStream && this.#writeStream.end();
+ this.#onFinish = callback;
+ }
+
+ #loadReadStream() {
+ var { createReadStream } = require("node:fs");
+
+ var readStream = (this.#readStream = createReadStream(this.#fdPath));
+
+ readStream.on("readable", () => {
+ if (this.#onReadable) {
+ const cb = this.#onReadable;
+ this.#onReadable = null;
+ cb();
+ } else {
+ this.read();
+ }
+ });
+
+ readStream.on("end", () => {
+ this.push(null);
+ });
+
+ eos(readStream, err => {
+ this.#readable = false;
+ if (err) {
+ destroy(readStream, err);
+ }
+ this.#onFinished(err);
+ });
+ return readStream;
+ }
+
+ _read() {
+ var stream = this.#readStream;
+ if (!stream) {
+ stream = this.#loadReadStream();
+ }
+
+ while (true) {
+ const buf = stream.read();
+ if (buf === null || !this.push(buf)) {
+ return;
+ }
+ }
+ }
+ };
+ return new StdioWriteStream(fd_);
+ }
+
+ var { EventEmitter } = require("node:events");
+
+ function isFastEncoding(encoding) {
+ if (!encoding) return true;
+
+ var normalied = encoding.toLowerCase();
+ return normalied === "utf8" || normalied === "utf-8" || normalied === "buffer" || normalied === "binary";
+ }
+
+ var readline;
+
+ var FastStdioWriteStream = class StdioWriteStream extends EventEmitter {
+ #fd;
+ #innerStream;
+ #writer;
+ #isTTY;
+
+ bytesWritten = 0;
+
+ setDefaultEncoding(encoding) {
+ if (this.#innerStream || !isFastEncoding(encoding)) {
+ this.#ensureInnerStream();
+ return this.#innerStream.setDefaultEncoding(encoding);
+ }
+ }
+
+ #createWriter() {
+ switch (this.#fd) {
+ case 1: {
+ var writer = Bun.stdout.writer({ highWaterMark: 0 });
+ writer.unref();
+ return writer;
+ }
+
+ case 2: {
+ var writer = Bun.stderr.writer({ highWaterMark: 0 });
+ writer.unref();
+ return writer;
+ }
+ default: {
+ throw new Error("Unsupported writer");
+ }
+ }
+ }
+
+ #getWriter() {
+ return (this.#writer ??= this.#createWriter());
+ }
+
+ constructor(fd_) {
+ super();
+ this.#fd = fd_;
+ }
+
+ get fd() {
+ return this.#fd;
+ }
+
+ get isTTY() {
+ return (this.#isTTY ??= require("node:tty").isatty(this.#fd));
+ }
+
+ cursorTo(x, y, callback) {
+ return (readline ??= require("readline")).cursorTo(this, x, y, callback);
+ }
+
+ moveCursor(dx, dy, callback) {
+ return (readline ??= require("readline")).moveCursor(this, dx, dy, callback);
+ }
+
+ clearLine(dir, callback) {
+ return (readline ??= require("readline")).clearLine(this, dir, callback);
+ }
+
+ clearScreenDown(callback) {
+ return (readline ??= require("readline")).clearScreenDown(this, callback);
+ }
+
+ // TODO: once implemented this.columns and this.rows should be uncommented
+ // getWindowSize() {
+ // return [this.columns, this.rows];
+ // }
+
+ ref() {
+ this.#getWriter().ref();
+ }
+
+ unref() {
+ this.#getWriter().unref();
+ }
+
+ on(event, listener) {
+ if (event === "close" || event === "finish") {
+ this.#ensureInnerStream();
+ return this.#innerStream.on(event, listener);
+ }
+
+ if (event === "drain") {
+ return super.on("drain", listener);
+ }
+
+ if (event === "error") {
+ return super.on("error", listener);
+ }
+
+ return super.on(event, listener);
+ }
+
+ get _writableState() {
+ this.#ensureInnerStream();
+ return this.#innerStream._writableState;
+ }
+
+ get _readableState() {
+ this.#ensureInnerStream();
+ return this.#innerStream._readableState;
+ }
+
+ pipe(destination) {
+ this.#ensureInnerStream();
+ return this.#innerStream.pipe(destination);
+ }
+
+ unpipe(destination) {
+ this.#ensureInnerStream();
+ return this.#innerStream.unpipe(destination);
+ }
+
+ #ensureInnerStream() {
+ if (this.#innerStream) return;
+ this.#innerStream = createStdioWriteStream(this.#fd);
+ const events = this.eventNames();
+ for (const event of events) {
+ this.#innerStream.on(event, (...args) => {
+ this.emit(event, ...args);
+ });
+ }
+ }
+
+ #write1(chunk) {
+ var writer = this.#getWriter();
+ const writeResult = writer.write(chunk);
+ this.bytesWritten += writeResult;
+ const flushResult = writer.flush(false);
+ return !!(writeResult || flushResult);
+ }
+
+ #writeWithEncoding(chunk, encoding) {
+ if (!isFastEncoding(encoding)) {
+ this.#ensureInnerStream();
+ return this.#innerStream.write(chunk, encoding);
+ }
+
+ return this.#write1(chunk);
+ }
+
+ #performCallback(cb, err?: any) {
+ if (err) {
+ this.emit("error", err);
+ }
+
+ try {
+ cb(err ? err : null);
+ } catch (err2) {
+ this.emit("error", err2);
+ }
+ }
+
+ #writeWithCallbackAndEncoding(chunk, encoding, callback) {
+ if (!isFastEncoding(encoding)) {
+ this.#ensureInnerStream();
+ return this.#innerStream.write(chunk, encoding, callback);
+ }
+
+ var writer = this.#getWriter();
+ const writeResult = writer.write(chunk);
+ const flushResult = writer.flush(true);
+ if (flushResult?.then) {
+ flushResult.then(
+ () => {
+ this.#performCallback(callback);
+ this.emit("drain");
+ },
+ err => this.#performCallback(callback, err),
+ );
+ return false;
+ }
+
+ queueMicrotask(() => {
+ this.#performCallback(callback);
+ });
+
+ return !!(writeResult || flushResult);
+ }
+
+ write(chunk, encoding, callback) {
+ const result = this._write(chunk, encoding, callback);
+
+ if (result) {
+ this.emit("drain");
+ }
+
+ return result;
+ }
+
+ get hasColors() {
+ return Bun.tty[this.#fd].hasColors;
+ }
+
+ _write(chunk, encoding, callback) {
+ var inner = this.#innerStream;
+ if (inner) {
+ return inner.write(chunk, encoding, callback);
+ }
+
+ switch (arguments.length) {
+ case 0: {
+ var error = new Error("Invalid arguments");
+ error.code = "ERR_INVALID_ARG_TYPE";
+ throw error;
+ }
+ case 1: {
+ return this.#write1(chunk);
+ }
+ case 2: {
+ if (typeof encoding === "function") {
+ return this.#writeWithCallbackAndEncoding(chunk, "", encoding);
+ } else if (typeof encoding === "string") {
+ return this.#writeWithEncoding(chunk, encoding);
+ }
+ }
+ default: {
+ if (
+ (typeof encoding !== "undefined" && typeof encoding !== "string") ||
+ (typeof callback !== "undefined" && typeof callback !== "function")
+ ) {
+ var error = new Error("Invalid arguments");
+ error.code = "ERR_INVALID_ARG_TYPE";
+ throw error;
+ }
+
+ if (typeof callback === "undefined") {
+ return this.#writeWithEncoding(chunk, encoding);
+ }
+
+ return this.#writeWithCallbackAndEncoding(chunk, encoding, callback);
+ }
+ }
+ }
+
+ destroy() {
+ return this;
+ }
+
+ end() {
+ return this;
+ }
+ };
+
+ return new FastStdioWriteStream(fd_);
+}
+
+export function getStdinStream(fd_, rawRequire, Bun) {
+ var module = { path: "node:process", require: rawRequire };
+ var require = path => module.require(path);
+
+ var { Duplex, eos, destroy } = require("node:stream");
+
+ var StdinStream = class StdinStream extends Duplex {
+ #reader;
+ // TODO: investigate https://github.com/oven-sh/bun/issues/1607
+
+ #readRef;
+ #writeStream;
+
+ #readable = true;
+ #unrefOnRead = false;
+ #writable = true;
+
+ #onFinish;
+ #onClose;
+ #onDrain;
+
+ get isTTY() {
+ return require("tty").isatty(fd_);
+ }
+
+ get fd() {
+ return fd_;
+ }
+
+ constructor() {
+ super({ readable: true, writable: true });
+ }
+
+ #onFinished(err?) {
+ const cb = this.#onClose;
+ this.#onClose = null;
+
+ if (cb) {
+ cb(err);
+ } else if (err) {
+ this.destroy(err);
+ } else if (!this.#readable && !this.#writable) {
+ this.destroy();
+ }
+ }
+
+ _destroy(err, callback) {
+ if (!err && this.#onClose !== null) {
+ var AbortError = class AbortError extends Error {
+ constructor(message = "The operation was aborted", options = void 0) {
+ if (options !== void 0 && typeof options !== "object") {
+ throw new Error(`Invalid AbortError options:\n\n${JSON.stringify(options, null, 2)}`);
+ }
+ super(message, options);
+ this.code = "ABORT_ERR";
+ this.name = "AbortError";
+ }
+ };
+ err = new AbortError();
+ }
+
+ if (this.#onClose === null) {
+ callback(err);
+ } else {
+ this.#onClose = callback;
+ if (this.#writeStream) destroy(this.#writeStream, err);
+ }
+ }
+
+ setRawMode(mode) {}
+
+ on(name, callback) {
+ // Streams don't generally required to present any data when only
+ // `readable` events are present, i.e. `readableFlowing === false`
+ //
+ // However, Node.js has a this quirk whereby `process.stdin.read()`
+ // blocks under TTY mode, thus looping `.read()` in this particular
+ // case would not result in truncation.
+ //
+ // Therefore the following hack is only specific to `process.stdin`
+ // and does not apply to the underlying Stream implementation.
+ if (name === "readable") {
+ this.ref();
+ this.#unrefOnRead = true;
+ }
+ return super.on(name, callback);
+ }
+
+ pause() {
+ this.unref();
+ return super.pause();
+ }
+
+ resume() {
+ this.ref();
+ return super.resume();
+ }
+
+ ref() {
+ this.#reader ??= Bun.stdin.stream().getReader();
+ this.#readRef ??= setInterval(() => {}, 1 << 30);
+ }
+
+ unref() {
+ if (this.#readRef) {
+ clearInterval(this.#readRef);
+ this.#readRef = null;
+ }
+ }
+
+ async #readInternal() {
+ try {
+ var done, value;
+ const read = this.#reader.readMany();
+
+ // read same-tick if possible
+ if (!read?.then) {
+ ({ done, value } = read);
+ } else {
+ ({ done, value } = await read);
+ }
+
+ if (!done) {
+ this.push(value[0]);
+
+ // shouldn't actually happen, but just in case
+ const length = value.length;
+ for (let i = 1; i < length; i++) {
+ this.push(value[i]);
+ }
+ } else {
+ this.push(null);
+ this.pause();
+ this.#readable = false;
+ this.#onFinished();
+ }
+ } catch (err) {
+ this.#readable = false;
+ this.#onFinished(err);
+ }
+ }
+
+ _read(size) {
+ if (this.#unrefOnRead) {
+ this.unref();
+ this.#unrefOnRead = false;
+ }
+ this.#readInternal();
+ }
+
+ #constructWriteStream() {
+ var { createWriteStream } = require("node:fs");
+ var writeStream = (this.#writeStream = createWriteStream("/dev/fd/0"));
+
+ writeStream.on("finish", () => {
+ if (this.#onFinish) {
+ const cb = this.#onFinish;
+ this.#onFinish = null;
+ cb();
+ }
+ });
+
+ writeStream.on("drain", () => {
+ if (this.#onDrain) {
+ const cb = this.#onDrain;
+ this.#onDrain = null;
+ cb();
+ }
+ });
+
+ eos(writeStream, err => {
+ this.#writable = false;
+ if (err) {
+ destroy(writeStream, err);
+ }
+ this.#onFinished(err);
+ });
+
+ return writeStream;
+ }
+
+ _write(chunk, encoding, callback) {
+ var writeStream = this.#writeStream;
+ if (!writeStream) {
+ writeStream = this.#constructWriteStream();
+ }
+
+ if (writeStream.write(chunk, encoding)) {
+ callback();
+ } else {
+ this.#onDrain = callback;
+ }
+ }
+
+ _final(callback) {
+ this.#writeStream.end();
+ this.#onFinish = (...args) => callback(...args);
+ }
+ };
+
+ return new StdinStream();
+}
diff --git a/src/bun.js/builtins/ts/ReadableByteStreamController.ts b/src/bun.js/builtins/ts/ReadableByteStreamController.ts
new file mode 100644
index 000000000..888f241bc
--- /dev/null
+++ b/src/bun.js/builtins/ts/ReadableByteStreamController.ts
@@ -0,0 +1,93 @@
+/*
+ * Copyright (C) 2016 Canon Inc.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+ * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+export function initializeReadableByteStreamController(this, stream, underlyingByteSource, highWaterMark) {
+ if (arguments.length !== 4 && arguments[3] !== $isReadableStream)
+ throw new TypeError("ReadableByteStreamController constructor should not be called directly");
+
+ return $privateInitializeReadableByteStreamController.$call(this, stream, underlyingByteSource, highWaterMark);
+}
+
+export function enqueue(this, chunk) {
+ if (!$isReadableByteStreamController(this)) throw $makeThisTypeError("ReadableByteStreamController", "enqueue");
+
+ if ($getByIdDirectPrivate(this, "closeRequested"))
+ throw new TypeError("ReadableByteStreamController is requested to close");
+
+ if ($getByIdDirectPrivate($getByIdDirectPrivate(this, "controlledReadableStream"), "state") !== $streamReadable)
+ throw new TypeError("ReadableStream is not readable");
+
+ if (!$isObject(chunk) || !ArrayBuffer.$isView(chunk)) throw new TypeError("Provided chunk is not a TypedArray");
+
+ return $readableByteStreamControllerEnqueue(this, chunk);
+}
+
+export function error(this, error) {
+ if (!$isReadableByteStreamController(this)) throw $makeThisTypeError("ReadableByteStreamController", "error");
+
+ if ($getByIdDirectPrivate($getByIdDirectPrivate(this, "controlledReadableStream"), "state") !== $streamReadable)
+ throw new TypeError("ReadableStream is not readable");
+
+ $readableByteStreamControllerError(this, error);
+}
+
+export function close(this) {
+ if (!$isReadableByteStreamController(this)) throw $makeThisTypeError("ReadableByteStreamController", "close");
+
+ if ($getByIdDirectPrivate(this, "closeRequested")) throw new TypeError("Close has already been requested");
+
+ if ($getByIdDirectPrivate($getByIdDirectPrivate(this, "controlledReadableStream"), "state") !== $streamReadable)
+ throw new TypeError("ReadableStream is not readable");
+
+ $readableByteStreamControllerClose(this);
+}
+
+$getter;
+export function byobRequest(this) {
+ if (!$isReadableByteStreamController(this)) throw $makeGetterTypeError("ReadableByteStreamController", "byobRequest");
+
+ var request = $getByIdDirectPrivate(this, "byobRequest");
+ if (request === undefined) {
+ var pending = $getByIdDirectPrivate(this, "pendingPullIntos");
+ const firstDescriptor = pending.peek();
+ if (firstDescriptor) {
+ const view = new Uint8Array(
+ firstDescriptor.buffer,
+ firstDescriptor.byteOffset + firstDescriptor.bytesFilled,
+ firstDescriptor.byteLength - firstDescriptor.bytesFilled,
+ );
+ $putByIdDirectPrivate(this, "byobRequest", new ReadableStreamBYOBRequest(this, view, $isReadableStream));
+ }
+ }
+
+ return $getByIdDirectPrivate(this, "byobRequest");
+}
+
+$getter;
+export function desiredSize(this) {
+ if (!$isReadableByteStreamController(this)) throw $makeGetterTypeError("ReadableByteStreamController", "desiredSize");
+
+ return $readableByteStreamControllerGetDesiredSize(this);
+}
diff --git a/src/bun.js/builtins/ts/ReadableByteStreamInternals.ts b/src/bun.js/builtins/ts/ReadableByteStreamInternals.ts
new file mode 100644
index 000000000..f44c385b4
--- /dev/null
+++ b/src/bun.js/builtins/ts/ReadableByteStreamInternals.ts
@@ -0,0 +1,656 @@
+/*
+ * Copyright (C) 2016 Canon Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+ * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+// @internal
+
+export function privateInitializeReadableByteStreamController(this, stream, underlyingByteSource, highWaterMark) {
+ if (!$isReadableStream(stream)) throw new TypeError("ReadableByteStreamController needs a ReadableStream");
+
+ // readableStreamController is initialized with null value.
+ if ($getByIdDirectPrivate(stream, "readableStreamController") !== null)
+ throw new TypeError("ReadableStream already has a controller");
+
+ $putByIdDirectPrivate(this, "controlledReadableStream", stream);
+ $putByIdDirectPrivate(this, "underlyingByteSource", underlyingByteSource);
+ $putByIdDirectPrivate(this, "pullAgain", false);
+ $putByIdDirectPrivate(this, "pulling", false);
+ $readableByteStreamControllerClearPendingPullIntos(this);
+ $putByIdDirectPrivate(this, "queue", $newQueue());
+ $putByIdDirectPrivate(this, "started", 0);
+ $putByIdDirectPrivate(this, "closeRequested", false);
+
+ let hwm = $toNumber(highWaterMark);
+ if (isNaN(hwm) || hwm < 0) throw new RangeError("highWaterMark value is negative or not a number");
+ $putByIdDirectPrivate(this, "strategyHWM", hwm);
+
+ let autoAllocateChunkSize = underlyingByteSource.autoAllocateChunkSize;
+ if (autoAllocateChunkSize !== undefined) {
+ autoAllocateChunkSize = $toNumber(autoAllocateChunkSize);
+ if (autoAllocateChunkSize <= 0 || autoAllocateChunkSize === Infinity || autoAllocateChunkSize === -Infinity)
+ throw new RangeError("autoAllocateChunkSize value is negative or equal to positive or negative infinity");
+ }
+ $putByIdDirectPrivate(this, "autoAllocateChunkSize", autoAllocateChunkSize);
+ $putByIdDirectPrivate(this, "pendingPullIntos", $createFIFO());
+
+ const controller = this;
+ $promiseInvokeOrNoopNoCatch($getByIdDirectPrivate(controller, "underlyingByteSource"), "start", [controller]).$then(
+ () => {
+ $putByIdDirectPrivate(controller, "started", 1);
+ $assert(!$getByIdDirectPrivate(controller, "pulling"));
+ $assert(!$getByIdDirectPrivate(controller, "pullAgain"));
+ $readableByteStreamControllerCallPullIfNeeded(controller);
+ },
+ error => {
+ if ($getByIdDirectPrivate(stream, "state") === $streamReadable)
+ $readableByteStreamControllerError(controller, error);
+ },
+ );
+
+ $putByIdDirectPrivate(this, "cancel", $readableByteStreamControllerCancel);
+ $putByIdDirectPrivate(this, "pull", $readableByteStreamControllerPull);
+
+ return this;
+}
+
+export function readableStreamByteStreamControllerStart(this, controller) {
+ $putByIdDirectPrivate(controller, "start", undefined);
+}
+
+export function privateInitializeReadableStreamBYOBRequest(this, controller, view) {
+ $putByIdDirectPrivate(this, "associatedReadableByteStreamController", controller);
+ $putByIdDirectPrivate(this, "view", view);
+}
+
+export function isReadableByteStreamController(controller) {
+ // Same test mechanism as in isReadableStreamDefaultController (ReadableStreamInternals.js).
+ // See corresponding function for explanations.
+ return $isObject(controller) && !!$getByIdDirectPrivate(controller, "underlyingByteSource");
+}
+
+export function isReadableStreamBYOBRequest(byobRequest) {
+ // Same test mechanism as in isReadableStreamDefaultController (ReadableStreamInternals.js).
+ // See corresponding function for explanations.
+ return $isObject(byobRequest) && !!$getByIdDirectPrivate(byobRequest, "associatedReadableByteStreamController");
+}
+
+export function isReadableStreamBYOBReader(reader) {
+ // Spec tells to return true only if reader has a readIntoRequests internal slot.
+ // However, since it is a private slot, it cannot be checked using hasOwnProperty().
+ // Since readIntoRequests is initialized with an empty array, the following test is ok.
+ return $isObject(reader) && !!$getByIdDirectPrivate(reader, "readIntoRequests");
+}
+
+export function readableByteStreamControllerCancel(controller, reason) {
+ var pendingPullIntos = $getByIdDirectPrivate(controller, "pendingPullIntos");
+ var first = pendingPullIntos.peek();
+ if (first) first.bytesFilled = 0;
+
+ $putByIdDirectPrivate(controller, "queue", $newQueue());
+ return $promiseInvokeOrNoop($getByIdDirectPrivate(controller, "underlyingByteSource"), "cancel", [reason]);
+}
+
+export function readableByteStreamControllerError(controller, e) {
+ $assert(
+ $getByIdDirectPrivate($getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === $streamReadable,
+ );
+ $readableByteStreamControllerClearPendingPullIntos(controller);
+ $putByIdDirectPrivate(controller, "queue", $newQueue());
+ $readableStreamError($getByIdDirectPrivate(controller, "controlledReadableStream"), e);
+}
+
+export function readableByteStreamControllerClose(controller) {
+ $assert(!$getByIdDirectPrivate(controller, "closeRequested"));
+ $assert(
+ $getByIdDirectPrivate($getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === $streamReadable,
+ );
+
+ if ($getByIdDirectPrivate(controller, "queue").size > 0) {
+ $putByIdDirectPrivate(controller, "closeRequested", true);
+ return;
+ }
+
+ var first = $getByIdDirectPrivate(controller, "pendingPullIntos")?.peek();
+ if (first) {
+ if (first.bytesFilled > 0) {
+ const e = $makeTypeError("Close requested while there remain pending bytes");
+ $readableByteStreamControllerError(controller, e);
+ throw e;
+ }
+ }
+
+ $readableStreamClose($getByIdDirectPrivate(controller, "controlledReadableStream"));
+}
+
+export function readableByteStreamControllerClearPendingPullIntos(controller) {
+ $readableByteStreamControllerInvalidateBYOBRequest(controller);
+ var existing = $getByIdDirectPrivate(controller, "pendingPullIntos");
+ if (existing !== undefined) {
+ existing.clear();
+ } else {
+ $putByIdDirectPrivate(controller, "pendingPullIntos", $createFIFO());
+ }
+}
+
+export function readableByteStreamControllerGetDesiredSize(controller) {
+ const stream = $getByIdDirectPrivate(controller, "controlledReadableStream");
+ const state = $getByIdDirectPrivate(stream, "state");
+
+ if (state === $streamErrored) return null;
+ if (state === $streamClosed) return 0;
+
+ return $getByIdDirectPrivate(controller, "strategyHWM") - $getByIdDirectPrivate(controller, "queue").size;
+}
+
+export function readableStreamHasBYOBReader(stream) {
+ const reader = $getByIdDirectPrivate(stream, "reader");
+ return reader !== undefined && $isReadableStreamBYOBReader(reader);
+}
+
+export function readableStreamHasDefaultReader(stream) {
+ const reader = $getByIdDirectPrivate(stream, "reader");
+ return reader !== undefined && $isReadableStreamDefaultReader(reader);
+}
+
+export function readableByteStreamControllerHandleQueueDrain(controller) {
+ $assert(
+ $getByIdDirectPrivate($getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === $streamReadable,
+ );
+ if (!$getByIdDirectPrivate(controller, "queue").size && $getByIdDirectPrivate(controller, "closeRequested"))
+ $readableStreamClose($getByIdDirectPrivate(controller, "controlledReadableStream"));
+ else $readableByteStreamControllerCallPullIfNeeded(controller);
+}
+
+export function readableByteStreamControllerPull(controller) {
+ const stream = $getByIdDirectPrivate(controller, "controlledReadableStream");
+ $assert($readableStreamHasDefaultReader(stream));
+ if ($getByIdDirectPrivate(controller, "queue").content?.isNotEmpty()) {
+ const entry = $getByIdDirectPrivate(controller, "queue").content.shift();
+ $getByIdDirectPrivate(controller, "queue").size -= entry.byteLength;
+ $readableByteStreamControllerHandleQueueDrain(controller);
+ let view;
+ try {
+ view = new Uint8Array(entry.buffer, entry.byteOffset, entry.byteLength);
+ } catch (error) {
+ return Promise.$reject(error);
+ }
+ return $createFulfilledPromise({ value: view, done: false });
+ }
+
+ if ($getByIdDirectPrivate(controller, "autoAllocateChunkSize") !== undefined) {
+ let buffer;
+ try {
+ buffer = $createUninitializedArrayBuffer($getByIdDirectPrivate(controller, "autoAllocateChunkSize"));
+ } catch (error) {
+ return Promise.$reject(error);
+ }
+ const pullIntoDescriptor = {
+ buffer,
+ byteOffset: 0,
+ byteLength: $getByIdDirectPrivate(controller, "autoAllocateChunkSize"),
+ bytesFilled: 0,
+ elementSize: 1,
+ ctor: Uint8Array,
+ readerType: "default",
+ };
+ $getByIdDirectPrivate(controller, "pendingPullIntos").push(pullIntoDescriptor);
+ }
+
+ const promise = $readableStreamAddReadRequest(stream);
+ $readableByteStreamControllerCallPullIfNeeded(controller);
+ return promise;
+}
+
+export function readableByteStreamControllerShouldCallPull(controller) {
+ const stream = $getByIdDirectPrivate(controller, "controlledReadableStream");
+
+ if ($getByIdDirectPrivate(stream, "state") !== $streamReadable) return false;
+ if ($getByIdDirectPrivate(controller, "closeRequested")) return false;
+ if (!($getByIdDirectPrivate(controller, "started") > 0)) return false;
+ const reader = $getByIdDirectPrivate(stream, "reader");
+
+ if (
+ reader &&
+ ($getByIdDirectPrivate(reader, "readRequests")?.isNotEmpty() || !!$getByIdDirectPrivate(reader, "bunNativePtr"))
+ )
+ return true;
+ if (
+ $readableStreamHasBYOBReader(stream) &&
+ $getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readIntoRequests")?.isNotEmpty()
+ )
+ return true;
+ if ($readableByteStreamControllerGetDesiredSize(controller) > 0) return true;
+ return false;
+}
+
+export function readableByteStreamControllerCallPullIfNeeded(controller) {
+ if (!$readableByteStreamControllerShouldCallPull(controller)) return;
+
+ if ($getByIdDirectPrivate(controller, "pulling")) {
+ $putByIdDirectPrivate(controller, "pullAgain", true);
+ return;
+ }
+
+ $assert(!$getByIdDirectPrivate(controller, "pullAgain"));
+ $putByIdDirectPrivate(controller, "pulling", true);
+ $promiseInvokeOrNoop($getByIdDirectPrivate(controller, "underlyingByteSource"), "pull", [controller]).$then(
+ () => {
+ $putByIdDirectPrivate(controller, "pulling", false);
+ if ($getByIdDirectPrivate(controller, "pullAgain")) {
+ $putByIdDirectPrivate(controller, "pullAgain", false);
+ $readableByteStreamControllerCallPullIfNeeded(controller);
+ }
+ },
+ error => {
+ if (
+ $getByIdDirectPrivate($getByIdDirectPrivate(controller, "controlledReadableStream"), "state") ===
+ $streamReadable
+ )
+ $readableByteStreamControllerError(controller, error);
+ },
+ );
+}
+
+export function transferBufferToCurrentRealm(buffer) {
+ // FIXME: Determine what should be done here exactly (what is already existing in current
+ // codebase and what has to be added). According to spec, Transfer operation should be
+ // performed in order to transfer buffer to current realm. For the moment, simply return
+ // received buffer.
+ return buffer;
+}
+
+export function readableStreamReaderKind(reader) {
+ if (!!$getByIdDirectPrivate(reader, "readRequests")) return $getByIdDirectPrivate(reader, "bunNativePtr") ? 3 : 1;
+
+ if (!!$getByIdDirectPrivate(reader, "readIntoRequests")) return 2;
+
+ return 0;
+}
+
+export function readableByteStreamControllerEnqueue(controller, chunk) {
+ const stream = $getByIdDirectPrivate(controller, "controlledReadableStream");
+ $assert(!$getByIdDirectPrivate(controller, "closeRequested"));
+ $assert($getByIdDirectPrivate(stream, "state") === $streamReadable);
+
+ switch (
+ $getByIdDirectPrivate(stream, "reader") ? $readableStreamReaderKind($getByIdDirectPrivate(stream, "reader")) : 0
+ ) {
+ /* default reader */
+ case 1: {
+ if (!$getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty())
+ $readableByteStreamControllerEnqueueChunk(
+ controller,
+ $transferBufferToCurrentRealm(chunk.buffer),
+ chunk.byteOffset,
+ chunk.byteLength,
+ );
+ else {
+ $assert(!$getByIdDirectPrivate(controller, "queue").content.size());
+ const transferredView =
+ chunk.constructor === Uint8Array ? chunk : new Uint8Array(chunk.buffer, chunk.byteOffset, chunk.byteLength);
+ $readableStreamFulfillReadRequest(stream, transferredView, false);
+ }
+ break;
+ }
+
+ /* BYOB */
+ case 2: {
+ $readableByteStreamControllerEnqueueChunk(
+ controller,
+ $transferBufferToCurrentRealm(chunk.buffer),
+ chunk.byteOffset,
+ chunk.byteLength,
+ );
+ $readableByteStreamControllerProcessPullDescriptors(controller);
+ break;
+ }
+
+ /* NativeReader */
+ case 3: {
+ // reader.$enqueueNative($getByIdDirectPrivate(reader, "bunNativePtr"), chunk);
+
+ break;
+ }
+
+ default: {
+ $assert(!$isReadableStreamLocked(stream));
+ $readableByteStreamControllerEnqueueChunk(
+ controller,
+ $transferBufferToCurrentRealm(chunk.buffer),
+ chunk.byteOffset,
+ chunk.byteLength,
+ );
+ break;
+ }
+ }
+}
+
+// Spec name: readableByteStreamControllerEnqueueChunkToQueue.
+export function readableByteStreamControllerEnqueueChunk(controller, buffer, byteOffset, byteLength) {
+ $getByIdDirectPrivate(controller, "queue").content.push({
+ buffer: buffer,
+ byteOffset: byteOffset,
+ byteLength: byteLength,
+ });
+ $getByIdDirectPrivate(controller, "queue").size += byteLength;
+}
+
+export function readableByteStreamControllerRespondWithNewView(controller, view) {
+ $assert($getByIdDirectPrivate(controller, "pendingPullIntos").isNotEmpty());
+
+ let firstDescriptor = $getByIdDirectPrivate(controller, "pendingPullIntos").peek();
+
+ if (firstDescriptor.byteOffset + firstDescriptor.bytesFilled !== view.byteOffset)
+ throw new RangeError("Invalid value for view.byteOffset");
+
+ if (firstDescriptor.byteLength !== view.byteLength) throw new RangeError("Invalid value for view.byteLength");
+
+ firstDescriptor.buffer = view.buffer;
+ $readableByteStreamControllerRespondInternal(controller, view.byteLength);
+}
+
+export function readableByteStreamControllerRespond(controller, bytesWritten) {
+ bytesWritten = $toNumber(bytesWritten);
+
+ if (isNaN(bytesWritten) || bytesWritten === Infinity || bytesWritten < 0)
+ throw new RangeError("bytesWritten has an incorrect value");
+
+ $assert($getByIdDirectPrivate(controller, "pendingPullIntos").isNotEmpty());
+
+ $readableByteStreamControllerRespondInternal(controller, bytesWritten);
+}
+
+export function readableByteStreamControllerRespondInternal(controller, bytesWritten) {
+ let firstDescriptor = $getByIdDirectPrivate(controller, "pendingPullIntos").peek();
+ let stream = $getByIdDirectPrivate(controller, "controlledReadableStream");
+
+ if ($getByIdDirectPrivate(stream, "state") === $streamClosed) {
+ if (bytesWritten !== 0) throw new TypeError("bytesWritten is different from 0 even though stream is closed");
+ $readableByteStreamControllerRespondInClosedState(controller, firstDescriptor);
+ } else {
+ $assert($getByIdDirectPrivate(stream, "state") === $streamReadable);
+ $readableByteStreamControllerRespondInReadableState(controller, bytesWritten, firstDescriptor);
+ }
+}
+
+export function readableByteStreamControllerRespondInReadableState(controller, bytesWritten, pullIntoDescriptor) {
+ if (pullIntoDescriptor.bytesFilled + bytesWritten > pullIntoDescriptor.byteLength)
+ throw new RangeError("bytesWritten value is too great");
+
+ $assert(
+ $getByIdDirectPrivate(controller, "pendingPullIntos").isEmpty() ||
+ $getByIdDirectPrivate(controller, "pendingPullIntos").peek() === pullIntoDescriptor,
+ );
+ $readableByteStreamControllerInvalidateBYOBRequest(controller);
+ pullIntoDescriptor.bytesFilled += bytesWritten;
+
+ if (pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize) return;
+
+ $readableByteStreamControllerShiftPendingDescriptor(controller);
+ const remainderSize = pullIntoDescriptor.bytesFilled % pullIntoDescriptor.elementSize;
+
+ if (remainderSize > 0) {
+ const end = pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled;
+ const remainder = $cloneArrayBuffer(pullIntoDescriptor.buffer, end - remainderSize, remainderSize);
+ $readableByteStreamControllerEnqueueChunk(controller, remainder, 0, remainder.byteLength);
+ }
+
+ pullIntoDescriptor.buffer = $transferBufferToCurrentRealm(pullIntoDescriptor.buffer);
+ pullIntoDescriptor.bytesFilled -= remainderSize;
+ $readableByteStreamControllerCommitDescriptor(
+ $getByIdDirectPrivate(controller, "controlledReadableStream"),
+ pullIntoDescriptor,
+ );
+ $readableByteStreamControllerProcessPullDescriptors(controller);
+}
+
+export function readableByteStreamControllerRespondInClosedState(controller, firstDescriptor) {
+ firstDescriptor.buffer = $transferBufferToCurrentRealm(firstDescriptor.buffer);
+ $assert(firstDescriptor.bytesFilled === 0);
+
+ if ($readableStreamHasBYOBReader($getByIdDirectPrivate(controller, "controlledReadableStream"))) {
+ while (
+ $getByIdDirectPrivate(
+ $getByIdDirectPrivate($getByIdDirectPrivate(controller, "controlledReadableStream"), "reader"),
+ "readIntoRequests",
+ )?.isNotEmpty()
+ ) {
+ let pullIntoDescriptor = $readableByteStreamControllerShiftPendingDescriptor(controller);
+ $readableByteStreamControllerCommitDescriptor(
+ $getByIdDirectPrivate(controller, "controlledReadableStream"),
+ pullIntoDescriptor,
+ );
+ }
+ }
+}
+
+// Spec name: readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue (shortened for readability).
+export function readableByteStreamControllerProcessPullDescriptors(controller) {
+ $assert(!$getByIdDirectPrivate(controller, "closeRequested"));
+ while ($getByIdDirectPrivate(controller, "pendingPullIntos").isNotEmpty()) {
+ if ($getByIdDirectPrivate(controller, "queue").size === 0) return;
+ let pullIntoDescriptor = $getByIdDirectPrivate(controller, "pendingPullIntos").peek();
+ if ($readableByteStreamControllerFillDescriptorFromQueue(controller, pullIntoDescriptor)) {
+ $readableByteStreamControllerShiftPendingDescriptor(controller);
+ $readableByteStreamControllerCommitDescriptor(
+ $getByIdDirectPrivate(controller, "controlledReadableStream"),
+ pullIntoDescriptor,
+ );
+ }
+ }
+}
+
+// Spec name: readableByteStreamControllerFillPullIntoDescriptorFromQueue (shortened for readability).
+export function readableByteStreamControllerFillDescriptorFromQueue(controller, pullIntoDescriptor) {
+ const currentAlignedBytes =
+ pullIntoDescriptor.bytesFilled - (pullIntoDescriptor.bytesFilled % pullIntoDescriptor.elementSize);
+ const maxBytesToCopy =
+ $getByIdDirectPrivate(controller, "queue").size < pullIntoDescriptor.byteLength - pullIntoDescriptor.bytesFilled
+ ? $getByIdDirectPrivate(controller, "queue").size
+ : pullIntoDescriptor.byteLength - pullIntoDescriptor.bytesFilled;
+ const maxBytesFilled = pullIntoDescriptor.bytesFilled + maxBytesToCopy;
+ const maxAlignedBytes = maxBytesFilled - (maxBytesFilled % pullIntoDescriptor.elementSize);
+ let totalBytesToCopyRemaining = maxBytesToCopy;
+ let ready = false;
+
+ if (maxAlignedBytes > currentAlignedBytes) {
+ totalBytesToCopyRemaining = maxAlignedBytes - pullIntoDescriptor.bytesFilled;
+ ready = true;
+ }
+
+ while (totalBytesToCopyRemaining > 0) {
+ let headOfQueue = $getByIdDirectPrivate(controller, "queue").content.peek();
+ const bytesToCopy =
+ totalBytesToCopyRemaining < headOfQueue.byteLength ? totalBytesToCopyRemaining : headOfQueue.byteLength;
+ // Copy appropriate part of pullIntoDescriptor.buffer to headOfQueue.buffer.
+ // Remark: this implementation is not completely aligned on the definition of CopyDataBlockBytes
+ // operation of ECMAScript (the case of Shared Data Block is not considered here, but it doesn't seem to be an issue).
+ const destStart = pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled;
+ // FIXME: As indicated in comments of bug 172717, access to set is not safe. However, using prototype.$set.$call does
+ // not work ($set is undefined). A safe way to do that is needed.
+ new Uint8Array(pullIntoDescriptor.buffer).set(
+ new Uint8Array(headOfQueue.buffer, headOfQueue.byteOffset, bytesToCopy),
+ destStart,
+ );
+
+ if (headOfQueue.byteLength === bytesToCopy) $getByIdDirectPrivate(controller, "queue").content.shift();
+ else {
+ headOfQueue.byteOffset += bytesToCopy;
+ headOfQueue.byteLength -= bytesToCopy;
+ }
+
+ $getByIdDirectPrivate(controller, "queue").size -= bytesToCopy;
+ $assert(
+ $getByIdDirectPrivate(controller, "pendingPullIntos").isEmpty() ||
+ $getByIdDirectPrivate(controller, "pendingPullIntos").peek() === pullIntoDescriptor,
+ );
+ $readableByteStreamControllerInvalidateBYOBRequest(controller);
+ pullIntoDescriptor.bytesFilled += bytesToCopy;
+ totalBytesToCopyRemaining -= bytesToCopy;
+ }
+
+ if (!ready) {
+ $assert($getByIdDirectPrivate(controller, "queue").size === 0);
+ $assert(pullIntoDescriptor.bytesFilled > 0);
+ $assert(pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize);
+ }
+
+ return ready;
+}
+
+// Spec name: readableByteStreamControllerShiftPendingPullInto (renamed for consistency).
+export function readableByteStreamControllerShiftPendingDescriptor(controller) {
+ let descriptor = $getByIdDirectPrivate(controller, "pendingPullIntos").shift();
+ $readableByteStreamControllerInvalidateBYOBRequest(controller);
+ return descriptor;
+}
+
+export function readableByteStreamControllerInvalidateBYOBRequest(controller) {
+ if ($getByIdDirectPrivate(controller, "byobRequest") === undefined) return;
+ const byobRequest = $getByIdDirectPrivate(controller, "byobRequest");
+ $putByIdDirectPrivate(byobRequest, "associatedReadableByteStreamController", undefined);
+ $putByIdDirectPrivate(byobRequest, "view", undefined);
+ $putByIdDirectPrivate(controller, "byobRequest", undefined);
+}
+
+// Spec name: readableByteStreamControllerCommitPullIntoDescriptor (shortened for readability).
+export function readableByteStreamControllerCommitDescriptor(stream, pullIntoDescriptor) {
+ $assert($getByIdDirectPrivate(stream, "state") !== $streamErrored);
+ let done = false;
+ if ($getByIdDirectPrivate(stream, "state") === $streamClosed) {
+ $assert(!pullIntoDescriptor.bytesFilled);
+ done = true;
+ }
+ let filledView = $readableByteStreamControllerConvertDescriptor(pullIntoDescriptor);
+ if (pullIntoDescriptor.readerType === "default") $readableStreamFulfillReadRequest(stream, filledView, done);
+ else {
+ $assert(pullIntoDescriptor.readerType === "byob");
+ $readableStreamFulfillReadIntoRequest(stream, filledView, done);
+ }
+}
+
+// Spec name: readableByteStreamControllerConvertPullIntoDescriptor (shortened for readability).
+export function readableByteStreamControllerConvertDescriptor(pullIntoDescriptor) {
+ $assert(pullIntoDescriptor.bytesFilled <= pullIntoDescriptor.byteLength);
+ $assert(pullIntoDescriptor.bytesFilled % pullIntoDescriptor.elementSize === 0);
+
+ return new pullIntoDescriptor.ctor(
+ pullIntoDescriptor.buffer,
+ pullIntoDescriptor.byteOffset,
+ pullIntoDescriptor.bytesFilled / pullIntoDescriptor.elementSize,
+ );
+}
+
+export function readableStreamFulfillReadIntoRequest(stream, chunk, done) {
+ const readIntoRequest = $getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readIntoRequests").shift();
+ $fulfillPromise(readIntoRequest, { value: chunk, done: done });
+}
+
+export function readableStreamBYOBReaderRead(reader, view) {
+ const stream = $getByIdDirectPrivate(reader, "ownerReadableStream");
+ $assert(!!stream);
+
+ $putByIdDirectPrivate(stream, "disturbed", true);
+ if ($getByIdDirectPrivate(stream, "state") === $streamErrored)
+ return Promise.$reject($getByIdDirectPrivate(stream, "storedError"));
+
+ return $readableByteStreamControllerPullInto($getByIdDirectPrivate(stream, "readableStreamController"), view);
+}
+
+export function readableByteStreamControllerPullInto(controller, view) {
+ const stream = $getByIdDirectPrivate(controller, "controlledReadableStream");
+ let elementSize = 1;
+ // Spec describes that in the case where view is a TypedArray, elementSize
+ // should be set to the size of an element (e.g. 2 for UInt16Array). For
+ // DataView, BYTES_PER_ELEMENT is undefined, contrary to the same property
+ // for TypedArrays.
+ // FIXME: Getting BYTES_PER_ELEMENT like this is not safe (property is read-only
+ // but can be modified if the prototype is redefined). A safe way of getting
+ // it would be to determine which type of ArrayBufferView view is an instance
+ // of based on typed arrays private variables. However, this is not possible due
+ // to bug 167697, which prevents access to typed arrays through their private
+ // names unless public name has already been met before.
+ if (view.BYTES_PER_ELEMENT !== undefined) elementSize = view.BYTES_PER_ELEMENT;
+
+ // FIXME: Getting constructor like this is not safe. A safe way of getting
+ // it would be to determine which type of ArrayBufferView view is an instance
+ // of, and to assign appropriate constructor based on this (e.g. ctor =
+ // $Uint8Array). However, this is not possible due to bug 167697, which
+ // prevents access to typed arrays through their private names unless public
+ // name has already been met before.
+ const ctor = view.constructor;
+
+ const pullIntoDescriptor = {
+ buffer: view.buffer,
+ byteOffset: view.byteOffset,
+ byteLength: view.byteLength,
+ bytesFilled: 0,
+ elementSize,
+ ctor,
+ readerType: "byob",
+ };
+
+ var pending = $getByIdDirectPrivate(controller, "pendingPullIntos");
+ if (pending?.isNotEmpty()) {
+ pullIntoDescriptor.buffer = $transferBufferToCurrentRealm(pullIntoDescriptor.buffer);
+ pending.push(pullIntoDescriptor);
+ return $readableStreamAddReadIntoRequest(stream);
+ }
+
+ if ($getByIdDirectPrivate(stream, "state") === $streamClosed) {
+ const emptyView = new ctor(pullIntoDescriptor.buffer, pullIntoDescriptor.byteOffset, 0);
+ return $createFulfilledPromise({ value: emptyView, done: true });
+ }
+
+ if ($getByIdDirectPrivate(controller, "queue").size > 0) {
+ if ($readableByteStreamControllerFillDescriptorFromQueue(controller, pullIntoDescriptor)) {
+ const filledView = $readableByteStreamControllerConvertDescriptor(pullIntoDescriptor);
+ $readableByteStreamControllerHandleQueueDrain(controller);
+ return $createFulfilledPromise({ value: filledView, done: false });
+ }
+ if ($getByIdDirectPrivate(controller, "closeRequested")) {
+ const e = $makeTypeError("Closing stream has been requested");
+ $readableByteStreamControllerError(controller, e);
+ return Promise.$reject(e);
+ }
+ }
+
+ pullIntoDescriptor.buffer = $transferBufferToCurrentRealm(pullIntoDescriptor.buffer);
+ $getByIdDirectPrivate(controller, "pendingPullIntos").push(pullIntoDescriptor);
+ const promise = $readableStreamAddReadIntoRequest(stream);
+ $readableByteStreamControllerCallPullIfNeeded(controller);
+ return promise;
+}
+
+export function readableStreamAddReadIntoRequest(stream) {
+ $assert($isReadableStreamBYOBReader($getByIdDirectPrivate(stream, "reader")));
+ $assert(
+ $getByIdDirectPrivate(stream, "state") === $streamReadable ||
+ $getByIdDirectPrivate(stream, "state") === $streamClosed,
+ );
+
+ const readRequest = $newPromise();
+ $getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readIntoRequests").push(readRequest);
+
+ return readRequest;
+}
diff --git a/src/bun.js/builtins/ts/ReadableStream.ts b/src/bun.js/builtins/ts/ReadableStream.ts
new file mode 100644
index 000000000..22453403d
--- /dev/null
+++ b/src/bun.js/builtins/ts/ReadableStream.ts
@@ -0,0 +1,416 @@
+/*
+ * Copyright (C) 2015 Canon Inc.
+ * Copyright (C) 2015 Igalia.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+ * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+export function initializeReadableStream(this: any, underlyingSource: UnderlyingSource, strategy: any) {
+ if (underlyingSource === undefined)
+ underlyingSource = { $bunNativeType: 0, $bunNativePtr: 0, $lazy: false } as UnderlyingSource;
+ if (strategy === undefined) strategy = {};
+
+ if (!$isObject(underlyingSource)) throw new TypeError("ReadableStream constructor takes an object as first argument");
+
+ if (strategy !== undefined && !$isObject(strategy))
+ throw new TypeError("ReadableStream constructor takes an object as second argument, if any");
+
+ $putByIdDirectPrivate(this, "state", $streamReadable);
+
+ $putByIdDirectPrivate(this, "reader", undefined);
+
+ $putByIdDirectPrivate(this, "storedError", undefined);
+
+ $putByIdDirectPrivate(this, "disturbed", false);
+
+ // Initialized with null value to enable distinction with undefined case.
+ $putByIdDirectPrivate(this, "readableStreamController", null);
+ $putByIdDirectPrivate(this, "bunNativeType", $getByIdDirectPrivate(underlyingSource, "bunNativeType") ?? 0);
+ $putByIdDirectPrivate(this, "bunNativePtr", $getByIdDirectPrivate(underlyingSource, "bunNativePtr") ?? 0);
+
+ const isDirect = underlyingSource.type === "direct";
+ // direct streams are always lazy
+ const isUnderlyingSourceLazy = !!underlyingSource.$lazy;
+ const isLazy = isDirect || isUnderlyingSourceLazy;
+
+ // FIXME: We should introduce https://streams.spec.whatwg.org/#create-readable-stream.
+ // For now, we emulate this with underlyingSource with private properties.
+ if ($getByIdDirectPrivate(underlyingSource, "pull") !== undefined && !isLazy) {
+ const size = $getByIdDirectPrivate(strategy, "size");
+ const highWaterMark = $getByIdDirectPrivate(strategy, "highWaterMark");
+ $putByIdDirectPrivate(this, "highWaterMark", highWaterMark);
+ $putByIdDirectPrivate(this, "underlyingSource", undefined);
+ $setupReadableStreamDefaultController(
+ this,
+ underlyingSource,
+ size,
+ highWaterMark !== undefined ? highWaterMark : 1,
+ $getByIdDirectPrivate(underlyingSource, "start"),
+ $getByIdDirectPrivate(underlyingSource, "pull"),
+ $getByIdDirectPrivate(underlyingSource, "cancel"),
+ );
+
+ return this;
+ }
+ if (isDirect) {
+ $putByIdDirectPrivate(this, "underlyingSource", underlyingSource);
+ $putByIdDirectPrivate(this, "highWaterMark", $getByIdDirectPrivate(strategy, "highWaterMark"));
+ $putByIdDirectPrivate(this, "start", () => $createReadableStreamController(this, underlyingSource, strategy));
+ } else if (isLazy) {
+ const autoAllocateChunkSize = underlyingSource.autoAllocateChunkSize;
+ $putByIdDirectPrivate(this, "highWaterMark", undefined);
+ $putByIdDirectPrivate(this, "underlyingSource", undefined);
+ $putByIdDirectPrivate(
+ this,
+ "highWaterMark",
+ autoAllocateChunkSize || $getByIdDirectPrivate(strategy, "highWaterMark"),
+ );
+
+ $putByIdDirectPrivate(this, "start", () => {
+ const instance = $lazyLoadStream(this, autoAllocateChunkSize);
+ if (instance) {
+ $createReadableStreamController(this, instance, strategy);
+ }
+ });
+ } else {
+ $putByIdDirectPrivate(this, "underlyingSource", undefined);
+ $putByIdDirectPrivate(this, "highWaterMark", $getByIdDirectPrivate(strategy, "highWaterMark"));
+ $putByIdDirectPrivate(this, "start", undefined);
+ $createReadableStreamController(this, underlyingSource, strategy);
+ }
+
+ return this;
+}
+
+$linkTimeConstant;
+export function readableStreamToArray(stream) {
+ // this is a direct stream
+ var underlyingSource = $getByIdDirectPrivate(stream, "underlyingSource");
+ if (underlyingSource !== undefined) {
+ return $readableStreamToArrayDirect(stream, underlyingSource);
+ }
+
+ return $readableStreamIntoArray(stream);
+}
+
+$linkTimeConstant;
+export function readableStreamToText(stream) {
+ // this is a direct stream
+ var underlyingSource = $getByIdDirectPrivate(stream, "underlyingSource");
+ if (underlyingSource !== undefined) {
+ return $readableStreamToTextDirect(stream, underlyingSource);
+ }
+
+ return $readableStreamIntoText(stream);
+}
+
+$linkTimeConstant;
+export function readableStreamToArrayBuffer(stream) {
+ // this is a direct stream
+ var underlyingSource = $getByIdDirectPrivate(stream, "underlyingSource");
+
+ if (underlyingSource !== undefined) {
+ return $readableStreamToArrayBufferDirect(stream, underlyingSource);
+ }
+
+ return (Bun.readableStreamToArray(stream) as Promise<any>).$then(Bun.concatArrayBuffers);
+}
+
+$linkTimeConstant;
+export function readableStreamToJSON(stream) {
+ return Bun.readableStreamToText(stream).$then(globalThis.JSON.parse);
+}
+
+$linkTimeConstant;
+export function readableStreamToBlob(stream) {
+ return Promise.resolve(Bun.readableStreamToArray(stream)).$then(array => new Blob(array));
+}
+
+$linkTimeConstant;
+export function consumeReadableStream(nativePtr, nativeType, inputStream) {
+ const symbol = globalThis.Symbol.for("Bun.consumeReadableStreamPrototype");
+ var cached = globalThis[symbol];
+ if (!cached) {
+ cached = globalThis[symbol] = [];
+ }
+ var Prototype = cached[nativeType];
+ if (Prototype === undefined) {
+ var [doRead, doError, doReadMany, doClose, onClose, deinit] =
+ globalThis[globalThis.Symbol.for("Bun.lazy")](nativeType);
+
+ Prototype = class NativeReadableStreamSink {
+ handleError: any;
+ handleClosed: any;
+ processResult: any;
+
+ constructor(reader, ptr) {
+ this.#ptr = ptr;
+ this.#reader = reader;
+ this.#didClose = false;
+
+ this.handleError = this._handleError.bind(this);
+ this.handleClosed = this._handleClosed.bind(this);
+ this.processResult = this._processResult.bind(this);
+
+ reader.closed.then(this.handleClosed, this.handleError);
+ }
+
+ _handleClosed() {
+ if (this.#didClose) return;
+ this.#didClose = true;
+ var ptr = this.#ptr;
+ this.#ptr = 0;
+ doClose(ptr);
+ deinit(ptr);
+ }
+
+ _handleError(error) {
+ if (this.#didClose) return;
+ this.#didClose = true;
+ var ptr = this.#ptr;
+ this.#ptr = 0;
+ doError(ptr, error);
+ deinit(ptr);
+ }
+
+ #ptr;
+ #didClose = false;
+ #reader;
+
+ _handleReadMany({ value, done, size }) {
+ if (done) {
+ this.handleClosed();
+ return;
+ }
+
+ if (this.#didClose) return;
+
+ doReadMany(this.#ptr, value, done, size);
+ }
+
+ read() {
+ if (!this.#ptr) return $throwTypeError("ReadableStreamSink is already closed");
+
+ return this.processResult(this.#reader.read());
+ }
+
+ _processResult(result) {
+ if (result && $isPromise(result)) {
+ const flags = $getPromiseInternalField(result, $promiseFieldFlags);
+ if (flags & $promiseStateFulfilled) {
+ const fulfilledValue = $getPromiseInternalField(result, $promiseFieldReactionsOrResult);
+ if (fulfilledValue) {
+ result = fulfilledValue;
+ }
+ }
+ }
+
+ if (result && $isPromise(result)) {
+ result.then(this.processResult, this.handleError);
+ return null;
+ }
+
+ if (result.done) {
+ this.handleClosed();
+ return 0;
+ } else if (result.value) {
+ return result.value;
+ } else {
+ return -1;
+ }
+ }
+
+ readMany() {
+ if (!this.#ptr) return $throwTypeError("ReadableStreamSink is already closed");
+ return this.processResult(this.#reader.readMany());
+ }
+ };
+
+ const minlength = nativeType + 1;
+ if (cached.length < minlength) {
+ cached.length = minlength;
+ }
+ $putByValDirect(cached, nativeType, Prototype);
+ }
+
+ if ($isReadableStreamLocked(inputStream)) {
+ throw new TypeError("Cannot start reading from a locked stream");
+ }
+
+ return new Prototype(inputStream.getReader(), nativePtr);
+}
+
+$linkTimeConstant;
+export function createEmptyReadableStream() {
+ var stream = new ReadableStream({
+ pull() {},
+ } as any);
+ $readableStreamClose(stream);
+ return stream;
+}
+
+$linkTimeConstant;
+export function createNativeReadableStream(nativePtr, nativeType, autoAllocateChunkSize) {
+ return new ReadableStream({
+ $lazy: true,
+ $bunNativeType: nativeType,
+ $bunNativePtr: nativePtr,
+ autoAllocateChunkSize: autoAllocateChunkSize,
+ });
+}
+
+export function cancel(this, reason) {
+ if (!$isReadableStream(this)) return Promise.$reject($makeThisTypeError("ReadableStream", "cancel"));
+
+ if ($isReadableStreamLocked(this)) return Promise.$reject($makeTypeError("ReadableStream is locked"));
+
+ return $readableStreamCancel(this, reason);
+}
+
+export function getReader(this, options) {
+ if (!$isReadableStream(this)) throw $makeThisTypeError("ReadableStream", "getReader");
+
+ const mode = $toDictionary(options, {}, "ReadableStream.getReader takes an object as first argument").mode;
+ if (mode === undefined) {
+ var start_ = $getByIdDirectPrivate(this, "start");
+ if (start_) {
+ $putByIdDirectPrivate(this, "start", undefined);
+ start_();
+ }
+
+ return new ReadableStreamDefaultReader(this);
+ }
+ // String conversion is required by spec, hence double equals.
+ if (mode == "byob") {
+ return new ReadableStreamBYOBReader(this);
+ }
+
+ throw new TypeError("Invalid mode is specified");
+}
+
+export function pipeThrough(this, streams, options) {
+ const transforms = streams;
+
+ const readable = transforms["readable"];
+ if (!$isReadableStream(readable)) throw $makeTypeError("readable should be ReadableStream");
+
+ const writable = transforms["writable"];
+ const internalWritable = $getInternalWritableStream(writable);
+ if (!$isWritableStream(internalWritable)) throw $makeTypeError("writable should be WritableStream");
+
+ let preventClose = false;
+ let preventAbort = false;
+ let preventCancel = false;
+ let signal;
+ if (!$isUndefinedOrNull(options)) {
+ if (!$isObject(options)) throw $makeTypeError("options must be an object");
+
+ preventAbort = !!options["preventAbort"];
+ preventCancel = !!options["preventCancel"];
+ preventClose = !!options["preventClose"];
+
+ signal = options["signal"];
+ if (signal !== undefined && !$isAbortSignal(signal)) throw $makeTypeError("options.signal must be AbortSignal");
+ }
+
+ if (!$isReadableStream(this)) throw $makeThisTypeError("ReadableStream", "pipeThrough");
+
+ if ($isReadableStreamLocked(this)) throw $makeTypeError("ReadableStream is locked");
+
+ if ($isWritableStreamLocked(internalWritable)) throw $makeTypeError("WritableStream is locked");
+
+ $readableStreamPipeToWritableStream(this, internalWritable, preventClose, preventAbort, preventCancel, signal);
+
+ return readable;
+}
+
+export function pipeTo(this, destination) {
+ if (!$isReadableStream(this)) return Promise.$reject($makeThisTypeError("ReadableStream", "pipeTo"));
+
+ if ($isReadableStreamLocked(this)) return Promise.$reject($makeTypeError("ReadableStream is locked"));
+
+ // FIXME: https://bugs.webkit.org/show_bug.cgi?id=159869.
+ // Built-in generator should be able to parse function signature to compute the function length correctly.
+ let options = $argument(1);
+
+ let preventClose = false;
+ let preventAbort = false;
+ let preventCancel = false;
+ let signal;
+ if (!$isUndefinedOrNull(options)) {
+ if (!$isObject(options)) return Promise.$reject($makeTypeError("options must be an object"));
+
+ try {
+ preventAbort = !!options["preventAbort"];
+ preventCancel = !!options["preventCancel"];
+ preventClose = !!options["preventClose"];
+
+ signal = options["signal"];
+ } catch (e) {
+ return Promise.$reject(e);
+ }
+
+ if (signal !== undefined && !$isAbortSignal(signal))
+ return Promise.$reject(new TypeError("options.signal must be AbortSignal"));
+ }
+
+ const internalDestination = $getInternalWritableStream(destination);
+ if (!$isWritableStream(internalDestination))
+ return Promise.$reject(new TypeError("ReadableStream pipeTo requires a WritableStream"));
+
+ if ($isWritableStreamLocked(internalDestination)) return Promise.$reject(new TypeError("WritableStream is locked"));
+
+ return $readableStreamPipeToWritableStream(
+ this,
+ internalDestination,
+ preventClose,
+ preventAbort,
+ preventCancel,
+ signal,
+ );
+}
+
+export function tee(this) {
+ if (!$isReadableStream(this)) throw $makeThisTypeError("ReadableStream", "tee");
+
+ return $readableStreamTee(this, false);
+}
+
+$getter;
+export function locked(this) {
+ if (!$isReadableStream(this)) throw $makeGetterTypeError("ReadableStream", "locked");
+
+ return $isReadableStreamLocked(this);
+}
+
+export function values(this, options) {
+ var prototype = ReadableStream.prototype;
+ $readableStreamDefineLazyIterators(prototype);
+ return prototype.values.$call(this, options);
+}
+
+$linkTimeConstant;
+export function lazyAsyncIterator(this) {
+ var prototype = ReadableStream.prototype;
+ $readableStreamDefineLazyIterators(prototype);
+ return prototype[globalThis.Symbol.asyncIterator].$call(this);
+}
diff --git a/src/bun.js/builtins/ts/ReadableStreamBYOBReader.ts b/src/bun.js/builtins/ts/ReadableStreamBYOBReader.ts
new file mode 100644
index 000000000..5ebfddb19
--- /dev/null
+++ b/src/bun.js/builtins/ts/ReadableStreamBYOBReader.ts
@@ -0,0 +1,80 @@
+/*
+ * Copyright (C) 2017 Canon Inc.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY CANON INC. AND ITS CONTRIBUTORS "AS IS" AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL CANON INC. AND ITS CONTRIBUTORS BE LIABLE FOR
+ * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+ * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+export function initializeReadableStreamBYOBReader(this, stream) {
+ if (!$isReadableStream(stream)) throw new TypeError("ReadableStreamBYOBReader needs a ReadableStream");
+ if (!$isReadableByteStreamController($getByIdDirectPrivate(stream, "readableStreamController")))
+ throw new TypeError("ReadableStreamBYOBReader needs a ReadableByteStreamController");
+ if ($isReadableStreamLocked(stream)) throw new TypeError("ReadableStream is locked");
+
+ $readableStreamReaderGenericInitialize(this, stream);
+ $putByIdDirectPrivate(this, "readIntoRequests", $createFIFO());
+
+ return this;
+}
+
+export function cancel(this, reason) {
+ if (!$isReadableStreamBYOBReader(this))
+ return Promise.$reject($makeThisTypeError("ReadableStreamBYOBReader", "cancel"));
+
+ if (!$getByIdDirectPrivate(this, "ownerReadableStream"))
+ return Promise.$reject($makeTypeError("cancel() called on a reader owned by no readable stream"));
+
+ return $readableStreamReaderGenericCancel(this, reason);
+}
+
+export function read(this, view: DataView) {
+ if (!$isReadableStreamBYOBReader(this))
+ return Promise.$reject($makeThisTypeError("ReadableStreamBYOBReader", "read"));
+
+ if (!$getByIdDirectPrivate(this, "ownerReadableStream"))
+ return Promise.$reject($makeTypeError("read() called on a reader owned by no readable stream"));
+
+ if (!$isObject(view)) return Promise.$reject($makeTypeError("Provided view is not an object"));
+
+ if (!ArrayBuffer.$isView(view)) return Promise.$reject($makeTypeError("Provided view is not an ArrayBufferView"));
+
+ if (view.byteLength === 0) return Promise.$reject($makeTypeError("Provided view cannot have a 0 byteLength"));
+
+ return $readableStreamBYOBReaderRead(this, view);
+}
+
+export function releaseLock(this) {
+ if (!$isReadableStreamBYOBReader(this)) throw $makeThisTypeError("ReadableStreamBYOBReader", "releaseLock");
+
+ if (!$getByIdDirectPrivate(this, "ownerReadableStream")) return;
+
+ if ($getByIdDirectPrivate(this, "readIntoRequests")?.isNotEmpty())
+ throw new TypeError("There are still pending read requests, cannot release the lock");
+
+ $readableStreamReaderGenericRelease(this);
+}
+
+$getter;
+export function closed(this) {
+ if (!$isReadableStreamBYOBReader(this))
+ return Promise.$reject($makeGetterTypeError("ReadableStreamBYOBReader", "closed"));
+
+ return $getByIdDirectPrivate(this, "closedPromiseCapability").$promise;
+}
diff --git a/src/bun.js/builtins/ts/ReadableStreamBYOBRequest.ts b/src/bun.js/builtins/ts/ReadableStreamBYOBRequest.ts
new file mode 100644
index 000000000..1354f9349
--- /dev/null
+++ b/src/bun.js/builtins/ts/ReadableStreamBYOBRequest.ts
@@ -0,0 +1,66 @@
+/*
+ * Copyright (C) 2017 Canon Inc.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+ * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+export function initializeReadableStreamBYOBRequest(this, controller, view) {
+ if (arguments.length !== 3 && arguments[2] !== $isReadableStream)
+ throw new TypeError("ReadableStreamBYOBRequest constructor should not be called directly");
+
+ return $privateInitializeReadableStreamBYOBRequest.$call(this, controller, view);
+}
+
+export function respond(this, bytesWritten) {
+ if (!$isReadableStreamBYOBRequest(this)) throw $makeThisTypeError("ReadableStreamBYOBRequest", "respond");
+
+ if ($getByIdDirectPrivate(this, "associatedReadableByteStreamController") === undefined)
+ throw new TypeError("ReadableStreamBYOBRequest.associatedReadableByteStreamController is undefined");
+
+ return $readableByteStreamControllerRespond(
+ $getByIdDirectPrivate(this, "associatedReadableByteStreamController"),
+ bytesWritten,
+ );
+}
+
+export function respondWithNewView(this, view) {
+ if (!$isReadableStreamBYOBRequest(this)) throw $makeThisTypeError("ReadableStreamBYOBRequest", "respond");
+
+ if ($getByIdDirectPrivate(this, "associatedReadableByteStreamController") === undefined)
+ throw new TypeError("ReadableStreamBYOBRequest.associatedReadableByteStreamController is undefined");
+
+ if (!$isObject(view)) throw new TypeError("Provided view is not an object");
+
+ if (!ArrayBuffer.$isView(view)) throw new TypeError("Provided view is not an ArrayBufferView");
+
+ return $readableByteStreamControllerRespondWithNewView(
+ $getByIdDirectPrivate(this, "associatedReadableByteStreamController"),
+ view,
+ );
+}
+
+$getter;
+export function view(this) {
+ if (!$isReadableStreamBYOBRequest(this)) throw $makeGetterTypeError("ReadableStreamBYOBRequest", "view");
+
+ return $getByIdDirectPrivate(this, "view");
+}
diff --git a/src/bun.js/builtins/ts/ReadableStreamDefaultController.ts b/src/bun.js/builtins/ts/ReadableStreamDefaultController.ts
new file mode 100644
index 000000000..912cd1acb
--- /dev/null
+++ b/src/bun.js/builtins/ts/ReadableStreamDefaultController.ts
@@ -0,0 +1,63 @@
+/*
+ * Copyright (C) 2015 Canon Inc.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+ * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+export function initializeReadableStreamDefaultController(this, stream, underlyingSource, size, highWaterMark) {
+ if (arguments.length !== 5 && arguments[4] !== $isReadableStream)
+ throw new TypeError("ReadableStreamDefaultController constructor should not be called directly");
+
+ return $privateInitializeReadableStreamDefaultController.$call(this, stream, underlyingSource, size, highWaterMark);
+}
+
+export function enqueue(this, chunk) {
+ if (!$isReadableStreamDefaultController(this)) throw $makeThisTypeError("ReadableStreamDefaultController", "enqueue");
+
+ if (!$readableStreamDefaultControllerCanCloseOrEnqueue(this))
+ throw new TypeError("ReadableStreamDefaultController is not in a state where chunk can be enqueued");
+
+ return $readableStreamDefaultControllerEnqueue(this, chunk);
+}
+
+export function error(this, err) {
+ if (!$isReadableStreamDefaultController(this)) throw $makeThisTypeError("ReadableStreamDefaultController", "error");
+
+ $readableStreamDefaultControllerError(this, err);
+}
+
+export function close(this) {
+ if (!$isReadableStreamDefaultController(this)) throw $makeThisTypeError("ReadableStreamDefaultController", "close");
+
+ if (!$readableStreamDefaultControllerCanCloseOrEnqueue(this))
+ throw new TypeError("ReadableStreamDefaultController is not in a state where it can be closed");
+
+ $readableStreamDefaultControllerClose(this);
+}
+
+$getter;
+export function desiredSize(this) {
+ if (!$isReadableStreamDefaultController(this))
+ throw $makeGetterTypeError("ReadableStreamDefaultController", "desiredSize");
+
+ return $readableStreamDefaultControllerGetDesiredSize(this);
+}
diff --git a/src/bun.js/builtins/ts/ReadableStreamDefaultReader.ts b/src/bun.js/builtins/ts/ReadableStreamDefaultReader.ts
new file mode 100644
index 000000000..ecd553ed5
--- /dev/null
+++ b/src/bun.js/builtins/ts/ReadableStreamDefaultReader.ts
@@ -0,0 +1,185 @@
+/*
+ * Copyright (C) 2015 Canon Inc.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+ * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+export function initializeReadableStreamDefaultReader(this, stream) {
+ if (!$isReadableStream(stream)) throw new TypeError("ReadableStreamDefaultReader needs a ReadableStream");
+ if ($isReadableStreamLocked(stream)) throw new TypeError("ReadableStream is locked");
+
+ $readableStreamReaderGenericInitialize(this, stream);
+ $putByIdDirectPrivate(this, "readRequests", $createFIFO());
+
+ return this;
+}
+
+export function cancel(this, reason) {
+ if (!$isReadableStreamDefaultReader(this))
+ return Promise.$reject($makeThisTypeError("ReadableStreamDefaultReader", "cancel"));
+
+ if (!$getByIdDirectPrivate(this, "ownerReadableStream"))
+ return Promise.$reject(new TypeError("cancel() called on a reader owned by no readable stream"));
+
+ return $readableStreamReaderGenericCancel(this, reason);
+}
+
+export function readMany(this) {
+ if (!$isReadableStreamDefaultReader(this))
+ throw new TypeError("ReadableStreamDefaultReader.readMany() should not be called directly");
+
+ const stream = $getByIdDirectPrivate(this, "ownerReadableStream");
+ if (!stream) throw new TypeError("readMany() called on a reader owned by no readable stream");
+
+ const state = $getByIdDirectPrivate(stream, "state");
+ $putByIdDirectPrivate(stream, "disturbed", true);
+ if (state === $streamClosed) return { value: [], size: 0, done: true };
+ else if (state === $streamErrored) {
+ throw $getByIdDirectPrivate(stream, "storedError");
+ }
+
+ var controller = $getByIdDirectPrivate(stream, "readableStreamController");
+ var queue = $getByIdDirectPrivate(controller, "queue");
+
+ if (!queue) {
+ // This is a ReadableStream direct controller implemented in JS
+ // It hasn't been started yet.
+ return controller.$pull(controller).$then(function ({ done, value }) {
+ return done ? { done: true, value: [], size: 0 } : { value: [value], size: 1, done: false };
+ });
+ }
+
+ const content = queue.content;
+ var size = queue.size;
+ var values = content.toArray(false);
+
+ var length = values.length;
+
+ if (length > 0) {
+ var outValues = $newArrayWithSize(length);
+ if ($isReadableByteStreamController(controller)) {
+ {
+ const buf = values[0];
+ if (!(ArrayBuffer.$isView(buf) || buf instanceof ArrayBuffer)) {
+ $putByValDirect(outValues, 0, new Uint8Array(buf.buffer, buf.byteOffset, buf.byteLength));
+ } else {
+ $putByValDirect(outValues, 0, buf);
+ }
+ }
+
+ for (var i = 1; i < length; i++) {
+ const buf = values[i];
+ if (!(ArrayBuffer.$isView(buf) || buf instanceof ArrayBuffer)) {
+ $putByValDirect(outValues, i, new Uint8Array(buf.buffer, buf.byteOffset, buf.byteLength));
+ } else {
+ $putByValDirect(outValues, i, buf);
+ }
+ }
+ } else {
+ $putByValDirect(outValues, 0, values[0].value);
+ for (var i = 1; i < length; i++) {
+ $putByValDirect(outValues, i, values[i].value);
+ }
+ }
+
+ $resetQueue($getByIdDirectPrivate(controller, "queue"));
+
+ if ($getByIdDirectPrivate(controller, "closeRequested"))
+ $readableStreamClose($getByIdDirectPrivate(controller, "controlledReadableStream"));
+ else if ($isReadableStreamDefaultController(controller))
+ $readableStreamDefaultControllerCallPullIfNeeded(controller);
+ else if ($isReadableByteStreamController(controller)) $readableByteStreamControllerCallPullIfNeeded(controller);
+
+ return { value: outValues, size, done: false };
+ }
+
+ var onPullMany = result => {
+ if (result.done) {
+ return { value: [], size: 0, done: true };
+ }
+ var controller = $getByIdDirectPrivate(stream, "readableStreamController");
+
+ var queue = $getByIdDirectPrivate(controller, "queue");
+ var value = [result.value].concat(queue.content.toArray(false));
+ var length = value.length;
+
+ if ($isReadableByteStreamController(controller)) {
+ for (var i = 0; i < length; i++) {
+ const buf = value[i];
+ if (!(ArrayBuffer.$isView(buf) || buf instanceof ArrayBuffer)) {
+ const { buffer, byteOffset, byteLength } = buf;
+ $putByValDirect(value, i, new Uint8Array(buffer, byteOffset, byteLength));
+ }
+ }
+ } else {
+ for (var i = 1; i < length; i++) {
+ $putByValDirect(value, i, value[i].value);
+ }
+ }
+
+ var size = queue.size;
+ $resetQueue(queue);
+
+ if ($getByIdDirectPrivate(controller, "closeRequested"))
+ $readableStreamClose($getByIdDirectPrivate(controller, "controlledReadableStream"));
+ else if ($isReadableStreamDefaultController(controller))
+ $readableStreamDefaultControllerCallPullIfNeeded(controller);
+ else if ($isReadableByteStreamController(controller)) $readableByteStreamControllerCallPullIfNeeded(controller);
+
+ return { value: value, size: size, done: false };
+ };
+
+ var pullResult = controller.$pull(controller);
+ if (pullResult && $isPromise(pullResult)) {
+ return pullResult.$then(onPullMany);
+ }
+
+ return onPullMany(pullResult);
+}
+
+export function read(this) {
+ if (!$isReadableStreamDefaultReader(this))
+ return Promise.$reject($makeThisTypeError("ReadableStreamDefaultReader", "read"));
+ if (!$getByIdDirectPrivate(this, "ownerReadableStream"))
+ return Promise.$reject(new TypeError("read() called on a reader owned by no readable stream"));
+
+ return $readableStreamDefaultReaderRead(this);
+}
+
+export function releaseLock(this) {
+ if (!$isReadableStreamDefaultReader(this)) throw $makeThisTypeError("ReadableStreamDefaultReader", "releaseLock");
+
+ if (!$getByIdDirectPrivate(this, "ownerReadableStream")) return;
+
+ if ($getByIdDirectPrivate(this, "readRequests")?.isNotEmpty())
+ throw new TypeError("There are still pending read requests, cannot release the lock");
+
+ $readableStreamReaderGenericRelease(this);
+}
+
+$getter;
+export function closed(this) {
+ if (!$isReadableStreamDefaultReader(this))
+ return Promise.$reject($makeGetterTypeError("ReadableStreamDefaultReader", "closed"));
+
+ return $getByIdDirectPrivate(this, "closedPromiseCapability").$promise;
+}
diff --git a/src/bun.js/builtins/ts/ReadableStreamInternals.ts b/src/bun.js/builtins/ts/ReadableStreamInternals.ts
new file mode 100644
index 000000000..c0867445f
--- /dev/null
+++ b/src/bun.js/builtins/ts/ReadableStreamInternals.ts
@@ -0,0 +1,1801 @@
+/*
+ * Copyright (C) 2015 Canon Inc. All rights reserved.
+ * Copyright (C) 2015 Igalia.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+ * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+// @internal
+
+export function readableStreamReaderGenericInitialize(reader, stream) {
+ $putByIdDirectPrivate(reader, "ownerReadableStream", stream);
+ $putByIdDirectPrivate(stream, "reader", reader);
+ if ($getByIdDirectPrivate(stream, "state") === $streamReadable)
+ $putByIdDirectPrivate(reader, "closedPromiseCapability", $newPromiseCapability(Promise));
+ else if ($getByIdDirectPrivate(stream, "state") === $streamClosed)
+ $putByIdDirectPrivate(reader, "closedPromiseCapability", {
+ $promise: Promise.$resolve(),
+ });
+ else {
+ $assert($getByIdDirectPrivate(stream, "state") === $streamErrored);
+ $putByIdDirectPrivate(reader, "closedPromiseCapability", {
+ $promise: $newHandledRejectedPromise($getByIdDirectPrivate(stream, "storedError")),
+ });
+ }
+}
+
+export function privateInitializeReadableStreamDefaultController(this, stream, underlyingSource, size, highWaterMark) {
+ if (!$isReadableStream(stream)) throw new TypeError("ReadableStreamDefaultController needs a ReadableStream");
+
+ // readableStreamController is initialized with null value.
+ if ($getByIdDirectPrivate(stream, "readableStreamController") !== null)
+ throw new TypeError("ReadableStream already has a controller");
+
+ $putByIdDirectPrivate(this, "controlledReadableStream", stream);
+ $putByIdDirectPrivate(this, "underlyingSource", underlyingSource);
+ $putByIdDirectPrivate(this, "queue", $newQueue());
+ $putByIdDirectPrivate(this, "started", -1);
+ $putByIdDirectPrivate(this, "closeRequested", false);
+ $putByIdDirectPrivate(this, "pullAgain", false);
+ $putByIdDirectPrivate(this, "pulling", false);
+ $putByIdDirectPrivate(this, "strategy", $validateAndNormalizeQueuingStrategy(size, highWaterMark));
+
+ return this;
+}
+
+export function readableStreamDefaultControllerError(controller, error) {
+ const stream = $getByIdDirectPrivate(controller, "controlledReadableStream");
+ if ($getByIdDirectPrivate(stream, "state") !== $streamReadable) return;
+ $putByIdDirectPrivate(controller, "queue", $newQueue());
+
+ $readableStreamError(stream, error);
+}
+
+export function readableStreamPipeTo(stream, sink) {
+ $assert($isReadableStream(stream));
+
+ const reader = new ReadableStreamDefaultReader(stream);
+
+ $getByIdDirectPrivate(reader, "closedPromiseCapability").$promise.$then(
+ () => {},
+ e => {
+ sink.error(e);
+ },
+ );
+
+ function doPipe() {
+ $readableStreamDefaultReaderRead(reader).$then(
+ function (result) {
+ if (result.done) {
+ sink.close();
+ return;
+ }
+ try {
+ sink.enqueue(result.value);
+ } catch (e) {
+ sink.error("ReadableStream chunk enqueueing in the sink failed");
+ return;
+ }
+ doPipe();
+ },
+ function (e) {
+ sink.error(e);
+ },
+ );
+ }
+ doPipe();
+}
+
+export function acquireReadableStreamDefaultReader(stream) {
+ var start = $getByIdDirectPrivate(stream, "start");
+ if (start) {
+ start.$call(stream);
+ }
+
+ return new ReadableStreamDefaultReader(stream);
+}
+
+// https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller, starting from step 6.
+// The other part is implemented in privateInitializeReadableStreamDefaultController.
+export function setupReadableStreamDefaultController(
+ stream,
+ underlyingSource,
+ size,
+ highWaterMark,
+ startMethod,
+ pullMethod,
+ cancelMethod,
+) {
+ const controller = new ReadableStreamDefaultController(
+ stream,
+ underlyingSource,
+ size,
+ highWaterMark,
+ $isReadableStream,
+ );
+
+ const pullAlgorithm = () => $promiseInvokeOrNoopMethod(underlyingSource, pullMethod, [controller]);
+ const cancelAlgorithm = reason => $promiseInvokeOrNoopMethod(underlyingSource, cancelMethod, [reason]);
+
+ $putByIdDirectPrivate(controller, "pullAlgorithm", pullAlgorithm);
+ $putByIdDirectPrivate(controller, "cancelAlgorithm", cancelAlgorithm);
+ $putByIdDirectPrivate(controller, "pull", $readableStreamDefaultControllerPull);
+ $putByIdDirectPrivate(controller, "cancel", $readableStreamDefaultControllerCancel);
+ $putByIdDirectPrivate(stream, "readableStreamController", controller);
+
+ $readableStreamDefaultControllerStart(controller);
+}
+
+export function createReadableStreamController(stream, underlyingSource, strategy) {
+ const type = underlyingSource.type;
+ const typeString = $toString(type);
+
+ if (typeString === "bytes") {
+ // if (!$readableByteStreamAPIEnabled())
+ // $throwTypeError("ReadableByteStreamController is not implemented");
+
+ if (strategy.highWaterMark === undefined) strategy.highWaterMark = 0;
+ if (strategy.size !== undefined) $throwRangeError("Strategy for a ReadableByteStreamController cannot have a size");
+
+ $putByIdDirectPrivate(
+ stream,
+ "readableStreamController",
+ new ReadableByteStreamController(stream, underlyingSource, strategy.highWaterMark, $isReadableStream),
+ );
+ } else if (typeString === "direct") {
+ var highWaterMark = strategy?.highWaterMark;
+ $initializeArrayBufferStream.$call(stream, underlyingSource, highWaterMark);
+ } else if (type === undefined) {
+ if (strategy.highWaterMark === undefined) strategy.highWaterMark = 1;
+
+ $setupReadableStreamDefaultController(
+ stream,
+ underlyingSource,
+ strategy.size,
+ strategy.highWaterMark,
+ underlyingSource.start,
+ underlyingSource.pull,
+ underlyingSource.cancel,
+ );
+ } else throw new RangeError("Invalid type for underlying source");
+}
+
+export function readableStreamDefaultControllerStart(controller) {
+ if ($getByIdDirectPrivate(controller, "started") !== -1) return;
+
+ const underlyingSource = $getByIdDirectPrivate(controller, "underlyingSource");
+ const startMethod = underlyingSource.start;
+ $putByIdDirectPrivate(controller, "started", 0);
+
+ $promiseInvokeOrNoopMethodNoCatch(underlyingSource, startMethod, [controller]).$then(
+ () => {
+ $putByIdDirectPrivate(controller, "started", 1);
+ $assert(!$getByIdDirectPrivate(controller, "pulling"));
+ $assert(!$getByIdDirectPrivate(controller, "pullAgain"));
+ $readableStreamDefaultControllerCallPullIfNeeded(controller);
+ },
+ error => {
+ $readableStreamDefaultControllerError(controller, error);
+ },
+ );
+}
+
+// FIXME: Replace readableStreamPipeTo by below function.
+// This method implements the latest https://streams.spec.whatwg.org/#readable-stream-pipe-to.
+export function readableStreamPipeToWritableStream(
+ source,
+ destination,
+ preventClose,
+ preventAbort,
+ preventCancel,
+ signal,
+) {
+ // const isDirectStream = !!$getByIdDirectPrivate(source, "start");
+
+ $assert($isReadableStream(source));
+ $assert($isWritableStream(destination));
+ $assert(!$isReadableStreamLocked(source));
+ $assert(!$isWritableStreamLocked(destination));
+ $assert(signal === undefined || $isAbortSignal(signal));
+
+ if ($getByIdDirectPrivate(source, "underlyingByteSource") !== undefined)
+ return Promise.$reject("Piping to a readable bytestream is not supported");
+
+ let pipeState: any = {
+ source: source,
+ destination: destination,
+ preventAbort: preventAbort,
+ preventCancel: preventCancel,
+ preventClose: preventClose,
+ signal: signal,
+ };
+
+ pipeState.reader = $acquireReadableStreamDefaultReader(source);
+ pipeState.writer = $acquireWritableStreamDefaultWriter(destination);
+
+ $putByIdDirectPrivate(source, "disturbed", true);
+
+ pipeState.finalized = false;
+ pipeState.shuttingDown = false;
+ pipeState.promiseCapability = $newPromiseCapability(Promise);
+ pipeState.pendingReadPromiseCapability = $newPromiseCapability(Promise);
+ pipeState.pendingReadPromiseCapability.$resolve.$call();
+ pipeState.pendingWritePromise = Promise.$resolve();
+
+ if (signal !== undefined) {
+ const algorithm = reason => {
+ if (pipeState.finalized) return;
+
+ $pipeToShutdownWithAction(
+ pipeState,
+ () => {
+ const shouldAbortDestination =
+ !pipeState.preventAbort && $getByIdDirectPrivate(pipeState.destination, "state") === "writable";
+ const promiseDestination = shouldAbortDestination
+ ? $writableStreamAbort(pipeState.destination, reason)
+ : Promise.$resolve();
+
+ const shouldAbortSource =
+ !pipeState.preventCancel && $getByIdDirectPrivate(pipeState.source, "state") === $streamReadable;
+ const promiseSource = shouldAbortSource
+ ? $readableStreamCancel(pipeState.source, reason)
+ : Promise.$resolve();
+
+ let promiseCapability = $newPromiseCapability(Promise);
+ let shouldWait = true;
+ let handleResolvedPromise = () => {
+ if (shouldWait) {
+ shouldWait = false;
+ return;
+ }
+ promiseCapability.$resolve.$call();
+ };
+ let handleRejectedPromise = e => {
+ promiseCapability.$reject.$call(undefined, e);
+ };
+ promiseDestination.$then(handleResolvedPromise, handleRejectedPromise);
+ promiseSource.$then(handleResolvedPromise, handleRejectedPromise);
+ return promiseCapability.$promise;
+ },
+ reason,
+ );
+ };
+ if ($whenSignalAborted(signal, algorithm)) return pipeState.promiseCapability.$promise;
+ }
+
+ $pipeToErrorsMustBePropagatedForward(pipeState);
+ $pipeToErrorsMustBePropagatedBackward(pipeState);
+ $pipeToClosingMustBePropagatedForward(pipeState);
+ $pipeToClosingMustBePropagatedBackward(pipeState);
+
+ $pipeToLoop(pipeState);
+
+ return pipeState.promiseCapability.$promise;
+}
+
+export function pipeToLoop(pipeState) {
+ if (pipeState.shuttingDown) return;
+
+ $pipeToDoReadWrite(pipeState).$then(result => {
+ if (result) $pipeToLoop(pipeState);
+ });
+}
+
+export function pipeToDoReadWrite(pipeState) {
+ $assert(!pipeState.shuttingDown);
+
+ pipeState.pendingReadPromiseCapability = $newPromiseCapability(Promise);
+ $getByIdDirectPrivate(pipeState.writer, "readyPromise").$promise.$then(
+ () => {
+ if (pipeState.shuttingDown) {
+ pipeState.pendingReadPromiseCapability.$resolve.$call(undefined, false);
+ return;
+ }
+
+ $readableStreamDefaultReaderRead(pipeState.reader).$then(
+ result => {
+ const canWrite = !result.done && $getByIdDirectPrivate(pipeState.writer, "stream") !== undefined;
+ pipeState.pendingReadPromiseCapability.$resolve.$call(undefined, canWrite);
+ if (!canWrite) return;
+
+ pipeState.pendingWritePromise = $writableStreamDefaultWriterWrite(pipeState.writer, result.value);
+ },
+ e => {
+ pipeState.pendingReadPromiseCapability.$resolve.$call(undefined, false);
+ },
+ );
+ },
+ e => {
+ pipeState.pendingReadPromiseCapability.$resolve.$call(undefined, false);
+ },
+ );
+ return pipeState.pendingReadPromiseCapability.$promise;
+}
+
+export function pipeToErrorsMustBePropagatedForward(pipeState) {
+ const action = () => {
+ pipeState.pendingReadPromiseCapability.$resolve.$call(undefined, false);
+ const error = $getByIdDirectPrivate(pipeState.source, "storedError");
+ if (!pipeState.preventAbort) {
+ $pipeToShutdownWithAction(pipeState, () => $writableStreamAbort(pipeState.destination, error), error);
+ return;
+ }
+ $pipeToShutdown(pipeState, error);
+ };
+
+ if ($getByIdDirectPrivate(pipeState.source, "state") === $streamErrored) {
+ action();
+ return;
+ }
+
+ $getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").$promise.$then(undefined, action);
+}
+
+export function pipeToErrorsMustBePropagatedBackward(pipeState) {
+ const action = () => {
+ const error = $getByIdDirectPrivate(pipeState.destination, "storedError");
+ if (!pipeState.preventCancel) {
+ $pipeToShutdownWithAction(pipeState, () => $readableStreamCancel(pipeState.source, error), error);
+ return;
+ }
+ $pipeToShutdown(pipeState, error);
+ };
+ if ($getByIdDirectPrivate(pipeState.destination, "state") === "errored") {
+ action();
+ return;
+ }
+ $getByIdDirectPrivate(pipeState.writer, "closedPromise").$promise.$then(undefined, action);
+}
+
+export function pipeToClosingMustBePropagatedForward(pipeState) {
+ const action = () => {
+ pipeState.pendingReadPromiseCapability.$resolve.$call(undefined, false);
+ // const error = $getByIdDirectPrivate(pipeState.source, "storedError");
+ if (!pipeState.preventClose) {
+ $pipeToShutdownWithAction(pipeState, () =>
+ $writableStreamDefaultWriterCloseWithErrorPropagation(pipeState.writer),
+ );
+ return;
+ }
+ $pipeToShutdown(pipeState);
+ };
+ if ($getByIdDirectPrivate(pipeState.source, "state") === $streamClosed) {
+ action();
+ return;
+ }
+ $getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").$promise.$then(action, undefined);
+}
+
+export function pipeToClosingMustBePropagatedBackward(pipeState) {
+ if (
+ !$writableStreamCloseQueuedOrInFlight(pipeState.destination) &&
+ $getByIdDirectPrivate(pipeState.destination, "state") !== "closed"
+ )
+ return;
+
+ // $assert no chunks have been read/written
+
+ const error = new TypeError("closing is propagated backward");
+ if (!pipeState.preventCancel) {
+ $pipeToShutdownWithAction(pipeState, () => $readableStreamCancel(pipeState.source, error), error);
+ return;
+ }
+ $pipeToShutdown(pipeState, error);
+}
+
+export function pipeToShutdownWithAction(pipeState, action) {
+ if (pipeState.shuttingDown) return;
+
+ pipeState.shuttingDown = true;
+
+ const hasError = arguments.length > 2;
+ const error = arguments[2];
+ const finalize = () => {
+ const promise = action();
+ promise.$then(
+ () => {
+ if (hasError) $pipeToFinalize(pipeState, error);
+ else $pipeToFinalize(pipeState);
+ },
+ e => {
+ $pipeToFinalize(pipeState, e);
+ },
+ );
+ };
+
+ if (
+ $getByIdDirectPrivate(pipeState.destination, "state") === "writable" &&
+ !$writableStreamCloseQueuedOrInFlight(pipeState.destination)
+ ) {
+ pipeState.pendingReadPromiseCapability.$promise.$then(
+ () => {
+ pipeState.pendingWritePromise.$then(finalize, finalize);
+ },
+ e => $pipeToFinalize(pipeState, e),
+ );
+ return;
+ }
+
+ finalize();
+}
+
+export function pipeToShutdown(pipeState) {
+ if (pipeState.shuttingDown) return;
+
+ pipeState.shuttingDown = true;
+
+ const hasError = arguments.length > 1;
+ const error = arguments[1];
+ const finalize = () => {
+ if (hasError) $pipeToFinalize(pipeState, error);
+ else $pipeToFinalize(pipeState);
+ };
+
+ if (
+ $getByIdDirectPrivate(pipeState.destination, "state") === "writable" &&
+ !$writableStreamCloseQueuedOrInFlight(pipeState.destination)
+ ) {
+ pipeState.pendingReadPromiseCapability.$promise.$then(
+ () => {
+ pipeState.pendingWritePromise.$then(finalize, finalize);
+ },
+ e => $pipeToFinalize(pipeState, e),
+ );
+ return;
+ }
+ finalize();
+}
+
+export function pipeToFinalize(pipeState) {
+ $writableStreamDefaultWriterRelease(pipeState.writer);
+ $readableStreamReaderGenericRelease(pipeState.reader);
+
+ // Instead of removing the abort algorithm as per spec, we make it a no-op which is equivalent.
+ pipeState.finalized = true;
+
+ if (arguments.length > 1) pipeState.promiseCapability.$reject.$call(undefined, arguments[1]);
+ else pipeState.promiseCapability.$resolve.$call();
+}
+
+export function readableStreamTee(stream, shouldClone) {
+ $assert($isReadableStream(stream));
+ $assert(typeof shouldClone === "boolean");
+
+ var start_ = $getByIdDirectPrivate(stream, "start");
+ if (start_) {
+ $putByIdDirectPrivate(stream, "start", undefined);
+ start_();
+ }
+
+ const reader = new $ReadableStreamDefaultReader(stream);
+
+ const teeState = {
+ closedOrErrored: false,
+ canceled1: false,
+ canceled2: false,
+ reason1: undefined,
+ reason2: undefined,
+ };
+
+ teeState.cancelPromiseCapability = $newPromiseCapability(Promise);
+
+ const pullFunction = $readableStreamTeePullFunction(teeState, reader, shouldClone);
+
+ const branch1Source = {};
+ $putByIdDirectPrivate(branch1Source, "pull", pullFunction);
+ $putByIdDirectPrivate(branch1Source, "cancel", $readableStreamTeeBranch1CancelFunction(teeState, stream));
+
+ const branch2Source = {};
+ $putByIdDirectPrivate(branch2Source, "pull", pullFunction);
+ $putByIdDirectPrivate(branch2Source, "cancel", $readableStreamTeeBranch2CancelFunction(teeState, stream));
+
+ const branch1 = new $ReadableStream(branch1Source);
+ const branch2 = new $ReadableStream(branch2Source);
+
+ $getByIdDirectPrivate(reader, "closedPromiseCapability").$promise.$then(undefined, function (e) {
+ if (teeState.closedOrErrored) return;
+ $readableStreamDefaultControllerError(branch1.$readableStreamController, e);
+ $readableStreamDefaultControllerError(branch2.$readableStreamController, e);
+ teeState.closedOrErrored = true;
+ if (!teeState.canceled1 || !teeState.canceled2) teeState.cancelPromiseCapability.$resolve.$call();
+ });
+
+ // Additional fields compared to the spec, as they are needed within pull/cancel functions.
+ teeState.branch1 = branch1;
+ teeState.branch2 = branch2;
+
+ return [branch1, branch2];
+}
+
+export function readableStreamTeePullFunction(teeState, reader, shouldClone) {
+ return function () {
+ Promise.prototype.$then.$call($readableStreamDefaultReaderRead(reader), function (result) {
+ $assert($isObject(result));
+ $assert(typeof result.done === "boolean");
+ if (result.done && !teeState.closedOrErrored) {
+ if (!teeState.canceled1) $readableStreamDefaultControllerClose(teeState.branch1.$readableStreamController);
+ if (!teeState.canceled2) $readableStreamDefaultControllerClose(teeState.branch2.$readableStreamController);
+ teeState.closedOrErrored = true;
+ if (!teeState.canceled1 || !teeState.canceled2) teeState.cancelPromiseCapability.$resolve.$call();
+ }
+ if (teeState.closedOrErrored) return;
+ if (!teeState.canceled1)
+ $readableStreamDefaultControllerEnqueue(teeState.branch1.$readableStreamController, result.value);
+ if (!teeState.canceled2)
+ $readableStreamDefaultControllerEnqueue(
+ teeState.branch2.$readableStreamController,
+ shouldClone ? $structuredCloneForStream(result.value) : result.value,
+ );
+ });
+ };
+}
+
+export function readableStreamTeeBranch1CancelFunction(teeState, stream) {
+ return function (r) {
+ teeState.canceled1 = true;
+ teeState.reason1 = r;
+ if (teeState.canceled2) {
+ $readableStreamCancel(stream, [teeState.reason1, teeState.reason2]).$then(
+ teeState.cancelPromiseCapability.$resolve,
+ teeState.cancelPromiseCapability.$reject,
+ );
+ }
+ return teeState.cancelPromiseCapability.$promise;
+ };
+}
+
+export function readableStreamTeeBranch2CancelFunction(teeState, stream) {
+ return function (r) {
+ teeState.canceled2 = true;
+ teeState.reason2 = r;
+ if (teeState.canceled1) {
+ $readableStreamCancel(stream, [teeState.reason1, teeState.reason2]).$then(
+ teeState.cancelPromiseCapability.$resolve,
+ teeState.cancelPromiseCapability.$reject,
+ );
+ }
+ return teeState.cancelPromiseCapability.$promise;
+ };
+}
+
+export function isReadableStream(stream) {
+ // Spec tells to return true only if stream has a readableStreamController internal slot.
+ // However, since it is a private slot, it cannot be checked using hasOwnProperty().
+ // Therefore, readableStreamController is initialized with null value.
+ return $isObject(stream) && $getByIdDirectPrivate(stream, "readableStreamController") !== undefined;
+}
+
+export function isReadableStreamDefaultReader(reader) {
+ // Spec tells to return true only if reader has a readRequests internal slot.
+ // However, since it is a private slot, it cannot be checked using hasOwnProperty().
+ // Since readRequests is initialized with an empty array, the following test is ok.
+ return $isObject(reader) && !!$getByIdDirectPrivate(reader, "readRequests");
+}
+
+export function isReadableStreamDefaultController(controller) {
+ // Spec tells to return true only if controller has an underlyingSource internal slot.
+ // However, since it is a private slot, it cannot be checked using hasOwnProperty().
+ // underlyingSource is obtained in ReadableStream constructor: if undefined, it is set
+ // to an empty object. Therefore, following test is ok.
+ return $isObject(controller) && !!$getByIdDirectPrivate(controller, "underlyingSource");
+}
+
+export function readDirectStream(stream, sink, underlyingSource) {
+ $putByIdDirectPrivate(stream, "underlyingSource", undefined);
+ $putByIdDirectPrivate(stream, "start", undefined);
+
+ function close(stream, reason) {
+ if (reason && underlyingSource?.cancel) {
+ try {
+ var prom = underlyingSource.cancel(reason);
+ $markPromiseAsHandled(prom);
+ } catch (e) {}
+
+ underlyingSource = undefined;
+ }
+
+ if (stream) {
+ $putByIdDirectPrivate(stream, "readableStreamController", undefined);
+ $putByIdDirectPrivate(stream, "reader", undefined);
+ if (reason) {
+ $putByIdDirectPrivate(stream, "state", $streamErrored);
+ $putByIdDirectPrivate(stream, "storedError", reason);
+ } else {
+ $putByIdDirectPrivate(stream, "state", $streamClosed);
+ }
+ stream = undefined;
+ }
+ }
+
+ if (!underlyingSource.pull) {
+ close();
+ return;
+ }
+
+ if (!$isCallable(underlyingSource.pull)) {
+ close();
+ $throwTypeError("pull is not a function");
+ return;
+ }
+
+ $putByIdDirectPrivate(stream, "readableStreamController", sink);
+ const highWaterMark = $getByIdDirectPrivate(stream, "highWaterMark");
+
+ sink.start({
+ highWaterMark: !highWaterMark || highWaterMark < 64 ? 64 : highWaterMark,
+ });
+
+ $startDirectStream.$call(sink, stream, underlyingSource.pull, close);
+ $putByIdDirectPrivate(stream, "reader", {});
+
+ var maybePromise = underlyingSource.pull(sink);
+ sink = undefined;
+ if (maybePromise && $isPromise(maybePromise)) {
+ return maybePromise.$then(() => {});
+ }
+}
+
+$linkTimeConstant;
+export function assignToStream(stream, sink) {
+ // The stream is either a direct stream or a "default" JS stream
+ var underlyingSource = $getByIdDirectPrivate(stream, "underlyingSource");
+
+ // we know it's a direct stream when $underlyingSource is set
+ if (underlyingSource) {
+ try {
+ return $readDirectStream(stream, sink, underlyingSource);
+ } catch (e) {
+ throw e;
+ } finally {
+ underlyingSource = undefined;
+ stream = undefined;
+ sink = undefined;
+ }
+ }
+
+ return $readStreamIntoSink(stream, sink, true);
+}
+
+export async function readStreamIntoSink(stream, sink, isNative) {
+ var didClose = false;
+ var didThrow = false;
+ try {
+ var reader = stream.getReader();
+ var many = reader.readMany();
+ if (many && $isPromise(many)) {
+ many = await many;
+ }
+ if (many.done) {
+ didClose = true;
+ return sink.end();
+ }
+ var wroteCount = many.value.length;
+ const highWaterMark = $getByIdDirectPrivate(stream, "highWaterMark");
+ if (isNative)
+ $startDirectStream.$call(sink, stream, undefined, () => !didThrow && $markPromiseAsHandled(stream.cancel()));
+
+ sink.start({ highWaterMark: highWaterMark || 0 });
+
+ for (var i = 0, values = many.value, length = many.value.length; i < length; i++) {
+ sink.write(values[i]);
+ }
+
+ var streamState = $getByIdDirectPrivate(stream, "state");
+ if (streamState === $streamClosed) {
+ didClose = true;
+ return sink.end();
+ }
+
+ while (true) {
+ var { value, done } = await reader.read();
+ if (done) {
+ didClose = true;
+ return sink.end();
+ }
+
+ sink.write(value);
+ }
+ } catch (e) {
+ didThrow = true;
+
+ try {
+ reader = undefined;
+ const prom = stream.cancel(e);
+ $markPromiseAsHandled(prom);
+ } catch (j) {}
+
+ if (sink && !didClose) {
+ didClose = true;
+ try {
+ sink.close(e);
+ } catch (j) {
+ throw new globalThis.AggregateError([e, j]);
+ }
+ }
+
+ throw e;
+ } finally {
+ if (reader) {
+ try {
+ reader.releaseLock();
+ } catch (e) {}
+ reader = undefined;
+ }
+ sink = undefined;
+ var streamState = $getByIdDirectPrivate(stream, "state");
+ if (stream) {
+ // make it easy for this to be GC'd
+ // but don't do property transitions
+ var readableStreamController = $getByIdDirectPrivate(stream, "readableStreamController");
+ if (readableStreamController) {
+ if ($getByIdDirectPrivate(readableStreamController, "underlyingSource"))
+ $putByIdDirectPrivate(readableStreamController, "underlyingSource", undefined);
+ if ($getByIdDirectPrivate(readableStreamController, "controlledReadableStream"))
+ $putByIdDirectPrivate(readableStreamController, "controlledReadableStream", undefined);
+
+ $putByIdDirectPrivate(stream, "readableStreamController", null);
+ if ($getByIdDirectPrivate(stream, "underlyingSource"))
+ $putByIdDirectPrivate(stream, "underlyingSource", undefined);
+ readableStreamController = undefined;
+ }
+
+ if (!didThrow && streamState !== $streamClosed && streamState !== $streamErrored) {
+ $readableStreamClose(stream);
+ }
+ stream = undefined;
+ }
+ }
+}
+
+export function handleDirectStreamError(e) {
+ var controller = this;
+ var sink = controller.$sink;
+ if (sink) {
+ $putByIdDirectPrivate(controller, "sink", undefined);
+ try {
+ sink.close(e);
+ } catch (f) {}
+ }
+
+ this.error = this.flush = this.write = this.close = this.end = $onReadableStreamDirectControllerClosed;
+
+ if (typeof this.$underlyingSource.close === "function") {
+ try {
+ this.$underlyingSource.close.$call(this.$underlyingSource, e);
+ } catch (e) {}
+ }
+
+ try {
+ var pend = controller._pendingRead;
+ if (pend) {
+ controller._pendingRead = undefined;
+ $rejectPromise(pend, e);
+ }
+ } catch (f) {}
+ var stream = controller.$controlledReadableStream;
+ if (stream) $readableStreamError(stream, e);
+}
+
+export function handleDirectStreamErrorReject(e) {
+ $handleDirectStreamError.$call(this, e);
+ return Promise.$reject(e);
+}
+
+export function onPullDirectStream(controller) {
+ var stream = controller.$controlledReadableStream;
+ if (!stream || $getByIdDirectPrivate(stream, "state") !== $streamReadable) return;
+
+ // pull is in progress
+ // this is a recursive call
+ // ignore it
+ if (controller._deferClose === -1) {
+ return;
+ }
+
+ controller._deferClose = -1;
+ controller._deferFlush = -1;
+ var deferClose;
+ var deferFlush;
+
+ // Direct streams allow $pull to be called multiple times, unlike the spec.
+ // Backpressure is handled by the destination, not by the underlying source.
+ // In this case, we rely on the heuristic that repeatedly draining in the same tick
+ // is bad for performance
+ // this code is only run when consuming a direct stream from JS
+ // without the HTTP server or anything else
+ try {
+ var result = controller.$underlyingSource.pull(controller);
+
+ if (result && $isPromise(result)) {
+ if (controller._handleError === undefined) {
+ controller._handleError = $handleDirectStreamErrorReject.bind(controller);
+ }
+
+ Promise.prototype.catch.$call(result, controller._handleError);
+ }
+ } catch (e) {
+ return $handleDirectStreamErrorReject.$call(controller, e);
+ } finally {
+ deferClose = controller._deferClose;
+ deferFlush = controller._deferFlush;
+ controller._deferFlush = controller._deferClose = 0;
+ }
+
+ var promiseToReturn;
+
+ if (controller._pendingRead === undefined) {
+ controller._pendingRead = promiseToReturn = $newPromise();
+ } else {
+ promiseToReturn = $readableStreamAddReadRequest(stream);
+ }
+
+ // they called close during $pull()
+ // we delay that
+ if (deferClose === 1) {
+ var reason = controller._deferCloseReason;
+ controller._deferCloseReason = undefined;
+ $onCloseDirectStream.$call(controller, reason);
+ return promiseToReturn;
+ }
+
+ // not done, but they called flush()
+ if (deferFlush === 1) {
+ $onFlushDirectStream.$call(controller);
+ }
+
+ return promiseToReturn;
+}
+
+export function noopDoneFunction() {
+ return Promise.$resolve({ value: undefined, done: true });
+}
+
+export function onReadableStreamDirectControllerClosed(reason) {
+ $throwTypeError("ReadableStreamDirectController is now closed");
+}
+
+export function onCloseDirectStream(reason) {
+ var stream = this.$controlledReadableStream;
+ if (!stream || $getByIdDirectPrivate(stream, "state") !== $streamReadable) return;
+
+ if (this._deferClose !== 0) {
+ this._deferClose = 1;
+ this._deferCloseReason = reason;
+ return;
+ }
+
+ $putByIdDirectPrivate(stream, "state", $streamClosing);
+ if (typeof this.$underlyingSource.close === "function") {
+ try {
+ this.$underlyingSource.close.$call(this.$underlyingSource, reason);
+ } catch (e) {}
+ }
+
+ var flushed;
+ try {
+ flushed = this.$sink.end();
+ $putByIdDirectPrivate(this, "sink", undefined);
+ } catch (e) {
+ if (this._pendingRead) {
+ var read = this._pendingRead;
+ this._pendingRead = undefined;
+ $rejectPromise(read, e);
+ }
+ $readableStreamError(stream, e);
+ return;
+ }
+
+ this.error = this.flush = this.write = this.close = this.end = $onReadableStreamDirectControllerClosed;
+
+ var reader = $getByIdDirectPrivate(stream, "reader");
+
+ if (reader && $isReadableStreamDefaultReader(reader)) {
+ var _pendingRead = this._pendingRead;
+ if (_pendingRead && $isPromise(_pendingRead) && flushed?.byteLength) {
+ this._pendingRead = undefined;
+ $fulfillPromise(_pendingRead, { value: flushed, done: false });
+ $readableStreamClose(stream);
+ return;
+ }
+ }
+
+ if (flushed?.byteLength) {
+ var requests = $getByIdDirectPrivate(reader, "readRequests");
+ if (requests?.isNotEmpty()) {
+ $readableStreamFulfillReadRequest(stream, flushed, false);
+ $readableStreamClose(stream);
+ return;
+ }
+
+ $putByIdDirectPrivate(stream, "state", $streamReadable);
+ this.$pull = () => {
+ var thisResult = $createFulfilledPromise({
+ value: flushed,
+ done: false,
+ });
+ flushed = undefined;
+ $readableStreamClose(stream);
+ stream = undefined;
+ return thisResult;
+ };
+ } else if (this._pendingRead) {
+ var read = this._pendingRead;
+ this._pendingRead = undefined;
+ $putByIdDirectPrivate(this, "pull", $noopDoneFunction);
+ $fulfillPromise(read, { value: undefined, done: true });
+ }
+
+ $readableStreamClose(stream);
+}
+
+export function onFlushDirectStream() {
+ var stream = this.$controlledReadableStream;
+ var reader = $getByIdDirectPrivate(stream, "reader");
+ if (!reader || !$isReadableStreamDefaultReader(reader)) {
+ return;
+ }
+
+ var _pendingRead = this._pendingRead;
+ this._pendingRead = undefined;
+ if (_pendingRead && $isPromise(_pendingRead)) {
+ var flushed = this.$sink.flush();
+ if (flushed?.byteLength) {
+ this._pendingRead = $getByIdDirectPrivate(stream, "readRequests")?.shift();
+ $fulfillPromise(_pendingRead, { value: flushed, done: false });
+ } else {
+ this._pendingRead = _pendingRead;
+ }
+ } else if ($getByIdDirectPrivate(stream, "readRequests")?.isNotEmpty()) {
+ var flushed = this.$sink.flush();
+ if (flushed?.byteLength) {
+ $readableStreamFulfillReadRequest(stream, flushed, false);
+ }
+ } else if (this._deferFlush === -1) {
+ this._deferFlush = 1;
+ }
+}
+
+export function createTextStream(highWaterMark) {
+ var sink;
+ var array = [];
+ var hasString = false;
+ var hasBuffer = false;
+ var rope = "";
+ var estimatedLength = $toLength(0);
+ var capability = $newPromiseCapability(Promise);
+ var calledDone = false;
+
+ sink = {
+ start() {},
+ write(chunk) {
+ if (typeof chunk === "string") {
+ var chunkLength = $toLength(chunk.length);
+ if (chunkLength > 0) {
+ rope += chunk;
+ hasString = true;
+ // TODO: utf16 byte length
+ estimatedLength += chunkLength;
+ }
+
+ return chunkLength;
+ }
+
+ if (!chunk || !($ArrayBuffer.$isView(chunk) || chunk instanceof $ArrayBuffer)) {
+ $throwTypeError("Expected text, ArrayBuffer or ArrayBufferView");
+ }
+
+ const byteLength = $toLength(chunk.byteLength);
+ if (byteLength > 0) {
+ hasBuffer = true;
+ if (rope.length > 0) {
+ $arrayPush(array, rope, chunk);
+ rope = "";
+ } else {
+ $arrayPush(array, chunk);
+ }
+ }
+ estimatedLength += byteLength;
+ return byteLength;
+ },
+
+ flush() {
+ return 0;
+ },
+
+ end() {
+ if (calledDone) {
+ return "";
+ }
+ return sink.fulfill();
+ },
+
+ fulfill() {
+ calledDone = true;
+ const result = sink.finishInternal();
+
+ $fulfillPromise(capability.$promise, result);
+ return result;
+ },
+
+ finishInternal() {
+ if (!hasString && !hasBuffer) {
+ return "";
+ }
+
+ if (hasString && !hasBuffer) {
+ return rope;
+ }
+
+ if (hasBuffer && !hasString) {
+ return new globalThis.TextDecoder().decode($Bun.concatArrayBuffers(array));
+ }
+
+ // worst case: mixed content
+
+ var arrayBufferSink = new $Bun.ArrayBufferSink();
+ arrayBufferSink.start({
+ highWaterMark: estimatedLength,
+ asUint8Array: true,
+ });
+ for (let item of array) {
+ arrayBufferSink.write(item);
+ }
+ array.length = 0;
+ if (rope.length > 0) {
+ arrayBufferSink.write(rope);
+ rope = "";
+ }
+
+ // TODO: use builtin
+ return new globalThis.TextDecoder().decode(arrayBufferSink.end());
+ },
+
+ close() {
+ try {
+ if (!calledDone) {
+ calledDone = true;
+ sink.fulfill();
+ }
+ } catch (e) {}
+ },
+ };
+
+ return [sink, capability];
+}
+
+export function initializeTextStream(underlyingSource, highWaterMark) {
+ var [sink, closingPromise] = $createTextStream(highWaterMark);
+
+ var controller = {
+ $underlyingSource: underlyingSource,
+ $pull: $onPullDirectStream,
+ $controlledReadableStream: this,
+ $sink: sink,
+ close: $onCloseDirectStream,
+ write: sink.write,
+ error: $handleDirectStreamError,
+ end: $onCloseDirectStream,
+ $close: $onCloseDirectStream,
+ flush: $onFlushDirectStream,
+ _pendingRead: undefined,
+ _deferClose: 0,
+ _deferFlush: 0,
+ _deferCloseReason: undefined,
+ _handleError: undefined,
+ };
+
+ $putByIdDirectPrivate(this, "readableStreamController", controller);
+ $putByIdDirectPrivate(this, "underlyingSource", undefined);
+ $putByIdDirectPrivate(this, "start", undefined);
+ return closingPromise;
+}
+
+export function initializeArrayStream(underlyingSource, highWaterMark) {
+ var array = [];
+ var closingPromise = $newPromiseCapability(Promise);
+ var calledDone = false;
+
+ function fulfill() {
+ calledDone = true;
+ closingPromise.$resolve.$call(undefined, array);
+ return array;
+ }
+
+ var sink = {
+ start() {},
+ write(chunk) {
+ $arrayPush(array, chunk);
+ return chunk.byteLength || chunk.length;
+ },
+
+ flush() {
+ return 0;
+ },
+
+ end() {
+ if (calledDone) {
+ return [];
+ }
+ return fulfill();
+ },
+
+ close() {
+ if (!calledDone) {
+ fulfill();
+ }
+ },
+ };
+
+ var controller = {
+ $underlyingSource: underlyingSource,
+ $pull: $onPullDirectStream,
+ $controlledReadableStream: this,
+ $sink: sink,
+ close: $onCloseDirectStream,
+ write: sink.write,
+ error: $handleDirectStreamError,
+ end: $onCloseDirectStream,
+ $close: $onCloseDirectStream,
+ flush: $onFlushDirectStream,
+ _pendingRead: undefined,
+ _deferClose: 0,
+ _deferFlush: 0,
+ _deferCloseReason: undefined,
+ _handleError: undefined,
+ };
+
+ $putByIdDirectPrivate(this, "readableStreamController", controller);
+ $putByIdDirectPrivate(this, "underlyingSource", undefined);
+ $putByIdDirectPrivate(this, "start", undefined);
+ return closingPromise;
+}
+
+export function initializeArrayBufferStream(underlyingSource, highWaterMark) {
+ // This is the fallback implementation for direct streams
+ // When we don't know what the destination type is
+ // We assume it is a Uint8Array.
+
+ var opts =
+ highWaterMark && typeof highWaterMark === "number"
+ ? { highWaterMark, stream: true, asUint8Array: true }
+ : { stream: true, asUint8Array: true };
+ var sink = new $Bun.ArrayBufferSink();
+ sink.start(opts);
+
+ var controller = {
+ $underlyingSource: underlyingSource,
+ $pull: $onPullDirectStream,
+ $controlledReadableStream: this,
+ $sink: sink,
+ close: $onCloseDirectStream,
+ write: sink.write.bind(sink),
+ error: $handleDirectStreamError,
+ end: $onCloseDirectStream,
+ $close: $onCloseDirectStream,
+ flush: $onFlushDirectStream,
+ _pendingRead: undefined,
+ _deferClose: 0,
+ _deferFlush: 0,
+ _deferCloseReason: undefined,
+ _handleError: undefined,
+ };
+
+ $putByIdDirectPrivate(this, "readableStreamController", controller);
+ $putByIdDirectPrivate(this, "underlyingSource", undefined);
+ $putByIdDirectPrivate(this, "start", undefined);
+}
+
+export function readableStreamError(stream, error) {
+ $assert($isReadableStream(stream));
+ $assert($getByIdDirectPrivate(stream, "state") === $streamReadable);
+ $putByIdDirectPrivate(stream, "state", $streamErrored);
+ $putByIdDirectPrivate(stream, "storedError", error);
+
+ const reader = $getByIdDirectPrivate(stream, "reader");
+
+ if (!reader) return;
+
+ if ($isReadableStreamDefaultReader(reader)) {
+ const requests = $getByIdDirectPrivate(reader, "readRequests");
+ $putByIdDirectPrivate(reader, "readRequests", $createFIFO());
+ for (var request = requests.shift(); request; request = requests.shift()) $rejectPromise(request, error);
+ } else {
+ $assert($isReadableStreamBYOBReader(reader));
+ const requests = $getByIdDirectPrivate(reader, "readIntoRequests");
+ $putByIdDirectPrivate(reader, "readIntoRequests", $createFIFO());
+ for (var request = requests.shift(); request; request = requests.shift()) $rejectPromise(request, error);
+ }
+
+ $getByIdDirectPrivate(reader, "closedPromiseCapability").$reject.$call(undefined, error);
+ const promise = $getByIdDirectPrivate(reader, "closedPromiseCapability").$promise;
+ $markPromiseAsHandled(promise);
+}
+
+export function readableStreamDefaultControllerShouldCallPull(controller) {
+ const stream = $getByIdDirectPrivate(controller, "controlledReadableStream");
+
+ if (!$readableStreamDefaultControllerCanCloseOrEnqueue(controller)) return false;
+ if (!($getByIdDirectPrivate(controller, "started") === 1)) return false;
+ if (
+ (!$isReadableStreamLocked(stream) ||
+ !$getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) &&
+ $readableStreamDefaultControllerGetDesiredSize(controller) <= 0
+ )
+ return false;
+ const desiredSize = $readableStreamDefaultControllerGetDesiredSize(controller);
+ $assert(desiredSize !== null);
+ return desiredSize > 0;
+}
+
+export function readableStreamDefaultControllerCallPullIfNeeded(controller) {
+ // FIXME: use $readableStreamDefaultControllerShouldCallPull
+ const stream = $getByIdDirectPrivate(controller, "controlledReadableStream");
+
+ if (!$readableStreamDefaultControllerCanCloseOrEnqueue(controller)) return;
+ if (!($getByIdDirectPrivate(controller, "started") === 1)) return;
+ if (
+ (!$isReadableStreamLocked(stream) ||
+ !$getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) &&
+ $readableStreamDefaultControllerGetDesiredSize(controller) <= 0
+ )
+ return;
+
+ if ($getByIdDirectPrivate(controller, "pulling")) {
+ $putByIdDirectPrivate(controller, "pullAgain", true);
+ return;
+ }
+
+ $assert(!$getByIdDirectPrivate(controller, "pullAgain"));
+ $putByIdDirectPrivate(controller, "pulling", true);
+
+ $getByIdDirectPrivate(controller, "pullAlgorithm")
+ .$call(undefined)
+ .$then(
+ function () {
+ $putByIdDirectPrivate(controller, "pulling", false);
+ if ($getByIdDirectPrivate(controller, "pullAgain")) {
+ $putByIdDirectPrivate(controller, "pullAgain", false);
+
+ $readableStreamDefaultControllerCallPullIfNeeded(controller);
+ }
+ },
+ function (error) {
+ $readableStreamDefaultControllerError(controller, error);
+ },
+ );
+}
+
+export function isReadableStreamLocked(stream) {
+ $assert($isReadableStream(stream));
+ return !!$getByIdDirectPrivate(stream, "reader");
+}
+
+export function readableStreamDefaultControllerGetDesiredSize(controller) {
+ const stream = $getByIdDirectPrivate(controller, "controlledReadableStream");
+ const state = $getByIdDirectPrivate(stream, "state");
+
+ if (state === $streamErrored) return null;
+ if (state === $streamClosed) return 0;
+
+ return $getByIdDirectPrivate(controller, "strategy").highWaterMark - $getByIdDirectPrivate(controller, "queue").size;
+}
+
+export function readableStreamReaderGenericCancel(reader, reason) {
+ const stream = $getByIdDirectPrivate(reader, "ownerReadableStream");
+ $assert(!!stream);
+ return $readableStreamCancel(stream, reason);
+}
+
+export function readableStreamCancel(stream, reason) {
+ $putByIdDirectPrivate(stream, "disturbed", true);
+ const state = $getByIdDirectPrivate(stream, "state");
+ if (state === $streamClosed) return Promise.$resolve();
+ if (state === $streamErrored) return Promise.$reject($getByIdDirectPrivate(stream, "storedError"));
+ $readableStreamClose(stream);
+
+ var controller = $getByIdDirectPrivate(stream, "readableStreamController");
+ var cancel = controller.$cancel;
+ if (cancel) {
+ return cancel(controller, reason).$then(function () {});
+ }
+
+ var close = controller.close;
+ if (close) {
+ return Promise.$resolve(controller.close(reason));
+ }
+
+ $throwTypeError("ReadableStreamController has no cancel or close method");
+}
+
+export function readableStreamDefaultControllerCancel(controller, reason) {
+ $putByIdDirectPrivate(controller, "queue", $newQueue());
+ return $getByIdDirectPrivate(controller, "cancelAlgorithm").$call(undefined, reason);
+}
+
+export function readableStreamDefaultControllerPull(controller) {
+ var queue = $getByIdDirectPrivate(controller, "queue");
+ if (queue.content.isNotEmpty()) {
+ const chunk = $dequeueValue(queue);
+ if ($getByIdDirectPrivate(controller, "closeRequested") && queue.content.isEmpty())
+ $readableStreamClose($getByIdDirectPrivate(controller, "controlledReadableStream"));
+ else $readableStreamDefaultControllerCallPullIfNeeded(controller);
+
+ return $createFulfilledPromise({ value: chunk, done: false });
+ }
+ const pendingPromise = $readableStreamAddReadRequest($getByIdDirectPrivate(controller, "controlledReadableStream"));
+ $readableStreamDefaultControllerCallPullIfNeeded(controller);
+ return pendingPromise;
+}
+
+export function readableStreamDefaultControllerClose(controller) {
+ $assert($readableStreamDefaultControllerCanCloseOrEnqueue(controller));
+ $putByIdDirectPrivate(controller, "closeRequested", true);
+ if ($getByIdDirectPrivate(controller, "queue")?.content?.isEmpty())
+ $readableStreamClose($getByIdDirectPrivate(controller, "controlledReadableStream"));
+}
+
+export function readableStreamClose(stream) {
+ $assert($getByIdDirectPrivate(stream, "state") === $streamReadable);
+ $putByIdDirectPrivate(stream, "state", $streamClosed);
+ if (!$getByIdDirectPrivate(stream, "reader")) return;
+
+ if ($isReadableStreamDefaultReader($getByIdDirectPrivate(stream, "reader"))) {
+ const requests = $getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests");
+ if (requests.isNotEmpty()) {
+ $putByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests", $createFIFO());
+
+ for (var request = requests.shift(); request; request = requests.shift())
+ $fulfillPromise(request, { value: undefined, done: true });
+ }
+ }
+
+ $getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "closedPromiseCapability").$resolve.$call();
+}
+
+export function readableStreamFulfillReadRequest(stream, chunk, done) {
+ const readRequest = $getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests").shift();
+ $fulfillPromise(readRequest, { value: chunk, done: done });
+}
+
+export function readableStreamDefaultControllerEnqueue(controller, chunk) {
+ const stream = $getByIdDirectPrivate(controller, "controlledReadableStream");
+ // this is checked by callers
+ $assert($readableStreamDefaultControllerCanCloseOrEnqueue(controller));
+
+ if (
+ $isReadableStreamLocked(stream) &&
+ $getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()
+ ) {
+ $readableStreamFulfillReadRequest(stream, chunk, false);
+ $readableStreamDefaultControllerCallPullIfNeeded(controller);
+ return;
+ }
+
+ try {
+ let chunkSize = 1;
+ if ($getByIdDirectPrivate(controller, "strategy").size !== undefined)
+ chunkSize = $getByIdDirectPrivate(controller, "strategy").size(chunk);
+ $enqueueValueWithSize($getByIdDirectPrivate(controller, "queue"), chunk, chunkSize);
+ } catch (error) {
+ $readableStreamDefaultControllerError(controller, error);
+ throw error;
+ }
+ $readableStreamDefaultControllerCallPullIfNeeded(controller);
+}
+
+export function readableStreamDefaultReaderRead(reader) {
+ const stream = $getByIdDirectPrivate(reader, "ownerReadableStream");
+ $assert(!!stream);
+ const state = $getByIdDirectPrivate(stream, "state");
+
+ $putByIdDirectPrivate(stream, "disturbed", true);
+ if (state === $streamClosed) return $createFulfilledPromise({ value: undefined, done: true });
+ if (state === $streamErrored) return Promise.$reject($getByIdDirectPrivate(stream, "storedError"));
+ $assert(state === $streamReadable);
+
+ return $getByIdDirectPrivate(stream, "readableStreamController").$pull(
+ $getByIdDirectPrivate(stream, "readableStreamController"),
+ );
+}
+
+export function readableStreamAddReadRequest(stream) {
+ $assert($isReadableStreamDefaultReader($getByIdDirectPrivate(stream, "reader")));
+ $assert($getByIdDirectPrivate(stream, "state") == $streamReadable);
+
+ const readRequest = $newPromise();
+
+ $getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests").push(readRequest);
+
+ return readRequest;
+}
+
+export function isReadableStreamDisturbed(stream) {
+ $assert($isReadableStream(stream));
+ return $getByIdDirectPrivate(stream, "disturbed");
+}
+
+export function readableStreamReaderGenericRelease(reader) {
+ $assert(!!$getByIdDirectPrivate(reader, "ownerReadableStream"));
+ $assert($getByIdDirectPrivate($getByIdDirectPrivate(reader, "ownerReadableStream"), "reader") === reader);
+
+ if ($getByIdDirectPrivate($getByIdDirectPrivate(reader, "ownerReadableStream"), "state") === $streamReadable)
+ $getByIdDirectPrivate(reader, "closedPromiseCapability").$reject.$call(
+ undefined,
+ $makeTypeError("releasing lock of reader whose stream is still in readable state"),
+ );
+ else
+ $putByIdDirectPrivate(reader, "closedPromiseCapability", {
+ $promise: $newHandledRejectedPromise($makeTypeError("reader released lock")),
+ });
+
+ const promise = $getByIdDirectPrivate(reader, "closedPromiseCapability").$promise;
+ $markPromiseAsHandled(promise);
+ $putByIdDirectPrivate($getByIdDirectPrivate(reader, "ownerReadableStream"), "reader", undefined);
+ $putByIdDirectPrivate(reader, "ownerReadableStream", undefined);
+}
+
+export function readableStreamDefaultControllerCanCloseOrEnqueue(controller) {
+ return (
+ !$getByIdDirectPrivate(controller, "closeRequested") &&
+ $getByIdDirectPrivate($getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === $streamReadable
+ );
+}
+
+export function lazyLoadStream(stream, autoAllocateChunkSize) {
+ var nativeType = $getByIdDirectPrivate(stream, "bunNativeType");
+ var nativePtr = $getByIdDirectPrivate(stream, "bunNativePtr");
+ var Prototype = $lazyStreamPrototypeMap.$get(nativeType);
+ if (Prototype === undefined) {
+ var [pull, start, cancel, setClose, deinit, setRefOrUnref, drain] = $lazyLoad(nativeType);
+ var closer = [false];
+ var handleResult;
+ function handleNativeReadableStreamPromiseResult(val) {
+ var { c, v } = this;
+ this.c = undefined;
+ this.v = undefined;
+ handleResult(val, c, v);
+ }
+
+ function callClose(controller) {
+ try {
+ controller.close();
+ } catch (e) {
+ globalThis.reportError(e);
+ }
+ }
+
+ handleResult = function handleResult(result, controller, view) {
+ if (result && $isPromise(result)) {
+ return result.then(
+ handleNativeReadableStreamPromiseResult.bind({
+ c: controller,
+ v: view,
+ }),
+ err => controller.error(err),
+ );
+ } else if (typeof result === "number") {
+ if (view && view.byteLength === result && view.buffer === controller.byobRequest?.view?.buffer) {
+ controller.byobRequest.respondWithNewView(view);
+ } else {
+ controller.byobRequest.respond(result);
+ }
+ } else if (result.constructor === $Uint8Array) {
+ controller.enqueue(result);
+ }
+
+ if (closer[0] || result === false) {
+ $enqueueJob(callClose, controller);
+ closer[0] = false;
+ }
+ };
+
+ function createResult(tag, controller, view, closer) {
+ closer[0] = false;
+
+ var result;
+ try {
+ result = pull(tag, view, closer);
+ } catch (err) {
+ return controller.error(err);
+ }
+
+ return handleResult(result, controller, view);
+ }
+
+ const registry = deinit ? new FinalizationRegistry(deinit) : null;
+ Prototype = class NativeReadableStreamSource {
+ constructor(tag, autoAllocateChunkSize, drainValue) {
+ this.#tag = tag;
+ this.#cancellationToken = {};
+ this.pull = this.#pull.bind(this);
+ this.cancel = this.#cancel.bind(this);
+ this.autoAllocateChunkSize = autoAllocateChunkSize;
+
+ if (drainValue !== undefined) {
+ this.start = controller => {
+ controller.enqueue(drainValue);
+ };
+ }
+
+ if (registry) {
+ registry.register(this, tag, this.#cancellationToken);
+ }
+ }
+
+ #cancellationToken;
+ pull;
+ cancel;
+ start;
+
+ #tag;
+ type = "bytes";
+ autoAllocateChunkSize = 0;
+
+ static startSync = start;
+
+ #pull(controller) {
+ var tag = this.#tag;
+
+ if (!tag) {
+ controller.close();
+ return;
+ }
+
+ createResult(tag, controller, controller.byobRequest.view, closer);
+ }
+
+ #cancel(reason) {
+ var tag = this.#tag;
+
+ registry && registry.unregister(this.#cancellationToken);
+ setRefOrUnref && setRefOrUnref(tag, false);
+ cancel(tag, reason);
+ }
+ static deinit = deinit;
+ static drain = drain;
+ };
+ $lazyStreamPrototypeMap.$set(nativeType, Prototype);
+ }
+
+ const chunkSize = Prototype.startSync(nativePtr, autoAllocateChunkSize);
+ var drainValue;
+ const { drain: drainFn, deinit: deinitFn } = Prototype;
+ if (drainFn) {
+ drainValue = drainFn(nativePtr);
+ }
+
+ // empty file, no need for native back-and-forth on this
+ if (chunkSize === 0) {
+ deinit && nativePtr && $enqueueJob(deinit, nativePtr);
+
+ if ((drainValue?.byteLength ?? 0) > 0) {
+ return {
+ start(controller) {
+ controller.enqueue(drainValue);
+ controller.close();
+ },
+ type: "bytes",
+ };
+ }
+
+ return {
+ start(controller) {
+ controller.close();
+ },
+ type: "bytes",
+ };
+ }
+
+ return new Prototype(nativePtr, chunkSize, drainValue);
+}
+
+export function readableStreamIntoArray(stream) {
+ var reader = stream.getReader();
+ var manyResult = reader.readMany();
+
+ async function processManyResult(result) {
+ if (result.done) {
+ return [];
+ }
+
+ var chunks = result.value || [];
+
+ while (true) {
+ var thisResult = await reader.read();
+ if (thisResult.done) {
+ break;
+ }
+ chunks = chunks.concat(thisResult.value);
+ }
+
+ return chunks;
+ }
+
+ if (manyResult && $isPromise(manyResult)) {
+ return manyResult.$then(processManyResult);
+ }
+
+ return processManyResult(manyResult);
+}
+
+export function readableStreamIntoText(stream) {
+ const [textStream, closer] = $createTextStream($getByIdDirectPrivate(stream, "highWaterMark"));
+ const prom = $readStreamIntoSink(stream, textStream, false);
+ if (prom && $isPromise(prom)) {
+ return Promise.$resolve(prom).$then(closer.$promise);
+ }
+ return closer.$promise;
+}
+
+export function readableStreamToArrayBufferDirect(stream, underlyingSource) {
+ var sink = new $Bun.ArrayBufferSink();
+ $putByIdDirectPrivate(stream, "underlyingSource", undefined);
+ var highWaterMark = $getByIdDirectPrivate(stream, "highWaterMark");
+ sink.start(highWaterMark ? { highWaterMark } : {});
+ var capability = $newPromiseCapability(Promise);
+ var ended = false;
+ var pull = underlyingSource.pull;
+ var close = underlyingSource.close;
+
+ var controller = {
+ start() {},
+ close(reason) {
+ if (!ended) {
+ ended = true;
+ if (close) {
+ close();
+ }
+
+ $fulfillPromise(capability.$promise, sink.end());
+ }
+ },
+ end() {
+ if (!ended) {
+ ended = true;
+ if (close) {
+ close();
+ }
+ $fulfillPromise(capability.$promise, sink.end());
+ }
+ },
+ flush() {
+ return 0;
+ },
+ write: sink.write.bind(sink),
+ };
+
+ var didError = false;
+ try {
+ const firstPull = pull(controller);
+ if (firstPull && $isObject(firstPull) && $isPromise(firstPull)) {
+ return (async function (controller, promise, pull) {
+ while (!ended) {
+ await pull(controller);
+ }
+ return await promise;
+ })(controller, promise, pull);
+ }
+
+ return capability.$promise;
+ } catch (e) {
+ didError = true;
+ $readableStreamError(stream, e);
+ return Promise.$reject(e);
+ } finally {
+ if (!didError && stream) $readableStreamClose(stream);
+ controller = close = sink = pull = stream = undefined;
+ }
+}
+
+export async function readableStreamToTextDirect(stream, underlyingSource) {
+ const capability = $initializeTextStream.$call(stream, underlyingSource, undefined);
+ var reader = stream.getReader();
+
+ while ($getByIdDirectPrivate(stream, "state") === $streamReadable) {
+ var thisResult = await reader.read();
+ if (thisResult.done) {
+ break;
+ }
+ }
+
+ try {
+ reader.releaseLock();
+ } catch (e) {}
+ reader = undefined;
+ stream = undefined;
+
+ return capability.$promise;
+}
+
+export async function readableStreamToArrayDirect(stream, underlyingSource) {
+ const capability = $initializeArrayStream.$call(stream, underlyingSource, undefined);
+ underlyingSource = undefined;
+ var reader = stream.getReader();
+ try {
+ while ($getByIdDirectPrivate(stream, "state") === $streamReadable) {
+ var thisResult = await reader.read();
+ if (thisResult.done) {
+ break;
+ }
+ }
+
+ try {
+ reader.releaseLock();
+ } catch (e) {}
+ reader = undefined;
+
+ return Promise.$resolve(capability.$promise);
+ } catch (e) {
+ throw e;
+ } finally {
+ stream = undefined;
+ reader = undefined;
+ }
+
+ return capability.$promise;
+}
+
+export function readableStreamDefineLazyIterators(prototype) {
+ var asyncIterator = globalThis.Symbol.asyncIterator;
+
+ var ReadableStreamAsyncIterator = async function* ReadableStreamAsyncIterator(stream, preventCancel) {
+ var reader = stream.getReader();
+ var deferredError;
+ try {
+ while (true) {
+ var done, value;
+ const firstResult = reader.readMany();
+ if ($isPromise(firstResult)) {
+ ({ done, value } = await firstResult);
+ } else {
+ ({ done, value } = firstResult);
+ }
+
+ if (done) {
+ return;
+ }
+ yield* value;
+ }
+ } catch (e) {
+ deferredError = e;
+ } finally {
+ reader.releaseLock();
+
+ if (!preventCancel) {
+ stream.cancel(deferredError);
+ }
+
+ if (deferredError) {
+ throw deferredError;
+ }
+ }
+ };
+ var createAsyncIterator = function asyncIterator() {
+ return ReadableStreamAsyncIterator(this, false);
+ };
+ var createValues = function values({ preventCancel = false } = { preventCancel: false }) {
+ return ReadableStreamAsyncIterator(this, preventCancel);
+ };
+ $Object.$defineProperty(prototype, asyncIterator, { value: createAsyncIterator });
+ $Object.$defineProperty(prototype, "values", { value: createValues });
+ return prototype;
+}
diff --git a/src/bun.js/builtins/ts/StreamInternals.ts b/src/bun.js/builtins/ts/StreamInternals.ts
new file mode 100644
index 000000000..b42dc2f57
--- /dev/null
+++ b/src/bun.js/builtins/ts/StreamInternals.ts
@@ -0,0 +1,268 @@
+/*
+ * Copyright (C) 2015 Canon Inc.
+ * Copyright (C) 2015 Igalia.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+ * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+// @internal
+
+export function markPromiseAsHandled(promise: Promise<unknown>) {
+ $assert($isPromise(promise));
+ $putPromiseInternalField(
+ promise,
+ $promiseFieldFlags,
+ $getPromiseInternalField(promise, $promiseFieldFlags) | $promiseFlagsIsHandled,
+ );
+}
+
+export function shieldingPromiseResolve(result) {
+ const promise = Promise.$resolve(result);
+ if (promise.$then === undefined) promise.$then = Promise.prototype.$then;
+ return promise;
+}
+
+export function promiseInvokeOrNoopMethodNoCatch(object, method, args) {
+ if (method === undefined) return Promise.$resolve();
+ return $shieldingPromiseResolve(method.$apply(object, args));
+}
+
+export function promiseInvokeOrNoopNoCatch(object, key, args) {
+ return $promiseInvokeOrNoopMethodNoCatch(object, object[key], args);
+}
+
+export function promiseInvokeOrNoopMethod(object, method, args) {
+ try {
+ return $promiseInvokeOrNoopMethodNoCatch(object, method, args);
+ } catch (error) {
+ return Promise.$reject(error);
+ }
+}
+
+export function promiseInvokeOrNoop(object, key, args) {
+ try {
+ return $promiseInvokeOrNoopNoCatch(object, key, args);
+ } catch (error) {
+ return Promise.$reject(error);
+ }
+}
+
+export function promiseInvokeOrFallbackOrNoop(object, key1, args1, key2, args2) {
+ try {
+ const method = object[key1];
+ if (method === undefined) return $promiseInvokeOrNoopNoCatch(object, key2, args2);
+ return $shieldingPromiseResolve(method.$apply(object, args1));
+ } catch (error) {
+ return Promise.$reject(error);
+ }
+}
+
+export function validateAndNormalizeQueuingStrategy(size, highWaterMark) {
+ if (size !== undefined && typeof size !== "function") throw new TypeError("size parameter must be a function");
+
+ const newHighWaterMark = $toNumber(highWaterMark);
+
+ if (isNaN(newHighWaterMark) || newHighWaterMark < 0)
+ throw new RangeError("highWaterMark value is negative or not a number");
+
+ return { size: size, highWaterMark: newHighWaterMark };
+}
+
+$linkTimeConstant;
+export function createFIFO() {
+ var slice = Array.prototype.slice;
+
+ class Denqueue {
+ constructor() {
+ this._head = 0;
+ this._tail = 0;
+ // this._capacity = 0;
+ this._capacityMask = 0x3;
+ this._list = $newArrayWithSize(4);
+ }
+
+ _head;
+ _tail;
+ _capacityMask;
+ _list;
+
+ size() {
+ if (this._head === this._tail) return 0;
+ if (this._head < this._tail) return this._tail - this._head;
+ else return this._capacityMask + 1 - (this._head - this._tail);
+ }
+
+ isEmpty() {
+ return this.size() == 0;
+ }
+
+ isNotEmpty() {
+ return this.size() > 0;
+ }
+
+ shift() {
+ var { _head: head, _tail, _list, _capacityMask } = this;
+ if (head === _tail) return undefined;
+ var item = _list[head];
+ $putByValDirect(_list, head, undefined);
+ head = this._head = (head + 1) & _capacityMask;
+ if (head < 2 && _tail > 10000 && _tail <= _list.length >>> 2) this._shrinkArray();
+ return item;
+ }
+
+ peek() {
+ if (this._head === this._tail) return undefined;
+ return this._list[this._head];
+ }
+
+ push(item) {
+ var tail = this._tail;
+ $putByValDirect(this._list, tail, item);
+ this._tail = (tail + 1) & this._capacityMask;
+ if (this._tail === this._head) {
+ this._growArray();
+ }
+ // if (this._capacity && this.size() > this._capacity) {
+ // this.shift();
+ // }
+ }
+
+ toArray(fullCopy) {
+ var list = this._list;
+ var len = $toLength(list.length);
+
+ if (fullCopy || this._head > this._tail) {
+ var _head = $toLength(this._head);
+ var _tail = $toLength(this._tail);
+ var total = $toLength(len - _head + _tail);
+ var array = $newArrayWithSize(total);
+ var j = 0;
+ for (var i = _head; i < len; i++) $putByValDirect(array, j++, list[i]);
+ for (var i = 0; i < _tail; i++) $putByValDirect(array, j++, list[i]);
+ return array;
+ } else {
+ return slice.$call(list, this._head, this._tail);
+ }
+ }
+
+ clear() {
+ this._head = 0;
+ this._tail = 0;
+ this._list.fill(undefined);
+ }
+
+ _growArray() {
+ if (this._head) {
+ // copy existing data, head to end, then beginning to tail.
+ this._list = this.toArray(true);
+ this._head = 0;
+ }
+
+ // head is at 0 and array is now full, safe to extend
+ this._tail = $toLength(this._list.length);
+
+ this._list.length <<= 1;
+ this._capacityMask = (this._capacityMask << 1) | 1;
+ }
+
+ shrinkArray() {
+ this._list.length >>>= 1;
+ this._capacityMask >>>= 1;
+ }
+ }
+
+ return new Denqueue();
+}
+
+export function newQueue() {
+ return { content: $createFIFO(), size: 0 };
+}
+
+export function dequeueValue(queue) {
+ const record = queue.content.shift();
+ queue.size -= record.size;
+ // As described by spec, below case may occur due to rounding errors.
+ if (queue.size < 0) queue.size = 0;
+ return record.value;
+}
+
+export function enqueueValueWithSize(queue, value, size) {
+ size = $toNumber(size);
+ if (!isFinite(size) || size < 0) throw new RangeError("size has an incorrect value");
+
+ queue.content.push({ value, size });
+ queue.size += size;
+}
+
+export function peekQueueValue(queue) {
+ return queue.content.peek()?.value;
+}
+
+export function resetQueue(queue) {
+ $assert("content" in queue);
+ $assert("size" in queue);
+ queue.content.clear();
+ queue.size = 0;
+}
+
+export function extractSizeAlgorithm(strategy) {
+ const sizeAlgorithm = strategy.size;
+
+ if (sizeAlgorithm === undefined) return () => 1;
+
+ if (typeof sizeAlgorithm !== "function") throw new TypeError("strategy.size must be a function");
+
+ return chunk => {
+ return sizeAlgorithm(chunk);
+ };
+}
+
+export function extractHighWaterMark(strategy, defaultHWM) {
+ const highWaterMark = strategy.highWaterMark;
+
+ if (highWaterMark === undefined) return defaultHWM;
+
+ if (isNaN(highWaterMark) || highWaterMark < 0)
+ throw new RangeError("highWaterMark value is negative or not a number");
+
+ return $toNumber(highWaterMark);
+}
+
+export function extractHighWaterMarkFromQueuingStrategyInit(init: { highWaterMark?: number }) {
+ if (!$isObject(init)) throw new TypeError("QueuingStrategyInit argument must be an object.");
+ const { highWaterMark } = init;
+ if (highWaterMark === undefined) throw new TypeError("QueuingStrategyInit.highWaterMark member is required.");
+
+ return $toNumber(highWaterMark);
+}
+
+export function createFulfilledPromise(value) {
+ const promise = $newPromise();
+ $fulfillPromise(promise, value);
+ return promise;
+}
+
+export function toDictionary(value, defaultValue, errorMessage) {
+ if (value === undefined || value === null) return defaultValue;
+ if (!$isObject(value)) throw new TypeError(errorMessage);
+ return value;
+}
diff --git a/src/bun.js/builtins/ts/TransformStream.ts b/src/bun.js/builtins/ts/TransformStream.ts
new file mode 100644
index 000000000..54467db39
--- /dev/null
+++ b/src/bun.js/builtins/ts/TransformStream.ts
@@ -0,0 +1,106 @@
+/*
+ * Copyright (C) 2020 Apple Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
+ * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+export function initializeTransformStream(this) {
+ let transformer = arguments[0];
+
+ // This is the path for CreateTransformStream.
+ if ($isObject(transformer) && $getByIdDirectPrivate(transformer, "TransformStream")) return this;
+
+ let writableStrategy = arguments[1];
+ let readableStrategy = arguments[2];
+
+ if (transformer === undefined) transformer = null;
+
+ if (readableStrategy === undefined) readableStrategy = {};
+
+ if (writableStrategy === undefined) writableStrategy = {};
+
+ let transformerDict = {};
+ if (transformer !== null) {
+ if ("start" in transformer) {
+ transformerDict["start"] = transformer["start"];
+ if (typeof transformerDict["start"] !== "function") $throwTypeError("transformer.start should be a function");
+ }
+ if ("transform" in transformer) {
+ transformerDict["transform"] = transformer["transform"];
+ if (typeof transformerDict["transform"] !== "function")
+ $throwTypeError("transformer.transform should be a function");
+ }
+ if ("flush" in transformer) {
+ transformerDict["flush"] = transformer["flush"];
+ if (typeof transformerDict["flush"] !== "function") $throwTypeError("transformer.flush should be a function");
+ }
+
+ if ("readableType" in transformer) throw new RangeError("TransformStream transformer has a readableType");
+ if ("writableType" in transformer) throw new RangeError("TransformStream transformer has a writableType");
+ }
+
+ const readableHighWaterMark = $extractHighWaterMark(readableStrategy, 0);
+ const readableSizeAlgorithm = $extractSizeAlgorithm(readableStrategy);
+
+ const writableHighWaterMark = $extractHighWaterMark(writableStrategy, 1);
+ const writableSizeAlgorithm = $extractSizeAlgorithm(writableStrategy);
+
+ const startPromiseCapability = $newPromiseCapability(Promise);
+ $initializeTransformStream(
+ this,
+ startPromiseCapability.$promise,
+ writableHighWaterMark,
+ writableSizeAlgorithm,
+ readableHighWaterMark,
+ readableSizeAlgorithm,
+ );
+ $setUpTransformStreamDefaultControllerFromTransformer(this, transformer, transformerDict);
+
+ if ("start" in transformerDict) {
+ const controller = $getByIdDirectPrivate(this, "controller");
+ const startAlgorithm = () => $promiseInvokeOrNoopMethodNoCatch(transformer, transformerDict["start"], [controller]);
+ startAlgorithm().$then(
+ () => {
+ // FIXME: We probably need to resolve start promise with the result of the start algorithm.
+ startPromiseCapability.$resolve.$call();
+ },
+ error => {
+ startPromiseCapability.$reject.$call(undefined, error);
+ },
+ );
+ } else startPromiseCapability.$resolve.$call();
+
+ return this;
+}
+
+$getter;
+export function readable() {
+ if (!$isTransformStream(this)) throw $makeThisTypeError("TransformStream", "readable");
+
+ return $getByIdDirectPrivate(this, "readable");
+}
+
+export function writable() {
+ if (!$isTransformStream(this)) throw $makeThisTypeError("TransformStream", "writable");
+
+ return $getByIdDirectPrivate(this, "writable");
+}
diff --git a/src/bun.js/builtins/ts/TransformStreamDefaultController.ts b/src/bun.js/builtins/ts/TransformStreamDefaultController.ts
new file mode 100644
index 000000000..1045498b8
--- /dev/null
+++ b/src/bun.js/builtins/ts/TransformStreamDefaultController.ts
@@ -0,0 +1,60 @@
+/*
+ * Copyright (C) 2020 Apple Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
+ * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+export function initializeTransformStreamDefaultController(this) {
+ return this;
+}
+
+$getter;
+export function desiredSize(this) {
+ if (!$isTransformStreamDefaultController(this))
+ throw $makeThisTypeError("TransformStreamDefaultController", "enqueue");
+
+ const stream = $getByIdDirectPrivate(this, "stream");
+ const readable = $getByIdDirectPrivate(stream, "readable");
+ const readableController = $getByIdDirectPrivate(readable, "readableStreamController");
+
+ return $readableStreamDefaultControllerGetDesiredSize(readableController);
+}
+
+export function enqueue(this, chunk) {
+ if (!$isTransformStreamDefaultController(this))
+ throw $makeThisTypeError("TransformStreamDefaultController", "enqueue");
+
+ $transformStreamDefaultControllerEnqueue(this, chunk);
+}
+
+export function error(this, e) {
+ if (!$isTransformStreamDefaultController(this)) throw $makeThisTypeError("TransformStreamDefaultController", "error");
+
+ $transformStreamDefaultControllerError(this, e);
+}
+
+export function terminate(this) {
+ if (!$isTransformStreamDefaultController(this))
+ throw $makeThisTypeError("TransformStreamDefaultController", "terminate");
+
+ $transformStreamDefaultControllerTerminate(this);
+}
diff --git a/src/bun.js/builtins/ts/TransformStreamInternals.ts b/src/bun.js/builtins/ts/TransformStreamInternals.ts
new file mode 100644
index 000000000..9994d1282
--- /dev/null
+++ b/src/bun.js/builtins/ts/TransformStreamInternals.ts
@@ -0,0 +1,348 @@
+/*
+ * Copyright (C) 2020 Apple Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
+ * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+// @internal
+
+export function isTransformStream(stream) {
+ return $isObject(stream) && !!$getByIdDirectPrivate(stream, "readable");
+}
+
+export function isTransformStreamDefaultController(controller) {
+ return $isObject(controller) && !!$getByIdDirectPrivate(controller, "transformAlgorithm");
+}
+
+export function createTransformStream(
+ startAlgorithm,
+ transformAlgorithm,
+ flushAlgorithm,
+ writableHighWaterMark,
+ writableSizeAlgorithm,
+ readableHighWaterMark,
+ readableSizeAlgorithm,
+) {
+ if (writableHighWaterMark === undefined) writableHighWaterMark = 1;
+ if (writableSizeAlgorithm === undefined) writableSizeAlgorithm = () => 1;
+ if (readableHighWaterMark === undefined) readableHighWaterMark = 0;
+ if (readableSizeAlgorithm === undefined) readableSizeAlgorithm = () => 1;
+ $assert(writableHighWaterMark >= 0);
+ $assert(readableHighWaterMark >= 0);
+
+ const transform = {};
+ $putByIdDirectPrivate(transform, "TransformStream", true);
+
+ const stream = new TransformStream(transform);
+ const startPromiseCapability = $newPromiseCapability(Promise);
+ $initializeTransformStream(
+ stream,
+ startPromiseCapability.$promise,
+ writableHighWaterMark,
+ writableSizeAlgorithm,
+ readableHighWaterMark,
+ readableSizeAlgorithm,
+ );
+
+ const controller = new TransformStreamDefaultController();
+ $setUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm);
+
+ startAlgorithm().$then(
+ () => {
+ startPromiseCapability.$resolve.$call();
+ },
+ error => {
+ startPromiseCapability.$reject.$call(undefined, error);
+ },
+ );
+
+ return stream;
+}
+
+export function initializeTransformStream(
+ stream,
+ startPromise,
+ writableHighWaterMark,
+ writableSizeAlgorithm,
+ readableHighWaterMark,
+ readableSizeAlgorithm,
+) {
+ const startAlgorithm = () => {
+ return startPromise;
+ };
+ const writeAlgorithm = chunk => {
+ return $transformStreamDefaultSinkWriteAlgorithm(stream, chunk);
+ };
+ const abortAlgorithm = reason => {
+ return $transformStreamDefaultSinkAbortAlgorithm(stream, reason);
+ };
+ const closeAlgorithm = () => {
+ return $transformStreamDefaultSinkCloseAlgorithm(stream);
+ };
+ const writable = $createWritableStream(
+ startAlgorithm,
+ writeAlgorithm,
+ closeAlgorithm,
+ abortAlgorithm,
+ writableHighWaterMark,
+ writableSizeAlgorithm,
+ );
+
+ const pullAlgorithm = () => {
+ return $transformStreamDefaultSourcePullAlgorithm(stream);
+ };
+ const cancelAlgorithm = reason => {
+ $transformStreamErrorWritableAndUnblockWrite(stream, reason);
+ return Promise.$resolve();
+ };
+ const underlyingSource = {};
+ $putByIdDirectPrivate(underlyingSource, "start", startAlgorithm);
+ $putByIdDirectPrivate(underlyingSource, "pull", pullAlgorithm);
+ $putByIdDirectPrivate(underlyingSource, "cancel", cancelAlgorithm);
+ const options = {};
+ $putByIdDirectPrivate(options, "size", readableSizeAlgorithm);
+ $putByIdDirectPrivate(options, "highWaterMark", readableHighWaterMark);
+ const readable = new ReadableStream(underlyingSource, options);
+
+ // The writable to expose to JS through writable getter.
+ $putByIdDirectPrivate(stream, "writable", writable);
+ // The writable to use for the actual transform algorithms.
+ $putByIdDirectPrivate(stream, "internalWritable", $getInternalWritableStream(writable));
+
+ $putByIdDirectPrivate(stream, "readable", readable);
+ $putByIdDirectPrivate(stream, "backpressure", undefined);
+ $putByIdDirectPrivate(stream, "backpressureChangePromise", undefined);
+
+ $transformStreamSetBackpressure(stream, true);
+ $putByIdDirectPrivate(stream, "controller", undefined);
+}
+
+export function transformStreamError(stream, e) {
+ const readable = $getByIdDirectPrivate(stream, "readable");
+ const readableController = $getByIdDirectPrivate(readable, "readableStreamController");
+ $readableStreamDefaultControllerError(readableController, e);
+
+ $transformStreamErrorWritableAndUnblockWrite(stream, e);
+}
+
+export function transformStreamErrorWritableAndUnblockWrite(stream, e) {
+ $transformStreamDefaultControllerClearAlgorithms($getByIdDirectPrivate(stream, "controller"));
+
+ const writable = $getByIdDirectPrivate(stream, "internalWritable");
+ $writableStreamDefaultControllerErrorIfNeeded($getByIdDirectPrivate(writable, "controller"), e);
+
+ if ($getByIdDirectPrivate(stream, "backpressure")) $transformStreamSetBackpressure(stream, false);
+}
+
+export function transformStreamSetBackpressure(stream, backpressure) {
+ $assert($getByIdDirectPrivate(stream, "backpressure") !== backpressure);
+
+ const backpressureChangePromise = $getByIdDirectPrivate(stream, "backpressureChangePromise");
+ if (backpressureChangePromise !== undefined) backpressureChangePromise.$resolve.$call();
+
+ $putByIdDirectPrivate(stream, "backpressureChangePromise", $newPromiseCapability(Promise));
+ $putByIdDirectPrivate(stream, "backpressure", backpressure);
+}
+
+export function setUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm) {
+ $assert($isTransformStream(stream));
+ $assert($getByIdDirectPrivate(stream, "controller") === undefined);
+
+ $putByIdDirectPrivate(controller, "stream", stream);
+ $putByIdDirectPrivate(stream, "controller", controller);
+ $putByIdDirectPrivate(controller, "transformAlgorithm", transformAlgorithm);
+ $putByIdDirectPrivate(controller, "flushAlgorithm", flushAlgorithm);
+}
+
+export function setUpTransformStreamDefaultControllerFromTransformer(stream, transformer, transformerDict) {
+ const controller = new TransformStreamDefaultController();
+ let transformAlgorithm = chunk => {
+ try {
+ $transformStreamDefaultControllerEnqueue(controller, chunk);
+ } catch (e) {
+ return Promise.$reject(e);
+ }
+ return Promise.$resolve();
+ };
+ let flushAlgorithm = () => {
+ return Promise.$resolve();
+ };
+
+ if ("transform" in transformerDict)
+ transformAlgorithm = chunk => {
+ return $promiseInvokeOrNoopMethod(transformer, transformerDict["transform"], [chunk, controller]);
+ };
+
+ if ("flush" in transformerDict) {
+ flushAlgorithm = () => {
+ return $promiseInvokeOrNoopMethod(transformer, transformerDict["flush"], [controller]);
+ };
+ }
+
+ $setUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm);
+}
+
+export function transformStreamDefaultControllerClearAlgorithms(controller) {
+ // We set transformAlgorithm to true to allow GC but keep the isTransformStreamDefaultController check.
+ $putByIdDirectPrivate(controller, "transformAlgorithm", true);
+ $putByIdDirectPrivate(controller, "flushAlgorithm", undefined);
+}
+
+export function transformStreamDefaultControllerEnqueue(controller, chunk) {
+ const stream = $getByIdDirectPrivate(controller, "stream");
+ const readable = $getByIdDirectPrivate(stream, "readable");
+ const readableController = $getByIdDirectPrivate(readable, "readableStreamController");
+
+ $assert(readableController !== undefined);
+ if (!$readableStreamDefaultControllerCanCloseOrEnqueue(readableController))
+ $throwTypeError("TransformStream.readable cannot close or enqueue");
+
+ try {
+ $readableStreamDefaultControllerEnqueue(readableController, chunk);
+ } catch (e) {
+ $transformStreamErrorWritableAndUnblockWrite(stream, e);
+ throw $getByIdDirectPrivate(readable, "storedError");
+ }
+
+ const backpressure = !$readableStreamDefaultControllerShouldCallPull(readableController);
+ if (backpressure !== $getByIdDirectPrivate(stream, "backpressure")) {
+ $assert(backpressure);
+ $transformStreamSetBackpressure(stream, true);
+ }
+}
+
+export function transformStreamDefaultControllerError(controller, e) {
+ $transformStreamError($getByIdDirectPrivate(controller, "stream"), e);
+}
+
+export function transformStreamDefaultControllerPerformTransform(controller, chunk) {
+ const promiseCapability = $newPromiseCapability(Promise);
+
+ const transformPromise = $getByIdDirectPrivate(controller, "transformAlgorithm").$call(undefined, chunk);
+ transformPromise.$then(
+ () => {
+ promiseCapability.$resolve();
+ },
+ r => {
+ $transformStreamError($getByIdDirectPrivate(controller, "stream"), r);
+ promiseCapability.$reject.$call(undefined, r);
+ },
+ );
+ return promiseCapability.$promise;
+}
+
+export function transformStreamDefaultControllerTerminate(controller) {
+ const stream = $getByIdDirectPrivate(controller, "stream");
+ const readable = $getByIdDirectPrivate(stream, "readable");
+ const readableController = $getByIdDirectPrivate(readable, "readableStreamController");
+
+ // FIXME: Update readableStreamDefaultControllerClose to make this check.
+ if ($readableStreamDefaultControllerCanCloseOrEnqueue(readableController))
+ $readableStreamDefaultControllerClose(readableController);
+ const error = $makeTypeError("the stream has been terminated");
+ $transformStreamErrorWritableAndUnblockWrite(stream, error);
+}
+
+export function transformStreamDefaultSinkWriteAlgorithm(stream, chunk) {
+ const writable = $getByIdDirectPrivate(stream, "internalWritable");
+
+ $assert($getByIdDirectPrivate(writable, "state") === "writable");
+
+ const controller = $getByIdDirectPrivate(stream, "controller");
+
+ if ($getByIdDirectPrivate(stream, "backpressure")) {
+ const promiseCapability = $newPromiseCapability(Promise);
+
+ const backpressureChangePromise = $getByIdDirectPrivate(stream, "backpressureChangePromise");
+ $assert(backpressureChangePromise !== undefined);
+ backpressureChangePromise.$promise.$then(
+ () => {
+ const state = $getByIdDirectPrivate(writable, "state");
+ if (state === "erroring") {
+ promiseCapability.$reject.$call(undefined, $getByIdDirectPrivate(writable, "storedError"));
+ return;
+ }
+
+ $assert(state === "writable");
+ $transformStreamDefaultControllerPerformTransform(controller, chunk).$then(
+ () => {
+ promiseCapability.$resolve();
+ },
+ e => {
+ promiseCapability.$reject.$call(undefined, e);
+ },
+ );
+ },
+ e => {
+ promiseCapability.$reject.$call(undefined, e);
+ },
+ );
+
+ return promiseCapability.$promise;
+ }
+ return $transformStreamDefaultControllerPerformTransform(controller, chunk);
+}
+
+export function transformStreamDefaultSinkAbortAlgorithm(stream, reason) {
+ $transformStreamError(stream, reason);
+ return Promise.$resolve();
+}
+
+export function transformStreamDefaultSinkCloseAlgorithm(stream) {
+ const readable = $getByIdDirectPrivate(stream, "readable");
+ const controller = $getByIdDirectPrivate(stream, "controller");
+ const readableController = $getByIdDirectPrivate(readable, "readableStreamController");
+
+ const flushAlgorithm = $getByIdDirectPrivate(controller, "flushAlgorithm");
+ $assert(flushAlgorithm !== undefined);
+ const flushPromise = $getByIdDirectPrivate(controller, "flushAlgorithm").$call();
+ $transformStreamDefaultControllerClearAlgorithms(controller);
+
+ const promiseCapability = $newPromiseCapability(Promise);
+ flushPromise.$then(
+ () => {
+ if ($getByIdDirectPrivate(readable, "state") === $streamErrored) {
+ promiseCapability.$reject.$call(undefined, $getByIdDirectPrivate(readable, "storedError"));
+ return;
+ }
+
+ // FIXME: Update readableStreamDefaultControllerClose to make this check.
+ if ($readableStreamDefaultControllerCanCloseOrEnqueue(readableController))
+ $readableStreamDefaultControllerClose(readableController);
+ promiseCapability.$resolve();
+ },
+ r => {
+ $transformStreamError($getByIdDirectPrivate(controller, "stream"), r);
+ promiseCapability.$reject.$call(undefined, $getByIdDirectPrivate(readable, "storedError"));
+ },
+ );
+ return promiseCapability.$promise;
+}
+
+export function transformStreamDefaultSourcePullAlgorithm(stream) {
+ $assert($getByIdDirectPrivate(stream, "backpressure"));
+ $assert($getByIdDirectPrivate(stream, "backpressureChangePromise") !== undefined);
+
+ $transformStreamSetBackpressure(stream, false);
+
+ return $getByIdDirectPrivate(stream, "backpressureChangePromise").$promise;
+}
diff --git a/src/bun.js/builtins/ts/WritableStreamDefaultController.ts b/src/bun.js/builtins/ts/WritableStreamDefaultController.ts
new file mode 100644
index 000000000..1a3ddc290
--- /dev/null
+++ b/src/bun.js/builtins/ts/WritableStreamDefaultController.ts
@@ -0,0 +1,48 @@
+/*
+ * Copyright (C) 2020 Apple Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
+ * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+export function initializeWritableStreamDefaultController(this) {
+ $putByIdDirectPrivate(this, "queue", $newQueue());
+ $putByIdDirectPrivate(this, "abortSteps", reason => {
+ const result = $getByIdDirectPrivate(this, "abortAlgorithm").$call(undefined, reason);
+ $writableStreamDefaultControllerClearAlgorithms(this);
+ return result;
+ });
+
+ $putByIdDirectPrivate(this, "errorSteps", () => {
+ $resetQueue($getByIdDirectPrivate(this, "queue"));
+ });
+
+ return this;
+}
+
+export function error(this, e) {
+ if ($getByIdDirectPrivate(this, "abortSteps") === undefined)
+ throw $makeThisTypeError("WritableStreamDefaultController", "error");
+
+ const stream = $getByIdDirectPrivate(this, "stream");
+ if ($getByIdDirectPrivate(stream, "state") !== "writable") return;
+ $writableStreamDefaultControllerError(this, e);
+}
diff --git a/src/bun.js/builtins/ts/WritableStreamDefaultWriter.ts b/src/bun.js/builtins/ts/WritableStreamDefaultWriter.ts
new file mode 100644
index 000000000..795b43892
--- /dev/null
+++ b/src/bun.js/builtins/ts/WritableStreamDefaultWriter.ts
@@ -0,0 +1,104 @@
+/*
+ * Copyright (C) 2020 Apple Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
+ * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+export function initializeWritableStreamDefaultWriter(stream) {
+ // stream can be a WritableStream if WritableStreamDefaultWriter constructor is called directly from JS
+ // or an InternalWritableStream in other code paths.
+ const internalStream = $getInternalWritableStream(stream);
+ if (internalStream) stream = internalStream;
+
+ if (!$isWritableStream(stream)) $throwTypeError("WritableStreamDefaultWriter constructor takes a WritableStream");
+
+ $setUpWritableStreamDefaultWriter(this, stream);
+ return this;
+}
+
+$getter;
+export function closed() {
+ if (!$isWritableStreamDefaultWriter(this))
+ return Promise.$reject($makeGetterTypeError("WritableStreamDefaultWriter", "closed"));
+
+ return $getByIdDirectPrivate(this, "closedPromise").$promise;
+}
+
+$getter;
+export function desiredSize() {
+ if (!$isWritableStreamDefaultWriter(this)) throw $makeThisTypeError("WritableStreamDefaultWriter", "desiredSize");
+
+ if ($getByIdDirectPrivate(this, "stream") === undefined) $throwTypeError("WritableStreamDefaultWriter has no stream");
+
+ return $writableStreamDefaultWriterGetDesiredSize(this);
+}
+
+$getter;
+export function ready() {
+ if (!$isWritableStreamDefaultWriter(this))
+ return Promise.$reject($makeThisTypeError("WritableStreamDefaultWriter", "ready"));
+
+ return $getByIdDirectPrivate(this, "readyPromise").$promise;
+}
+
+export function abort(reason) {
+ if (!$isWritableStreamDefaultWriter(this))
+ return Promise.$reject($makeThisTypeError("WritableStreamDefaultWriter", "abort"));
+
+ if ($getByIdDirectPrivate(this, "stream") === undefined)
+ return Promise.$reject($makeTypeError("WritableStreamDefaultWriter has no stream"));
+
+ return $writableStreamDefaultWriterAbort(this, reason);
+}
+
+export function close() {
+ if (!$isWritableStreamDefaultWriter(this))
+ return Promise.$reject($makeThisTypeError("WritableStreamDefaultWriter", "close"));
+
+ const stream = $getByIdDirectPrivate(this, "stream");
+ if (stream === undefined) return Promise.$reject($makeTypeError("WritableStreamDefaultWriter has no stream"));
+
+ if ($writableStreamCloseQueuedOrInFlight(stream))
+ return Promise.$reject($makeTypeError("WritableStreamDefaultWriter is being closed"));
+
+ return $writableStreamDefaultWriterClose(this);
+}
+
+export function releaseLock() {
+ if (!$isWritableStreamDefaultWriter(this)) throw $makeThisTypeError("WritableStreamDefaultWriter", "releaseLock");
+
+ const stream = $getByIdDirectPrivate(this, "stream");
+ if (stream === undefined) return;
+
+ $assert($getByIdDirectPrivate(stream, "writer") !== undefined);
+ $writableStreamDefaultWriterRelease(this);
+}
+
+export function write(chunk) {
+ if (!$isWritableStreamDefaultWriter(this))
+ return Promise.$reject($makeThisTypeError("WritableStreamDefaultWriter", "write"));
+
+ if ($getByIdDirectPrivate(this, "stream") === undefined)
+ return Promise.$reject($makeTypeError("WritableStreamDefaultWriter has no stream"));
+
+ return $writableStreamDefaultWriterWrite(this, chunk);
+}
diff --git a/src/bun.js/builtins/ts/WritableStreamInternals.ts b/src/bun.js/builtins/ts/WritableStreamInternals.ts
new file mode 100644
index 000000000..f436a285e
--- /dev/null
+++ b/src/bun.js/builtins/ts/WritableStreamInternals.ts
@@ -0,0 +1,790 @@
+/*
+ * Copyright (C) 2015 Canon Inc.
+ * Copyright (C) 2015 Igalia
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+ * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+// @internal
+
+export function isWritableStream(stream) {
+ return $isObject(stream) && !!$getByIdDirectPrivate(stream, "underlyingSink");
+}
+
+export function isWritableStreamDefaultWriter(writer) {
+ return $isObject(writer) && !!$getByIdDirectPrivate(writer, "closedPromise");
+}
+
+export function acquireWritableStreamDefaultWriter(stream) {
+ return new WritableStreamDefaultWriter(stream);
+}
+
+// https://streams.spec.whatwg.org/#create-writable-stream
+export function createWritableStream(
+ startAlgorithm,
+ writeAlgorithm,
+ closeAlgorithm,
+ abortAlgorithm,
+ highWaterMark,
+ sizeAlgorithm,
+) {
+ $assert(typeof highWaterMark === "number" && !isNaN(highWaterMark) && highWaterMark >= 0);
+
+ const internalStream = {};
+ $initializeWritableStreamSlots(internalStream, {});
+ const controller = new WritableStreamDefaultController();
+
+ $setUpWritableStreamDefaultController(
+ internalStream,
+ controller,
+ startAlgorithm,
+ writeAlgorithm,
+ closeAlgorithm,
+ abortAlgorithm,
+ highWaterMark,
+ sizeAlgorithm,
+ );
+
+ return $createWritableStreamFromInternal(internalStream);
+}
+
+export function createInternalWritableStreamFromUnderlyingSink(underlyingSink, strategy) {
+ const stream = {};
+
+ if (underlyingSink === undefined) underlyingSink = {};
+
+ if (strategy === undefined) strategy = {};
+
+ if (!$isObject(underlyingSink)) $throwTypeError("WritableStream constructor takes an object as first argument");
+
+ if ("type" in underlyingSink) $throwRangeError("Invalid type is specified");
+
+ const sizeAlgorithm = $extractSizeAlgorithm(strategy);
+ const highWaterMark = $extractHighWaterMark(strategy, 1);
+
+ const underlyingSinkDict = {};
+ if ("start" in underlyingSink) {
+ underlyingSinkDict["start"] = underlyingSink["start"];
+ if (typeof underlyingSinkDict["start"] !== "function") $throwTypeError("underlyingSink.start should be a function");
+ }
+ if ("write" in underlyingSink) {
+ underlyingSinkDict["write"] = underlyingSink["write"];
+ if (typeof underlyingSinkDict["write"] !== "function") $throwTypeError("underlyingSink.write should be a function");
+ }
+ if ("close" in underlyingSink) {
+ underlyingSinkDict["close"] = underlyingSink["close"];
+ if (typeof underlyingSinkDict["close"] !== "function") $throwTypeError("underlyingSink.close should be a function");
+ }
+ if ("abort" in underlyingSink) {
+ underlyingSinkDict["abort"] = underlyingSink["abort"];
+ if (typeof underlyingSinkDict["abort"] !== "function") $throwTypeError("underlyingSink.abort should be a function");
+ }
+
+ $initializeWritableStreamSlots(stream, underlyingSink);
+ $setUpWritableStreamDefaultControllerFromUnderlyingSink(
+ stream,
+ underlyingSink,
+ underlyingSinkDict,
+ highWaterMark,
+ sizeAlgorithm,
+ );
+
+ return stream;
+}
+
+export function initializeWritableStreamSlots(stream, underlyingSink) {
+ $putByIdDirectPrivate(stream, "state", "writable");
+ $putByIdDirectPrivate(stream, "storedError", undefined);
+ $putByIdDirectPrivate(stream, "writer", undefined);
+ $putByIdDirectPrivate(stream, "controller", undefined);
+ $putByIdDirectPrivate(stream, "inFlightWriteRequest", undefined);
+ $putByIdDirectPrivate(stream, "closeRequest", undefined);
+ $putByIdDirectPrivate(stream, "inFlightCloseRequest", undefined);
+ $putByIdDirectPrivate(stream, "pendingAbortRequest", undefined);
+ $putByIdDirectPrivate(stream, "writeRequests", $createFIFO());
+ $putByIdDirectPrivate(stream, "backpressure", false);
+ $putByIdDirectPrivate(stream, "underlyingSink", underlyingSink);
+}
+
+export function writableStreamCloseForBindings(stream) {
+ if ($isWritableStreamLocked(stream))
+ return Promise.$reject($makeTypeError("WritableStream.close method can only be used on non locked WritableStream"));
+
+ if ($writableStreamCloseQueuedOrInFlight(stream))
+ return Promise.$reject(
+ $makeTypeError("WritableStream.close method can only be used on a being close WritableStream"),
+ );
+
+ return $writableStreamClose(stream);
+}
+
+export function writableStreamAbortForBindings(stream, reason) {
+ if ($isWritableStreamLocked(stream))
+ return Promise.$reject($makeTypeError("WritableStream.abort method can only be used on non locked WritableStream"));
+
+ return $writableStreamAbort(stream, reason);
+}
+
+export function isWritableStreamLocked(stream) {
+ return $getByIdDirectPrivate(stream, "writer") !== undefined;
+}
+
+export function setUpWritableStreamDefaultWriter(writer, stream) {
+ if ($isWritableStreamLocked(stream)) $throwTypeError("WritableStream is locked");
+
+ $putByIdDirectPrivate(writer, "stream", stream);
+ $putByIdDirectPrivate(stream, "writer", writer);
+
+ const readyPromiseCapability = $newPromiseCapability(Promise);
+ const closedPromiseCapability = $newPromiseCapability(Promise);
+ $putByIdDirectPrivate(writer, "readyPromise", readyPromiseCapability);
+ $putByIdDirectPrivate(writer, "closedPromise", closedPromiseCapability);
+
+ const state = $getByIdDirectPrivate(stream, "state");
+ if (state === "writable") {
+ if ($writableStreamCloseQueuedOrInFlight(stream) || !$getByIdDirectPrivate(stream, "backpressure"))
+ readyPromiseCapability.$resolve.$call();
+ } else if (state === "erroring") {
+ readyPromiseCapability.$reject.$call(undefined, $getByIdDirectPrivate(stream, "storedError"));
+ $markPromiseAsHandled(readyPromiseCapability.$promise);
+ } else if (state === "closed") {
+ readyPromiseCapability.$resolve.$call();
+ closedPromiseCapability.$resolve.$call();
+ } else {
+ $assert(state === "errored");
+ const storedError = $getByIdDirectPrivate(stream, "storedError");
+ readyPromiseCapability.$reject.$call(undefined, storedError);
+ $markPromiseAsHandled(readyPromiseCapability.$promise);
+ closedPromiseCapability.$reject.$call(undefined, storedError);
+ $markPromiseAsHandled(closedPromiseCapability.$promise);
+ }
+}
+
+export function writableStreamAbort(stream, reason) {
+ const state = $getByIdDirectPrivate(stream, "state");
+ if (state === "closed" || state === "errored") return Promise.$resolve();
+
+ const pendingAbortRequest = $getByIdDirectPrivate(stream, "pendingAbortRequest");
+ if (pendingAbortRequest !== undefined) return pendingAbortRequest.promise.$promise;
+
+ $assert(state === "writable" || state === "erroring");
+ let wasAlreadyErroring = false;
+ if (state === "erroring") {
+ wasAlreadyErroring = true;
+ reason = undefined;
+ }
+
+ const abortPromiseCapability = $newPromiseCapability(Promise);
+ $putByIdDirectPrivate(stream, "pendingAbortRequest", {
+ promise: abortPromiseCapability,
+ reason: reason,
+ wasAlreadyErroring: wasAlreadyErroring,
+ });
+
+ if (!wasAlreadyErroring) $writableStreamStartErroring(stream, reason);
+ return abortPromiseCapability.$promise;
+}
+
+export function writableStreamClose(stream) {
+ const state = $getByIdDirectPrivate(stream, "state");
+ if (state === "closed" || state === "errored")
+ return Promise.$reject($makeTypeError("Cannot close a writable stream that is closed or errored"));
+
+ $assert(state === "writable" || state === "erroring");
+ $assert(!$writableStreamCloseQueuedOrInFlight(stream));
+
+ const closePromiseCapability = $newPromiseCapability(Promise);
+ $putByIdDirectPrivate(stream, "closeRequest", closePromiseCapability);
+
+ const writer = $getByIdDirectPrivate(stream, "writer");
+ if (writer !== undefined && $getByIdDirectPrivate(stream, "backpressure") && state === "writable")
+ $getByIdDirectPrivate(writer, "readyPromise").$resolve.$call();
+
+ $writableStreamDefaultControllerClose($getByIdDirectPrivate(stream, "controller"));
+
+ return closePromiseCapability.$promise;
+}
+
+export function writableStreamAddWriteRequest(stream) {
+ $assert($isWritableStreamLocked(stream));
+ $assert($getByIdDirectPrivate(stream, "state") === "writable");
+
+ const writePromiseCapability = $newPromiseCapability(Promise);
+ const writeRequests = $getByIdDirectPrivate(stream, "writeRequests");
+ writeRequests.push(writePromiseCapability);
+ return writePromiseCapability.$promise;
+}
+
+export function writableStreamCloseQueuedOrInFlight(stream) {
+ return (
+ $getByIdDirectPrivate(stream, "closeRequest") !== undefined ||
+ $getByIdDirectPrivate(stream, "inFlightCloseRequest") !== undefined
+ );
+}
+
+export function writableStreamDealWithRejection(stream, error) {
+ const state = $getByIdDirectPrivate(stream, "state");
+ if (state === "writable") {
+ $writableStreamStartErroring(stream, error);
+ return;
+ }
+
+ $assert(state === "erroring");
+ $writableStreamFinishErroring(stream);
+}
+
+export function writableStreamFinishErroring(stream) {
+ $assert($getByIdDirectPrivate(stream, "state") === "erroring");
+ $assert(!$writableStreamHasOperationMarkedInFlight(stream));
+
+ $putByIdDirectPrivate(stream, "state", "errored");
+
+ const controller = $getByIdDirectPrivate(stream, "controller");
+ $getByIdDirectPrivate(controller, "errorSteps").$call();
+
+ const storedError = $getByIdDirectPrivate(stream, "storedError");
+ const requests = $getByIdDirectPrivate(stream, "writeRequests");
+ for (var request = requests.shift(); request; request = requests.shift())
+ request.$reject.$call(undefined, storedError);
+
+ // TODO: is this still necessary?
+ $putByIdDirectPrivate(stream, "writeRequests", $createFIFO());
+
+ const abortRequest = $getByIdDirectPrivate(stream, "pendingAbortRequest");
+ if (abortRequest === undefined) {
+ $writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
+ return;
+ }
+
+ $putByIdDirectPrivate(stream, "pendingAbortRequest", undefined);
+ if (abortRequest.wasAlreadyErroring) {
+ abortRequest.promise.$reject.$call(undefined, storedError);
+ $writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
+ return;
+ }
+
+ $getByIdDirectPrivate(controller, "abortSteps")
+ .$call(undefined, abortRequest.reason)
+ .$then(
+ () => {
+ abortRequest.promise.$resolve.$call();
+ $writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
+ },
+ reason => {
+ abortRequest.promise.$reject.$call(undefined, reason);
+ $writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
+ },
+ );
+}
+
+export function writableStreamFinishInFlightClose(stream) {
+ const inFlightCloseRequest = $getByIdDirectPrivate(stream, "inFlightCloseRequest");
+ inFlightCloseRequest.$resolve.$call();
+
+ $putByIdDirectPrivate(stream, "inFlightCloseRequest", undefined);
+
+ const state = $getByIdDirectPrivate(stream, "state");
+ $assert(state === "writable" || state === "erroring");
+
+ if (state === "erroring") {
+ $putByIdDirectPrivate(stream, "storedError", undefined);
+ const abortRequest = $getByIdDirectPrivate(stream, "pendingAbortRequest");
+ if (abortRequest !== undefined) {
+ abortRequest.promise.$resolve.$call();
+ $putByIdDirectPrivate(stream, "pendingAbortRequest", undefined);
+ }
+ }
+
+ $putByIdDirectPrivate(stream, "state", "closed");
+
+ const writer = $getByIdDirectPrivate(stream, "writer");
+ if (writer !== undefined) $getByIdDirectPrivate(writer, "closedPromise").$resolve.$call();
+
+ $assert($getByIdDirectPrivate(stream, "pendingAbortRequest") === undefined);
+ $assert($getByIdDirectPrivate(stream, "storedError") === undefined);
+}
+
+export function writableStreamFinishInFlightCloseWithError(stream, error) {
+ const inFlightCloseRequest = $getByIdDirectPrivate(stream, "inFlightCloseRequest");
+ $assert(inFlightCloseRequest !== undefined);
+ inFlightCloseRequest.$reject.$call(undefined, error);
+
+ $putByIdDirectPrivate(stream, "inFlightCloseRequest", undefined);
+
+ const state = $getByIdDirectPrivate(stream, "state");
+ $assert(state === "writable" || state === "erroring");
+
+ const abortRequest = $getByIdDirectPrivate(stream, "pendingAbortRequest");
+ if (abortRequest !== undefined) {
+ abortRequest.promise.$reject.$call(undefined, error);
+ $putByIdDirectPrivate(stream, "pendingAbortRequest", undefined);
+ }
+
+ $writableStreamDealWithRejection(stream, error);
+}
+
+export function writableStreamFinishInFlightWrite(stream) {
+ const inFlightWriteRequest = $getByIdDirectPrivate(stream, "inFlightWriteRequest");
+ $assert(inFlightWriteRequest !== undefined);
+ inFlightWriteRequest.$resolve.$call();
+
+ $putByIdDirectPrivate(stream, "inFlightWriteRequest", undefined);
+}
+
+export function writableStreamFinishInFlightWriteWithError(stream, error) {
+ const inFlightWriteRequest = $getByIdDirectPrivate(stream, "inFlightWriteRequest");
+ $assert(inFlightWriteRequest !== undefined);
+ inFlightWriteRequest.$reject.$call(undefined, error);
+
+ $putByIdDirectPrivate(stream, "inFlightWriteRequest", undefined);
+
+ const state = $getByIdDirectPrivate(stream, "state");
+ $assert(state === "writable" || state === "erroring");
+
+ $writableStreamDealWithRejection(stream, error);
+}
+
+export function writableStreamHasOperationMarkedInFlight(stream) {
+ return (
+ $getByIdDirectPrivate(stream, "inFlightWriteRequest") !== undefined ||
+ $getByIdDirectPrivate(stream, "inFlightCloseRequest") !== undefined
+ );
+}
+
+export function writableStreamMarkCloseRequestInFlight(stream) {
+ const closeRequest = $getByIdDirectPrivate(stream, "closeRequest");
+ $assert($getByIdDirectPrivate(stream, "inFlightCloseRequest") === undefined);
+ $assert(closeRequest !== undefined);
+
+ $putByIdDirectPrivate(stream, "inFlightCloseRequest", closeRequest);
+ $putByIdDirectPrivate(stream, "closeRequest", undefined);
+}
+
+export function writableStreamMarkFirstWriteRequestInFlight(stream) {
+ const writeRequests = $getByIdDirectPrivate(stream, "writeRequests");
+ $assert($getByIdDirectPrivate(stream, "inFlightWriteRequest") === undefined);
+ $assert(writeRequests.isNotEmpty());
+
+ const writeRequest = writeRequests.shift();
+ $putByIdDirectPrivate(stream, "inFlightWriteRequest", writeRequest);
+}
+
+export function writableStreamRejectCloseAndClosedPromiseIfNeeded(stream) {
+ $assert($getByIdDirectPrivate(stream, "state") === "errored");
+
+ const storedError = $getByIdDirectPrivate(stream, "storedError");
+
+ const closeRequest = $getByIdDirectPrivate(stream, "closeRequest");
+ if (closeRequest !== undefined) {
+ $assert($getByIdDirectPrivate(stream, "inFlightCloseRequest") === undefined);
+ closeRequest.$reject.$call(undefined, storedError);
+ $putByIdDirectPrivate(stream, "closeRequest", undefined);
+ }
+
+ const writer = $getByIdDirectPrivate(stream, "writer");
+ if (writer !== undefined) {
+ const closedPromise = $getByIdDirectPrivate(writer, "closedPromise");
+ closedPromise.$reject.$call(undefined, storedError);
+ $markPromiseAsHandled(closedPromise.$promise);
+ }
+}
+
+export function writableStreamStartErroring(stream, reason) {
+ $assert($getByIdDirectPrivate(stream, "storedError") === undefined);
+ $assert($getByIdDirectPrivate(stream, "state") === "writable");
+
+ const controller = $getByIdDirectPrivate(stream, "controller");
+ $assert(controller !== undefined);
+
+ $putByIdDirectPrivate(stream, "state", "erroring");
+ $putByIdDirectPrivate(stream, "storedError", reason);
+
+ const writer = $getByIdDirectPrivate(stream, "writer");
+ if (writer !== undefined) $writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason);
+
+ if (!$writableStreamHasOperationMarkedInFlight(stream) && $getByIdDirectPrivate(controller, "started") === 1)
+ $writableStreamFinishErroring(stream);
+}
+
+export function writableStreamUpdateBackpressure(stream, backpressure) {
+ $assert($getByIdDirectPrivate(stream, "state") === "writable");
+ $assert(!$writableStreamCloseQueuedOrInFlight(stream));
+
+ const writer = $getByIdDirectPrivate(stream, "writer");
+ if (writer !== undefined && backpressure !== $getByIdDirectPrivate(stream, "backpressure")) {
+ if (backpressure) $putByIdDirectPrivate(writer, "readyPromise", $newPromiseCapability(Promise));
+ else $getByIdDirectPrivate(writer, "readyPromise").$resolve.$call();
+ }
+ $putByIdDirectPrivate(stream, "backpressure", backpressure);
+}
+
+export function writableStreamDefaultWriterAbort(writer, reason) {
+ const stream = $getByIdDirectPrivate(writer, "stream");
+ $assert(stream !== undefined);
+ return $writableStreamAbort(stream, reason);
+}
+
+export function writableStreamDefaultWriterClose(writer) {
+ const stream = $getByIdDirectPrivate(writer, "stream");
+ $assert(stream !== undefined);
+ return $writableStreamClose(stream);
+}
+
+export function writableStreamDefaultWriterCloseWithErrorPropagation(writer) {
+ const stream = $getByIdDirectPrivate(writer, "stream");
+ $assert(stream !== undefined);
+
+ const state = $getByIdDirectPrivate(stream, "state");
+
+ if ($writableStreamCloseQueuedOrInFlight(stream) || state === "closed") return Promise.$resolve();
+
+ if (state === "errored") return Promise.$reject($getByIdDirectPrivate(stream, "storedError"));
+
+ $assert(state === "writable" || state === "erroring");
+ return $writableStreamDefaultWriterClose(writer);
+}
+
+export function writableStreamDefaultWriterEnsureClosedPromiseRejected(writer, error) {
+ let closedPromiseCapability = $getByIdDirectPrivate(writer, "closedPromise");
+ let closedPromise = closedPromiseCapability.$promise;
+
+ if (($getPromiseInternalField(closedPromise, $promiseFieldFlags) & $promiseStateMask) !== $promiseStatePending) {
+ closedPromiseCapability = $newPromiseCapability(Promise);
+ closedPromise = closedPromiseCapability.$promise;
+ $putByIdDirectPrivate(writer, "closedPromise", closedPromiseCapability);
+ }
+
+ closedPromiseCapability.$reject.$call(undefined, error);
+ $markPromiseAsHandled(closedPromise);
+}
+
+export function writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, error) {
+ let readyPromiseCapability = $getByIdDirectPrivate(writer, "readyPromise");
+ let readyPromise = readyPromiseCapability.$promise;
+
+ if (($getPromiseInternalField(readyPromise, $promiseFieldFlags) & $promiseStateMask) !== $promiseStatePending) {
+ readyPromiseCapability = $newPromiseCapability(Promise);
+ readyPromise = readyPromiseCapability.$promise;
+ $putByIdDirectPrivate(writer, "readyPromise", readyPromiseCapability);
+ }
+
+ readyPromiseCapability.$reject.$call(undefined, error);
+ $markPromiseAsHandled(readyPromise);
+}
+
+export function writableStreamDefaultWriterGetDesiredSize(writer) {
+ const stream = $getByIdDirectPrivate(writer, "stream");
+ $assert(stream !== undefined);
+
+ const state = $getByIdDirectPrivate(stream, "state");
+
+ if (state === "errored" || state === "erroring") return null;
+
+ if (state === "closed") return 0;
+
+ return $writableStreamDefaultControllerGetDesiredSize($getByIdDirectPrivate(stream, "controller"));
+}
+
+export function writableStreamDefaultWriterRelease(writer) {
+ const stream = $getByIdDirectPrivate(writer, "stream");
+ $assert(stream !== undefined);
+ $assert($getByIdDirectPrivate(stream, "writer") === writer);
+
+ const releasedError = $makeTypeError("writableStreamDefaultWriterRelease");
+
+ $writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError);
+ $writableStreamDefaultWriterEnsureClosedPromiseRejected(writer, releasedError);
+
+ $putByIdDirectPrivate(stream, "writer", undefined);
+ $putByIdDirectPrivate(writer, "stream", undefined);
+}
+
+export function writableStreamDefaultWriterWrite(writer, chunk) {
+ const stream = $getByIdDirectPrivate(writer, "stream");
+ $assert(stream !== undefined);
+
+ const controller = $getByIdDirectPrivate(stream, "controller");
+ $assert(controller !== undefined);
+ const chunkSize = $writableStreamDefaultControllerGetChunkSize(controller, chunk);
+
+ if (stream !== $getByIdDirectPrivate(writer, "stream"))
+ return Promise.$reject($makeTypeError("writer is not stream's writer"));
+
+ const state = $getByIdDirectPrivate(stream, "state");
+ if (state === "errored") return Promise.$reject($getByIdDirectPrivate(stream, "storedError"));
+
+ if ($writableStreamCloseQueuedOrInFlight(stream) || state === "closed")
+ return Promise.$reject($makeTypeError("stream is closing or closed"));
+
+ if ($writableStreamCloseQueuedOrInFlight(stream) || state === "closed")
+ return Promise.$reject($makeTypeError("stream is closing or closed"));
+
+ if (state === "erroring") return Promise.$reject($getByIdDirectPrivate(stream, "storedError"));
+
+ $assert(state === "writable");
+
+ const promise = $writableStreamAddWriteRequest(stream);
+ $writableStreamDefaultControllerWrite(controller, chunk, chunkSize);
+ return promise;
+}
+
+export function setUpWritableStreamDefaultController(
+ stream,
+ controller,
+ startAlgorithm,
+ writeAlgorithm,
+ closeAlgorithm,
+ abortAlgorithm,
+ highWaterMark,
+ sizeAlgorithm,
+) {
+ $assert($isWritableStream(stream));
+ $assert($getByIdDirectPrivate(stream, "controller") === undefined);
+
+ $putByIdDirectPrivate(controller, "stream", stream);
+ $putByIdDirectPrivate(stream, "controller", controller);
+
+ $resetQueue($getByIdDirectPrivate(controller, "queue"));
+
+ $putByIdDirectPrivate(controller, "started", -1);
+ $putByIdDirectPrivate(controller, "startAlgorithm", startAlgorithm);
+ $putByIdDirectPrivate(controller, "strategySizeAlgorithm", sizeAlgorithm);
+ $putByIdDirectPrivate(controller, "strategyHWM", highWaterMark);
+ $putByIdDirectPrivate(controller, "writeAlgorithm", writeAlgorithm);
+ $putByIdDirectPrivate(controller, "closeAlgorithm", closeAlgorithm);
+ $putByIdDirectPrivate(controller, "abortAlgorithm", abortAlgorithm);
+
+ const backpressure = $writableStreamDefaultControllerGetBackpressure(controller);
+ $writableStreamUpdateBackpressure(stream, backpressure);
+
+ $writableStreamDefaultControllerStart(controller);
+}
+
+export function writableStreamDefaultControllerStart(controller) {
+ if ($getByIdDirectPrivate(controller, "started") !== -1) return;
+
+ $putByIdDirectPrivate(controller, "started", 0);
+
+ const startAlgorithm = $getByIdDirectPrivate(controller, "startAlgorithm");
+ $putByIdDirectPrivate(controller, "startAlgorithm", undefined);
+ const stream = $getByIdDirectPrivate(controller, "stream");
+ return Promise.$resolve(startAlgorithm.$call()).$then(
+ () => {
+ const state = $getByIdDirectPrivate(stream, "state");
+ $assert(state === "writable" || state === "erroring");
+ $putByIdDirectPrivate(controller, "started", 1);
+ $writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
+ },
+ error => {
+ const state = $getByIdDirectPrivate(stream, "state");
+ $assert(state === "writable" || state === "erroring");
+ $putByIdDirectPrivate(controller, "started", 1);
+ $writableStreamDealWithRejection(stream, error);
+ },
+ );
+}
+
+export function setUpWritableStreamDefaultControllerFromUnderlyingSink(
+ stream,
+ underlyingSink,
+ underlyingSinkDict,
+ highWaterMark,
+ sizeAlgorithm,
+) {
+ const controller = new $WritableStreamDefaultController();
+
+ let startAlgorithm = () => {};
+ let writeAlgorithm = () => {
+ return Promise.$resolve();
+ };
+ let closeAlgorithm = () => {
+ return Promise.$resolve();
+ };
+ let abortAlgorithm = () => {
+ return Promise.$resolve();
+ };
+
+ if ("start" in underlyingSinkDict) {
+ const startMethod = underlyingSinkDict["start"];
+ startAlgorithm = () => $promiseInvokeOrNoopMethodNoCatch(underlyingSink, startMethod, [controller]);
+ }
+ if ("write" in underlyingSinkDict) {
+ const writeMethod = underlyingSinkDict["write"];
+ writeAlgorithm = chunk => $promiseInvokeOrNoopMethod(underlyingSink, writeMethod, [chunk, controller]);
+ }
+ if ("close" in underlyingSinkDict) {
+ const closeMethod = underlyingSinkDict["close"];
+ closeAlgorithm = () => $promiseInvokeOrNoopMethod(underlyingSink, closeMethod, []);
+ }
+ if ("abort" in underlyingSinkDict) {
+ const abortMethod = underlyingSinkDict["abort"];
+ abortAlgorithm = reason => $promiseInvokeOrNoopMethod(underlyingSink, abortMethod, [reason]);
+ }
+
+ $setUpWritableStreamDefaultController(
+ stream,
+ controller,
+ startAlgorithm,
+ writeAlgorithm,
+ closeAlgorithm,
+ abortAlgorithm,
+ highWaterMark,
+ sizeAlgorithm,
+ );
+}
+
+export function writableStreamDefaultControllerAdvanceQueueIfNeeded(controller) {
+ const stream = $getByIdDirectPrivate(controller, "stream");
+
+ if ($getByIdDirectPrivate(controller, "started") !== 1) return;
+
+ $assert(stream !== undefined);
+ if ($getByIdDirectPrivate(stream, "inFlightWriteRequest") !== undefined) return;
+
+ const state = $getByIdDirectPrivate(stream, "state");
+ $assert(state !== "closed" || state !== "errored");
+ if (state === "erroring") {
+ $writableStreamFinishErroring(stream);
+ return;
+ }
+
+ const queue = $getByIdDirectPrivate(controller, "queue");
+
+ if (queue.content?.isEmpty() ?? false) return;
+
+ const value = $peekQueueValue(queue);
+ if (value === $isCloseSentinel) $writableStreamDefaultControllerProcessClose(controller);
+ else $writableStreamDefaultControllerProcessWrite(controller, value);
+}
+
+export function isCloseSentinel() {}
+
+export function writableStreamDefaultControllerClearAlgorithms(controller) {
+ $putByIdDirectPrivate(controller, "writeAlgorithm", undefined);
+ $putByIdDirectPrivate(controller, "closeAlgorithm", undefined);
+ $putByIdDirectPrivate(controller, "abortAlgorithm", undefined);
+ $putByIdDirectPrivate(controller, "strategySizeAlgorithm", undefined);
+}
+
+export function writableStreamDefaultControllerClose(controller) {
+ $enqueueValueWithSize($getByIdDirectPrivate(controller, "queue"), $isCloseSentinel, 0);
+ $writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
+}
+
+export function writableStreamDefaultControllerError(controller, error) {
+ const stream = $getByIdDirectPrivate(controller, "stream");
+ $assert(stream !== undefined);
+ $assert($getByIdDirectPrivate(stream, "state") === "writable");
+
+ $writableStreamDefaultControllerClearAlgorithms(controller);
+ $writableStreamStartErroring(stream, error);
+}
+
+export function writableStreamDefaultControllerErrorIfNeeded(controller, error) {
+ const stream = $getByIdDirectPrivate(controller, "stream");
+ if ($getByIdDirectPrivate(stream, "state") === "writable") $writableStreamDefaultControllerError(controller, error);
+}
+
+export function writableStreamDefaultControllerGetBackpressure(controller) {
+ const desiredSize = $writableStreamDefaultControllerGetDesiredSize(controller);
+ return desiredSize <= 0;
+}
+
+export function writableStreamDefaultControllerGetChunkSize(controller, chunk) {
+ try {
+ return $getByIdDirectPrivate(controller, "strategySizeAlgorithm").$call(undefined, chunk);
+ } catch (e) {
+ $writableStreamDefaultControllerErrorIfNeeded(controller, e);
+ return 1;
+ }
+}
+
+export function writableStreamDefaultControllerGetDesiredSize(controller) {
+ return $getByIdDirectPrivate(controller, "strategyHWM") - $getByIdDirectPrivate(controller, "queue").size;
+}
+
+export function writableStreamDefaultControllerProcessClose(controller) {
+ const stream = $getByIdDirectPrivate(controller, "stream");
+
+ $writableStreamMarkCloseRequestInFlight(stream);
+ $dequeueValue($getByIdDirectPrivate(controller, "queue"));
+
+ $assert($getByIdDirectPrivate(controller, "queue").content?.isEmpty());
+
+ const sinkClosePromise = $getByIdDirectPrivate(controller, "closeAlgorithm").$call();
+ $writableStreamDefaultControllerClearAlgorithms(controller);
+
+ sinkClosePromise.$then(
+ () => {
+ $writableStreamFinishInFlightClose(stream);
+ },
+ reason => {
+ $writableStreamFinishInFlightCloseWithError(stream, reason);
+ },
+ );
+}
+
+export function writableStreamDefaultControllerProcessWrite(controller, chunk) {
+ const stream = $getByIdDirectPrivate(controller, "stream");
+
+ $writableStreamMarkFirstWriteRequestInFlight(stream);
+
+ const sinkWritePromise = $getByIdDirectPrivate(controller, "writeAlgorithm").$call(undefined, chunk);
+
+ sinkWritePromise.$then(
+ () => {
+ $writableStreamFinishInFlightWrite(stream);
+ const state = $getByIdDirectPrivate(stream, "state");
+ $assert(state === "writable" || state === "erroring");
+
+ $dequeueValue($getByIdDirectPrivate(controller, "queue"));
+ if (!$writableStreamCloseQueuedOrInFlight(stream) && state === "writable") {
+ const backpressure = $writableStreamDefaultControllerGetBackpressure(controller);
+ $writableStreamUpdateBackpressure(stream, backpressure);
+ }
+ $writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
+ },
+ reason => {
+ const state = $getByIdDirectPrivate(stream, "state");
+ if (state === "writable") $writableStreamDefaultControllerClearAlgorithms(controller);
+
+ $writableStreamFinishInFlightWriteWithError(stream, reason);
+ },
+ );
+}
+
+export function writableStreamDefaultControllerWrite(controller, chunk, chunkSize) {
+ try {
+ $enqueueValueWithSize($getByIdDirectPrivate(controller, "queue"), chunk, chunkSize);
+
+ const stream = $getByIdDirectPrivate(controller, "stream");
+
+ const state = $getByIdDirectPrivate(stream, "state");
+ if (!$writableStreamCloseQueuedOrInFlight(stream) && state === "writable") {
+ const backpressure = $writableStreamDefaultControllerGetBackpressure(controller);
+ $writableStreamUpdateBackpressure(stream, backpressure);
+ }
+ $writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
+ } catch (e) {
+ $writableStreamDefaultControllerErrorIfNeeded(controller, e);
+ }
+}