diff options
author | 2023-05-22 18:51:05 -0700 | |
---|---|---|
committer | 2023-05-22 18:51:05 -0700 | |
commit | fc40c690ea30a632a8d0d9490321c50ec898d8a5 (patch) | |
tree | 6e3ca0bb2c02347006a6b2a09c4aa156b86bd770 /src/bun.js/builtins/ts | |
parent | 23d42dc2377440dedc9d8e423f1ea077507d62c8 (diff) | |
download | bun-fc40c690ea30a632a8d0d9490321c50ec898d8a5.tar.gz bun-fc40c690ea30a632a8d0d9490321c50ec898d8a5.tar.zst bun-fc40c690ea30a632a8d0d9490321c50ec898d8a5.zip |
Write out builtins with TypeScript + Minify them (#2999)
* start work drafting how builtins will work
* work on ts builtin
* builtins stuff so far
* builtins
* done for today
* continue work
* working on it
* bindings so far
* well, it builds. doesnt run
* IT RUNS
* still lots of ts errors but it is functional
* sloppy mode
Diffstat (limited to 'src/bun.js/builtins/ts')
23 files changed, 7012 insertions, 0 deletions
diff --git a/src/bun.js/builtins/ts/BundlerPlugin.ts b/src/bun.js/builtins/ts/BundlerPlugin.ts new file mode 100644 index 000000000..831a6614e --- /dev/null +++ b/src/bun.js/builtins/ts/BundlerPlugin.ts @@ -0,0 +1,370 @@ +import type { + AnyFunction, + BuildConfig, + BunPlugin, + OnLoadCallback, + OnLoadResult, + OnLoadResultObject, + OnLoadResultSourceCode, + OnResolveCallback, + PluginBuilder, + PluginConstraints, +} from "bun"; + +// This API expects 4 functions: +// It should be generic enough to reuse for Bun.plugin() eventually, too. +interface BundlerPlugin { + onLoad: Map<string, [RegExp, OnLoadCallback][]>; + onResolve: Map<string, [RegExp, OnResolveCallback][]>; + onLoadAsync( + internalID, + sourceCode: string | Uint8Array | ArrayBuffer | DataView | null, + loaderKey: number | null, + ): void; + onResolveAsync(internalID, a, b, c): void; + addError(internalID, error, number): void; + addFilter(filter, namespace, number): void; +} + +// Extra types +type Setup = BunPlugin["setup"]; +type MinifyObj = Exclude<BuildConfig["minify"], boolean>; +interface BuildConfigExt extends BuildConfig { + // we support esbuild-style entryPoints + entryPoints?: string[]; + // plugins is guaranteed to not be null + plugins: BunPlugin[]; +} +interface PluginBuilderExt extends PluginBuilder { + // these functions aren't implemented yet, so we dont publicly expose them + resolve: AnyFunction; + onStart: AnyFunction; + onEnd: AnyFunction; + onDispose: AnyFunction; + // we partially support initialOptions. it's read-only and a subset of + // all options mapped to their esbuild names + initialOptions: any; + // we set this to an empty object + esbuild: any; +} + +export function runSetupFunction(this: BundlerPlugin, setup: Setup, config: BuildConfigExt) { + var onLoadPlugins = new Map<string, [RegExp, AnyFunction][]>(); + var onResolvePlugins = new Map<string, [RegExp, AnyFunction][]>(); + + function validate(filterObject: PluginConstraints, callback, map) { + if (!filterObject || !$isObject(filterObject)) { + throw new TypeError('Expected an object with "filter" RegExp'); + } + + if (!callback || !$isCallable(callback)) { + throw new TypeError("callback must be a function"); + } + + var { filter, namespace = "file" } = filterObject; + + if (!filter) { + throw new TypeError('Expected an object with "filter" RegExp'); + } + + if (!$isRegExpObject(filter)) { + throw new TypeError("filter must be a RegExp"); + } + + if (namespace && !(typeof namespace === "string")) { + throw new TypeError("namespace must be a string"); + } + + if ((namespace?.length ?? 0) === 0) { + namespace = "file"; + } + + if (!/^([/$a-zA-Z0-9_\\-]+)$/.test(namespace)) { + throw new TypeError("namespace can only contain $a-zA-Z0-9_\\-"); + } + + var callbacks = map.$get(namespace); + + if (!callbacks) { + map.$set(namespace, [[filter, callback]]); + } else { + $arrayPush(callbacks, [filter, callback]); + } + } + + function onLoad(filterObject, callback) { + validate(filterObject, callback, onLoadPlugins); + } + + function onResolve(filterObject, callback) { + validate(filterObject, callback, onResolvePlugins); + } + + const processSetupResult = () => { + var anyOnLoad = false, + anyOnResolve = false; + + for (var [namespace, callbacks] of onLoadPlugins.entries()) { + for (var [filter] of callbacks) { + this.addFilter(filter, namespace, 1); + anyOnLoad = true; + } + } + + for (var [namespace, callbacks] of onResolvePlugins.entries()) { + for (var [filter] of callbacks) { + this.addFilter(filter, namespace, 0); + anyOnResolve = true; + } + } + + if (anyOnResolve) { + var onResolveObject = this.onResolve; + if (!onResolveObject) { + this.onResolve = onResolvePlugins; + } else { + for (var [namespace, callbacks] of onResolvePlugins.entries()) { + var existing = onResolveObject.$get(namespace) as [RegExp, AnyFunction][]; + + if (!existing) { + onResolveObject.$set(namespace, callbacks); + } else { + onResolveObject.$set(namespace, existing.concat(callbacks)); + } + } + } + } + + if (anyOnLoad) { + var onLoadObject = this.onLoad; + if (!onLoadObject) { + this.onLoad = onLoadPlugins; + } else { + for (var [namespace, callbacks] of onLoadPlugins.entries()) { + var existing = onLoadObject.$get(namespace) as [RegExp, AnyFunction][]; + + if (!existing) { + onLoadObject.$set(namespace, callbacks); + } else { + onLoadObject.$set(namespace, existing.concat(callbacks)); + } + } + } + } + + return anyOnLoad || anyOnResolve; + }; + + var setupResult = setup({ + config: config, + onDispose: notImplementedIssueFn(2771, "On-dispose callbacks"), + onEnd: notImplementedIssueFn(2771, "On-end callbacks"), + onLoad, + onResolve, + onStart: notImplementedIssueFn(2771, "On-start callbacks"), + resolve: notImplementedIssueFn(2771, "build.resolve()"), + // esbuild's options argument is different, we provide some interop + initialOptions: { + ...config, + bundle: true, + entryPoints: config.entrypoints ?? config.entryPoints ?? [], + minify: typeof config.minify === "boolean" ? config.minify : false, + minifyIdentifiers: config.minify === true || (config.minify as MinifyObj)?.identifiers, + minifyWhitespace: config.minify === true || (config.minify as MinifyObj)?.whitespace, + minifySyntax: config.minify === true || (config.minify as MinifyObj)?.syntax, + outbase: config.root, + platform: config.target === "bun" ? "node" : config.target, + }, + esbuild: {}, + } satisfies PluginBuilderExt as PluginBuilder); + + if (setupResult && $isPromise(setupResult)) { + if ($getPromiseInternalField(setupResult, $promiseFieldFlags) & $promiseStateFulfilled) { + setupResult = $getPromiseInternalField(setupResult, $promiseFieldReactionsOrResult); + } else { + return setupResult.$then(processSetupResult); + } + } + + return processSetupResult(); +} + +export function runOnResolvePlugins(this: BundlerPlugin, specifier, inputNamespace, importer, internalID, kindId) { + // Must be kept in sync with ImportRecord.label + const kind = $ImportKindIdToLabel[kindId]; + + var promiseResult: any = (async (inputPath, inputNamespace, importer, kind) => { + var { onResolve, onLoad } = this; + var results = onResolve.$get(inputNamespace); + if (!results) { + this.onResolveAsync(internalID, null, null, null); + return null; + } + + for (let [filter, callback] of results) { + if (filter.test(inputPath)) { + var result = callback({ + path: inputPath, + importer, + namespace: inputNamespace, + // resolveDir + kind, + // pluginData + }); + + while ( + result && + $isPromise(result) && + ($getPromiseInternalField(result, $promiseFieldFlags) & $promiseStateMask) === $promiseStateFulfilled + ) { + result = $getPromiseInternalField(result, $promiseFieldReactionsOrResult); + } + + if (result && $isPromise(result)) { + result = await result; + } + + if (!result || !$isObject(result)) { + continue; + } + + var { path, namespace: userNamespace = inputNamespace, external } = result; + if (!(typeof path === "string") || !(typeof userNamespace === "string")) { + throw new TypeError("onResolve plugins must return an object with a string 'path' and string 'loader' field"); + } + + if (!path) { + continue; + } + + if (!userNamespace) { + userNamespace = inputNamespace; + } + if (typeof external !== "boolean" && !$isUndefinedOrNull(external)) { + throw new TypeError('onResolve plugins "external" field must be boolean or unspecified'); + } + + if (!external) { + if (userNamespace === "file") { + if (process.platform !== "win32") { + if (path[0] !== "/" || path.includes("..")) { + throw new TypeError('onResolve plugin "path" must be absolute when the namespace is "file"'); + } + } else { + // TODO: Windows + } + } + if (userNamespace === "dataurl") { + if (!path.startsWith("data:")) { + throw new TypeError('onResolve plugin "path" must start with "data:" when the namespace is "dataurl"'); + } + } + + if (userNamespace && userNamespace !== "file" && (!onLoad || !onLoad.$has(userNamespace))) { + throw new TypeError(`Expected onLoad plugin for namespace ${userNamespace} to exist`); + } + } + this.onResolveAsync(internalID, path, userNamespace, external); + return null; + } + } + + this.onResolveAsync(internalID, null, null, null); + return null; + })(specifier, inputNamespace, importer, kind); + + while ( + promiseResult && + $isPromise(promiseResult) && + ($getPromiseInternalField(promiseResult, $promiseFieldFlags) & $promiseStateMask) === $promiseStateFulfilled + ) { + promiseResult = $getPromiseInternalField(promiseResult, $promiseFieldReactionsOrResult); + } + + if (promiseResult && $isPromise(promiseResult)) { + promiseResult.then( + () => {}, + e => { + this.addError(internalID, e, 0); + }, + ); + } +} + +export function runOnLoadPlugins(this: BundlerPlugin, internalID, path, namespace, defaultLoaderId) { + const LOADERS_MAP = $LoaderLabelToId; + const loaderName = $LoaderIdToLabel[defaultLoaderId]; + + var promiseResult = (async (internalID, path, namespace, defaultLoader) => { + var results = this.onLoad.$get(namespace); + if (!results) { + this.onLoadAsync(internalID, null, null); + return null; + } + + for (let [filter, callback] of results) { + if (filter.test(path)) { + var result = callback({ + path, + namespace, + // suffix + // pluginData + loader: defaultLoader, + }); + + while ( + result && + $isPromise(result) && + ($getPromiseInternalField(result, $promiseFieldFlags) & $promiseStateMask) === $promiseStateFulfilled + ) { + result = $getPromiseInternalField(result, $promiseFieldReactionsOrResult); + } + + if (result && $isPromise(result)) { + result = await result; + } + + if (!result || !$isObject(result)) { + continue; + } + + var { contents, loader = defaultLoader } = result as OnLoadResultSourceCode & OnLoadResultObject; + if (!(typeof contents === "string") && !$isTypedArrayView(contents)) { + throw new TypeError('onLoad plugins must return an object with "contents" as a string or Uint8Array'); + } + + if (!(typeof loader === "string")) { + throw new TypeError('onLoad plugins must return an object with "loader" as a string'); + } + + const chosenLoader = LOADERS_MAP[loader]; + if (chosenLoader === undefined) { + throw new TypeError(`Loader ${loader} is not supported.`); + } + + this.onLoadAsync(internalID, contents, chosenLoader); + return null; + } + } + + this.onLoadAsync(internalID, null, null); + return null; + })(internalID, path, namespace, loaderName); + + while ( + promiseResult && + $isPromise(promiseResult) && + ($getPromiseInternalField(promiseResult, $promiseFieldFlags) & $promiseStateMask) === $promiseStateFulfilled + ) { + promiseResult = $getPromiseInternalField(promiseResult, $promiseFieldReactionsOrResult); + } + + if (promiseResult && $isPromise(promiseResult)) { + promiseResult.then( + () => {}, + e => { + this.addError(internalID, e, 1); + }, + ); + } +} diff --git a/src/bun.js/builtins/ts/ByteLengthQueuingStrategy.ts b/src/bun.js/builtins/ts/ByteLengthQueuingStrategy.ts new file mode 100644 index 000000000..fc3f3d998 --- /dev/null +++ b/src/bun.js/builtins/ts/ByteLengthQueuingStrategy.ts @@ -0,0 +1,42 @@ +/* + * Copyright (C) 2015 Canon Inc. + * Copyright (C) 2015 Igalia S.L. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +$getter; +export function highWaterMark(this: any) { + const highWaterMark = $getByIdDirectPrivate(this, "highWaterMark"); + if (highWaterMark === undefined) + throw new TypeError("ByteLengthQueuingStrategy.highWaterMark getter called on incompatible |this| value."); + + return highWaterMark; +} + +export function size(chunk) { + return chunk.byteLength; +} + +export function initializeByteLengthQueuingStrategy(this: any, parameters: any) { + $putByIdDirectPrivate(this, "highWaterMark", $extractHighWaterMarkFromQueuingStrategyInit(parameters)); +} diff --git a/src/bun.js/builtins/ts/ConsoleObject.ts b/src/bun.js/builtins/ts/ConsoleObject.ts new file mode 100644 index 000000000..45746459a --- /dev/null +++ b/src/bun.js/builtins/ts/ConsoleObject.ts @@ -0,0 +1,84 @@ +$overriddenName = "[Symbol.asyncIterator]"; +export function asyncIterator(this: Console) { + const Iterator = async function* ConsoleAsyncIterator() { + const stream = Bun.stdin.stream(); + var reader = stream.getReader(); + + // TODO: use builtin + var decoder = new (globalThis as any).TextDecoder("utf-8", { fatal: false }) as TextDecoder; + var deferredError; + var indexOf = Bun.indexOfLine; + + try { + while (true) { + var done, value; + var pendingChunk; + const firstResult = reader.readMany(); + if ($isPromise(firstResult)) { + ({ done, value } = await firstResult); + } else { + ({ done, value } = firstResult); + } + + if (done) { + if (pendingChunk) { + yield decoder.decode(pendingChunk); + } + return; + } + + var actualChunk; + // we assume it was given line-by-line + for (const chunk of value) { + actualChunk = chunk; + if (pendingChunk) { + actualChunk = Buffer.concat([pendingChunk, chunk]); + pendingChunk = null; + } + + var last = 0; + // TODO: "\r", 0x4048, 0x4049, 0x404A, 0x404B, 0x404C, 0x404D, 0x404E, 0x404F + var i = indexOf(actualChunk, last); + while (i !== -1) { + yield decoder.decode(actualChunk.subarray(last, i)); + last = i + 1; + i = indexOf(actualChunk, last); + } + + pendingChunk = actualChunk.subarray(last); + } + } + } catch (e) { + deferredError = e; + } finally { + reader.releaseLock(); + + if (deferredError) { + throw deferredError; + } + } + }; + + const symbol = globalThis.Symbol.asyncIterator; + this[symbol] = Iterator; + return Iterator(); +} + +export function write(this: Console, input) { + var writer = $getByIdDirectPrivate(this, "writer"); + if (!writer) { + var length = $toLength(input?.length ?? 0); + writer = Bun.stdout.writer({ highWaterMark: length > 65536 ? length : 65536 }); + $putByIdDirectPrivate(this, "writer", writer); + } + + var wrote = writer.write(input); + + const count = $argumentCount(); + for (var i = 1; i < count; i++) { + wrote += writer.write($argument(i)); + } + + writer.flush(true); + return wrote; +} diff --git a/src/bun.js/builtins/ts/CountQueuingStrategy.ts b/src/bun.js/builtins/ts/CountQueuingStrategy.ts new file mode 100644 index 000000000..a72dca1ca --- /dev/null +++ b/src/bun.js/builtins/ts/CountQueuingStrategy.ts @@ -0,0 +1,42 @@ +/* + * Copyright (C) 2015 Canon Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +$getter; +export function highWaterMark(this: any) { + const highWaterMark = $getByIdDirectPrivate(this, "highWaterMark"); + + if (highWaterMark === undefined) + throw new TypeError("CountQueuingStrategy.highWaterMark getter called on incompatible |this| value."); + + return highWaterMark; +} + +export function size() { + return 1; +} + +export function initializeCountQueuingStrategy(this: any, parameters: any) { + $putByIdDirectPrivate(this, "highWaterMark", $extractHighWaterMarkFromQueuingStrategyInit(parameters)); +} diff --git a/src/bun.js/builtins/ts/ImportMetaObject.ts b/src/bun.js/builtins/ts/ImportMetaObject.ts new file mode 100644 index 000000000..6c66075c6 --- /dev/null +++ b/src/bun.js/builtins/ts/ImportMetaObject.ts @@ -0,0 +1,152 @@ +type ImportMetaObject = Partial<ImportMeta>; + +export function loadCJS2ESM(this: ImportMetaObject, resolvedSpecifier: string) { + var loader = Loader; + var queue = $createFIFO(); + var key = resolvedSpecifier; + while (key) { + // we need to explicitly check because state could be $ModuleFetch + // it will throw this error if we do not: + // $throwTypeError("Requested module is already fetched."); + var entry = loader.registry.$get(key); + + if (!entry || !entry.state || entry.state <= $ModuleFetch) { + $fulfillModuleSync(key); + entry = loader.registry.$get(key)!; + } + + // entry.fetch is a Promise<SourceCode> + // SourceCode is not a string, it's a JSC::SourceCode object + // this pulls it out of the promise without delaying by a tick + // the promise is already fullfilled by $fullfillModuleSync + var sourceCodeObject = $getPromiseInternalField(entry.fetch, $promiseFieldReactionsOrResult); + // parseModule() returns a Promise, but the value is already fulfilled + // so we just pull it out of the promise here once again + // But, this time we do it a little more carefully because this is a JSC function call and not bun source code + var moduleRecordPromise = loader.parseModule(key, sourceCodeObject); + var module = entry.module; + if (!module && moduleRecordPromise && $isPromise(moduleRecordPromise)) { + var reactionsOrResult = $getPromiseInternalField(moduleRecordPromise, $promiseFieldReactionsOrResult); + var flags = $getPromiseInternalField(moduleRecordPromise, $promiseFieldFlags); + var state = flags & $promiseStateMask; + // this branch should never happen, but just to be safe + if (state === $promiseStatePending || (reactionsOrResult && $isPromise(reactionsOrResult))) { + throw new TypeError(`require() async module "${key}" is unsupported`); + } else if (state === $promiseStateRejected) { + // TODO: use SyntaxError but preserve the specifier + throw new TypeError(`${reactionsOrResult?.message ?? "An error occurred"} while parsing module \"${key}\"`); + } + entry.module = module = reactionsOrResult; + } else if (moduleRecordPromise && !module) { + entry.module = module = moduleRecordPromise as LoaderModule; + } + + // This is very similar to "requestInstantiate" in ModuleLoader.js in JavaScriptCore. + $setStateToMax(entry, $ModuleLink); + var dependenciesMap = module.dependenciesMap; + var requestedModules = loader.requestedModules(module); + var dependencies = $newArrayWithSize<string>(requestedModules.length); + for (var i = 0, length = requestedModules.length; i < length; ++i) { + var depName = requestedModules[i]; + // optimization: if it starts with a slash then it's an absolute path + // we don't need to run the resolver a 2nd time + var depKey = depName[0] === "/" ? depName : loader.resolve(depName, key); + var depEntry = loader.ensureRegistered(depKey); + if (depEntry.state < $ModuleLink) { + queue.push(depKey); + } + + $putByValDirect(dependencies, i, depEntry); + dependenciesMap.$set(depName, depEntry); + } + + entry.dependencies = dependencies; + // All dependencies resolved, set instantiate and satisfy field directly. + entry.instantiate = Promise.resolve(entry); + entry.satisfy = Promise.resolve(entry); + key = queue.shift(); + while (key && (loader.registry.$get(key)?.state ?? $ModuleFetch) >= $ModuleLink) { + key = queue.shift(); + } + } + + var linkAndEvaluateResult = loader.linkAndEvaluateModule(resolvedSpecifier, undefined); + if (linkAndEvaluateResult && $isPromise(linkAndEvaluateResult)) { + // if you use top-level await, or any dependencies use top-level await, then we throw here + // this means the module will still actually load eventually, but that's okay. + throw new TypeError(`require() async module \"${resolvedSpecifier}\" is unsupported`); + } + + return loader.registry.$get(resolvedSpecifier); +} + +export function requireESM(this: ImportMetaObject, resolved) { + var entry = Loader.registry.$get(resolved); + + if (!entry || !entry.evaluated) { + entry = $loadCJS2ESM(resolved); + } + + if (!entry || !entry.evaluated || !entry.module) { + throw new TypeError(`require() failed to evaluate module "${resolved}". This is an internal consistentency error.`); + } + var exports = Loader.getModuleNamespaceObject(entry.module); + var commonJS = exports.default; + var cjs = commonJS?.[$commonJSSymbol]; + if (cjs === 0) { + return commonJS; + } else if (cjs && $isCallable(commonJS)) { + return commonJS(); + } + + return exports; +} + +export function internalRequire(this: ImportMetaObject, resolved) { + var cached = $requireMap.$get(resolved); + const last5 = resolved.substring(resolved.length - 5); + if (cached) { + if (last5 === ".node") { + return cached.exports; + } + return cached; + } + + // TODO: remove this hardcoding + if (last5 === ".json") { + var fs = (globalThis[Symbol.for("_fs")] ||= Bun.fs()); + var exports = JSON.parse(fs.readFileSync(resolved, "utf8")); + $requireMap.$set(resolved, exports); + return exports; + } else if (last5 === ".node") { + var module = { exports: {} }; + process.dlopen(module, resolved); + $requireMap.$set(resolved, module); + return module.exports; + } else if (last5 === ".toml") { + var fs = (globalThis[Symbol.for("_fs")] ||= Bun.fs()); + var exports = Bun.TOML.parse(fs.readFileSync(resolved, "utf8")); + $requireMap.$set(resolved, exports); + return exports; + } else { + var exports = $requireESM(resolved); + $requireMap.$set(resolved, exports); + return exports; + } +} + +$sloppy; +export function require(this: ImportMetaObject, name) { + var from = this?.path ?? arguments.callee.path; + + if (typeof name !== "string") { + throw new TypeError("require(name) must be a string"); + } + + return $internalRequire($resolveSync(name, from)); +} + +$getter; +export function main(this: ImportMetaObject) { + return this.path === Bun.main; +} diff --git a/src/bun.js/builtins/ts/JSBufferConstructor.ts b/src/bun.js/builtins/ts/JSBufferConstructor.ts new file mode 100644 index 000000000..debc62d51 --- /dev/null +++ b/src/bun.js/builtins/ts/JSBufferConstructor.ts @@ -0,0 +1,67 @@ +export function from(items) { + if ($isUndefinedOrNull(items)) { + throw new TypeError( + "The first argument must be one of type string, Buffer, ArrayBuffer, Array, or Array-like Object.", + ); + } + + // TODO: figure out why private symbol not found + if ( + typeof items === "string" || + (typeof items === "object" && + ($isTypedArrayView(items) || + items instanceof ArrayBuffer || + items instanceof SharedArrayBuffer || + items instanceof String)) + ) { + switch ($argumentCount()) { + case 1: { + return new $Buffer(items); + } + case 2: { + return new $Buffer(items, $argument(1)); + } + default: { + return new $Buffer(items, $argument(1), $argument(2)); + } + } + } + + var arrayLike = $toObject( + items, + "The first argument must be of type string or an instance of Buffer, ArrayBuffer, or Array or an Array-like Object.", + ) as ArrayLike<any>; + + if (!$isJSArray(arrayLike)) { + const toPrimitive = $tryGetByIdWithWellKnownSymbol(items, "toPrimitive"); + + if (toPrimitive) { + const primitive = toPrimitive.$call(items, "string"); + + if (typeof primitive === "string") { + switch ($argumentCount()) { + case 1: { + return new $Buffer(primitive); + } + case 2: { + return new $Buffer(primitive, $argument(1)); + } + default: { + return new $Buffer(primitive, $argument(1), $argument(2)); + } + } + } + } + + if (!("length" in arrayLike) || $isCallable(arrayLike)) { + throw new TypeError( + "The first argument must be of type string or an instance of Buffer, ArrayBuffer, or Array or an Array-like Object.", + ); + } + } + + // Don't pass the second argument because Node's Buffer.from doesn't accept + // a function and Uint8Array.from requires it if it exists + // That means we cannot use $tailCallFowrardArguments here, sadly + return new $Buffer(Uint8Array.from(arrayLike).buffer); +} diff --git a/src/bun.js/builtins/ts/JSBufferPrototype.ts b/src/bun.js/builtins/ts/JSBufferPrototype.ts new file mode 100644 index 000000000..97b25b9b2 --- /dev/null +++ b/src/bun.js/builtins/ts/JSBufferPrototype.ts @@ -0,0 +1,495 @@ +// The fastest way as of April 2022 is to use DataView. +// DataView has intrinsics that cause inlining + +interface BufferExt extends Buffer { + $dataView?: DataView; + + toString(encoding?: BufferEncoding, start?: number, end?: number): string; + toString(offset: number, length: number, encoding?: BufferEncoding): string; +} + +export function setBigUint64(this: BufferExt, offset, value, le) { + return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setBigUint64( + offset, + value, + le, + ); +} +export function readInt8(this: BufferExt, offset) { + return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).getInt8(offset); +} +export function readUInt8(this: BufferExt, offset) { + return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).getUint8(offset); +} +export function readInt16LE(this: BufferExt, offset) { + return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).getInt16(offset, true); +} +export function readInt16BE(this: BufferExt, offset) { + return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).getInt16(offset, false); +} +export function readUInt16LE(this: BufferExt, offset) { + return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).getUint16(offset, true); +} +export function readUInt16BE(this: BufferExt, offset) { + return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).getUint16(offset, false); +} +export function readInt32LE(this: BufferExt, offset) { + return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).getInt32(offset, true); +} +export function readInt32BE(this: BufferExt, offset) { + return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).getInt32(offset, false); +} +export function readUInt32LE(this: BufferExt, offset) { + return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).getUint32(offset, true); +} +export function readUInt32BE(this: BufferExt, offset) { + return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).getUint32(offset, false); +} + +export function readIntLE(this: BufferExt, offset, byteLength) { + const view = (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)); + switch (byteLength) { + case 1: { + return view.getInt8(offset); + } + case 2: { + return view.getInt16(offset, true); + } + case 3: { + const val = view.getUint16(offset, true) + view.getUint8(offset + 2) * 2 ** 16; + return val | ((val & (2 ** 23)) * 0x1fe); + } + case 4: { + return view.getInt32(offset, true); + } + case 5: { + const last = view.getUint8(offset + 4); + return (last | ((last & (2 ** 7)) * 0x1fffffe)) * 2 ** 32 + view.getUint32(offset, true); + } + case 6: { + const last = view.getUint16(offset + 4, true); + return (last | ((last & (2 ** 15)) * 0x1fffe)) * 2 ** 32 + view.getUint32(offset, true); + } + } + throw new RangeError("byteLength must be >= 1 and <= 6"); +} +export function readIntBE(this: BufferExt, offset, byteLength) { + const view = (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)); + switch (byteLength) { + case 1: { + return view.getInt8(offset); + } + case 2: { + return view.getInt16(offset, false); + } + case 3: { + const val = view.getUint16(offset + 1, false) + view.getUint8(offset) * 2 ** 16; + return val | ((val & (2 ** 23)) * 0x1fe); + } + case 4: { + return view.getInt32(offset, false); + } + case 5: { + const last = view.getUint8(offset); + return (last | ((last & (2 ** 7)) * 0x1fffffe)) * 2 ** 32 + view.getUint32(offset + 1, false); + } + case 6: { + const last = view.getUint16(offset, false); + return (last | ((last & (2 ** 15)) * 0x1fffe)) * 2 ** 32 + view.getUint32(offset + 2, false); + } + } + throw new RangeError("byteLength must be >= 1 and <= 6"); +} +export function readUIntLE(this: BufferExt, offset, byteLength) { + const view = (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)); + switch (byteLength) { + case 1: { + return view.getUint8(offset); + } + case 2: { + return view.getUint16(offset, true); + } + case 3: { + return view.getUint16(offset, true) + view.getUint8(offset + 2) * 2 ** 16; + } + case 4: { + return view.getUint32(offset, true); + } + case 5: { + return view.getUint8(offset + 4) * 2 ** 32 + view.getUint32(offset, true); + } + case 6: { + return view.getUint16(offset + 4, true) * 2 ** 32 + view.getUint32(offset, true); + } + } + throw new RangeError("byteLength must be >= 1 and <= 6"); +} +export function readUIntBE(this: BufferExt, offset, byteLength) { + const view = (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)); + switch (byteLength) { + case 1: { + return view.getUint8(offset); + } + case 2: { + return view.getUint16(offset, false); + } + case 3: { + return view.getUint16(offset + 1, false) + view.getUint8(offset) * 2 ** 16; + } + case 4: { + return view.getUint32(offset, false); + } + case 5: { + const last = view.getUint8(offset); + return (last | ((last & (2 ** 7)) * 0x1fffffe)) * 2 ** 32 + view.getUint32(offset + 1, false); + } + case 6: { + const last = view.getUint16(offset, false); + return (last | ((last & (2 ** 15)) * 0x1fffe)) * 2 ** 32 + view.getUint32(offset + 2, false); + } + } + throw new RangeError("byteLength must be >= 1 and <= 6"); +} + +export function readFloatLE(this: BufferExt, offset) { + return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).getFloat32(offset, true); +} +export function readFloatBE(this: BufferExt, offset) { + return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).getFloat32(offset, false); +} +export function readDoubleLE(this: BufferExt, offset) { + return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).getFloat64(offset, true); +} +export function readDoubleBE(this: BufferExt, offset) { + return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).getFloat64(offset, false); +} +export function readBigInt64LE(this: BufferExt, offset) { + return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).getBigInt64(offset, true); +} +export function readBigInt64BE(this: BufferExt, offset) { + return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).getBigInt64(offset, false); +} +export function readBigUInt64LE(this: BufferExt, offset) { + return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).getBigUint64(offset, true); +} +export function readBigUInt64BE(this: BufferExt, offset) { + return (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).getBigUint64(offset, false); +} + +export function writeInt8(this: BufferExt, value, offset) { + (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setInt8(offset, value); + return offset + 1; +} +export function writeUInt8(this: BufferExt, value, offset) { + (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setUint8(offset, value); + return offset + 1; +} +export function writeInt16LE(this: BufferExt, value, offset) { + (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setInt16(offset, value, true); + return offset + 2; +} +export function writeInt16BE(this: BufferExt, value, offset) { + (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setInt16(offset, value, false); + return offset + 2; +} +export function writeUInt16LE(this: BufferExt, value, offset) { + (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setUint16(offset, value, true); + return offset + 2; +} +export function writeUInt16BE(this: BufferExt, value, offset) { + (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setUint16(offset, value, false); + return offset + 2; +} +export function writeInt32LE(this: BufferExt, value, offset) { + (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setInt32(offset, value, true); + return offset + 4; +} +export function writeInt32BE(this: BufferExt, value, offset) { + (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setInt32(offset, value, false); + return offset + 4; +} +export function writeUInt32LE(this: BufferExt, value, offset) { + (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setUint32(offset, value, true); + return offset + 4; +} +export function writeUInt32BE(this: BufferExt, value, offset) { + (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setUint32(offset, value, false); + return offset + 4; +} + +export function writeIntLE(this: BufferExt, value, offset, byteLength) { + const view = (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)); + switch (byteLength) { + case 1: { + view.setInt8(offset, value); + break; + } + case 2: { + view.setInt16(offset, value, true); + break; + } + case 3: { + view.setUint16(offset, value & 0xffff, true); + view.setInt8(offset + 2, Math.floor(value * 2 ** -16)); + break; + } + case 4: { + view.setInt32(offset, value, true); + break; + } + case 5: { + view.setUint32(offset, value | 0, true); + view.setInt8(offset + 4, Math.floor(value * 2 ** -32)); + break; + } + case 6: { + view.setUint32(offset, value | 0, true); + view.setInt16(offset + 4, Math.floor(value * 2 ** -32), true); + break; + } + default: { + throw new RangeError("byteLength must be >= 1 and <= 6"); + } + } + return offset + byteLength; +} +export function writeIntBE(this: BufferExt, value, offset, byteLength) { + const view = (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)); + switch (byteLength) { + case 1: { + view.setInt8(offset, value); + break; + } + case 2: { + view.setInt16(offset, value, false); + break; + } + case 3: { + view.setUint16(offset + 1, value & 0xffff, false); + view.setInt8(offset, Math.floor(value * 2 ** -16)); + break; + } + case 4: { + view.setInt32(offset, value, false); + break; + } + case 5: { + view.setUint32(offset + 1, value | 0, false); + view.setInt8(offset, Math.floor(value * 2 ** -32)); + break; + } + case 6: { + view.setUint32(offset + 2, value | 0, false); + view.setInt16(offset, Math.floor(value * 2 ** -32), false); + break; + } + default: { + throw new RangeError("byteLength must be >= 1 and <= 6"); + } + } + return offset + byteLength; +} +export function writeUIntLE(this: BufferExt, value, offset, byteLength) { + const view = (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)); + switch (byteLength) { + case 1: { + view.setUint8(offset, value); + break; + } + case 2: { + view.setUint16(offset, value, true); + break; + } + case 3: { + view.setUint16(offset, value & 0xffff, true); + view.setUint8(offset + 2, Math.floor(value * 2 ** -16)); + break; + } + case 4: { + view.setUint32(offset, value, true); + break; + } + case 5: { + view.setUint32(offset, value | 0, true); + view.setUint8(offset + 4, Math.floor(value * 2 ** -32)); + break; + } + case 6: { + view.setUint32(offset, value | 0, true); + view.setUint16(offset + 4, Math.floor(value * 2 ** -32), true); + break; + } + default: { + throw new RangeError("byteLength must be >= 1 and <= 6"); + } + } + return offset + byteLength; +} +export function writeUIntBE(this: BufferExt, value, offset, byteLength) { + const view = (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)); + switch (byteLength) { + case 1: { + view.setUint8(offset, value); + break; + } + case 2: { + view.setUint16(offset, value, false); + break; + } + case 3: { + view.setUint16(offset + 1, value & 0xffff, false); + view.setUint8(offset, Math.floor(value * 2 ** -16)); + break; + } + case 4: { + view.setUint32(offset, value, false); + break; + } + case 5: { + view.setUint32(offset + 1, value | 0, false); + view.setUint8(offset, Math.floor(value * 2 ** -32)); + break; + } + case 6: { + view.setUint32(offset + 2, value | 0, false); + view.setUint16(offset, Math.floor(value * 2 ** -32), false); + break; + } + default: { + throw new RangeError("byteLength must be >= 1 and <= 6"); + } + } + return offset + byteLength; +} + +export function writeFloatLE(this: BufferExt, value, offset) { + (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setFloat32(offset, value, true); + return offset + 4; +} + +export function writeFloatBE(this: BufferExt, value, offset) { + (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setFloat32(offset, value, false); + return offset + 4; +} + +export function writeDoubleLE(this: BufferExt, value, offset) { + (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setFloat64(offset, value, true); + return offset + 8; +} + +export function writeDoubleBE(this: BufferExt, value, offset) { + (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setFloat64(offset, value, false); + return offset + 8; +} + +export function writeBigInt64LE(this: BufferExt, value, offset) { + (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setBigInt64(offset, value, true); + return offset + 8; +} + +export function writeBigInt64BE(this: BufferExt, value, offset) { + (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setBigInt64(offset, value, false); + return offset + 8; +} + +export function writeBigUInt64LE(this: BufferExt, value, offset) { + (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setBigUint64(offset, value, true); + return offset + 8; +} + +export function writeBigUInt64BE(this: BufferExt, value, offset) { + (this.$dataView ||= new DataView(this.buffer, this.byteOffset, this.byteLength)).setBigUint64(offset, value, false); + return offset + 8; +} + +export function utf8Write(this: BufferExt, text, offset, length) { + return this.write(text, offset, length, "utf8"); +} +export function ucs2Write(this: BufferExt, text, offset, length) { + return this.write(text, offset, length, "ucs2"); +} +export function utf16leWrite(this: BufferExt, text, offset, length) { + return this.write(text, offset, length, "utf16le"); +} +export function latin1Write(this: BufferExt, text, offset, length) { + return this.write(text, offset, length, "latin1"); +} +export function asciiWrite(this: BufferExt, text, offset, length) { + return this.write(text, offset, length, "ascii"); +} +export function base64Write(this: BufferExt, text, offset, length) { + return this.write(text, offset, length, "base64"); +} +export function base64urlWrite(this: BufferExt, text, offset, length) { + return this.write(text, offset, length, "base64url"); +} +export function hexWrite(this: BufferExt, text, offset, length) { + return this.write(text, offset, length, "hex"); +} + +export function utf8Slice(this: BufferExt, offset, length) { + return this.toString(offset, length, "utf8"); +} +export function ucs2Slice(this: BufferExt, offset, length) { + return this.toString(offset, length, "ucs2"); +} +export function utf16leSlice(this: BufferExt, offset, length) { + return this.toString(offset, length, "utf16le"); +} +export function latin1Slice(this: BufferExt, offset, length) { + return this.toString(offset, length, "latin1"); +} +export function asciiSlice(this: BufferExt, offset, length) { + return this.toString(offset, length, "ascii"); +} +export function base64Slice(this: BufferExt, offset, length) { + return this.toString(offset, length, "base64"); +} +export function base64urlSlice(this: BufferExt, offset, length) { + return this.toString(offset, length, "base64url"); +} +export function hexSlice(this: BufferExt, offset, length) { + return this.toString(offset, length, "hex"); +} + +export function toJSON(this: BufferExt) { + const type = "Buffer"; + const data = Array.from(this); + return { type, data }; +} + +export function slice(this: BufferExt, start, end) { + var { buffer, byteOffset, byteLength } = this; + + function adjustOffset(offset, length) { + // Use Math.trunc() to convert offset to an integer value that can be larger + // than an Int32. Hence, don't use offset | 0 or similar techniques. + offset = $trunc(offset); + if (offset === 0 || isNaN(offset)) { + return 0; + } else if (offset < 0) { + offset += length; + return offset > 0 ? offset : 0; + } else { + return offset < length ? offset : length; + } + } + + var start_ = adjustOffset(start, byteLength); + var end_ = end !== undefined ? adjustOffset(end, byteLength) : byteLength; + return new $Buffer(buffer, byteOffset + start_, end_ > start_ ? end_ - start_ : 0); +} + +$getter; +export function parent(this: BufferExt) { + return $isObject(this) && this instanceof $Buffer ? this.buffer : undefined; +} + +$getter; +export function offset(this: BufferExt) { + return $isObject(this) && this instanceof $Buffer ? this.byteOffset : undefined; +} + +export function inspect(this: BufferExt, recurseTimes, ctx) { + return Bun.inspect(this); +} diff --git a/src/bun.js/builtins/ts/ProcessObjectInternals.ts b/src/bun.js/builtins/ts/ProcessObjectInternals.ts new file mode 100644 index 000000000..8b24e68ba --- /dev/null +++ b/src/bun.js/builtins/ts/ProcessObjectInternals.ts @@ -0,0 +1,676 @@ +/* + * Copyright 2023 Codeblog Corp. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +export function binding(bindingName) { + if (bindingName !== "constants") + throw new TypeError( + "process.binding() is not supported in Bun. If that breaks something, please file an issue and include a reproducible code sample.", + ); + + var cache = globalThis.Symbol.for("process.bindings.constants"); + var constants = globalThis[cache]; + if (!constants) { + // TODO: make this less hacky. + // This calls require("node:fs").constants + // except, outside an ESM module. + const { constants: fs } = globalThis[globalThis.Symbol.for("Bun.lazy")]("createImportMeta", "node:process").require( + "node:fs", + ); + constants = { + fs, + zlib: {}, + crypto: {}, + os: Bun._Os().constants, + }; + globalThis[cache] = constants; + } + return constants; +} + +export function getStdioWriteStream(fd_, rawRequire) { + var module = { path: "node:process", require: rawRequire }; + var require = path => module.require(path); + + function createStdioWriteStream(fd_) { + var { Duplex, eos, destroy } = require("node:stream"); + var StdioWriteStream = class StdioWriteStream extends Duplex { + #writeStream; + #readStream; + + #readable = true; + #writable = true; + #fdPath; + + #onClose; + #onDrain; + #onFinish; + #onReadable; + #isTTY; + + get isTTY() { + return (this.#isTTY ??= require("node:tty").isatty(fd_)); + } + + get fd() { + return fd_; + } + + constructor(fd) { + super({ readable: true, writable: true }); + this.#fdPath = `/dev/fd/${fd}`; + } + + #onFinished(err) { + const cb = this.#onClose; + this.#onClose = null; + + if (cb) { + cb(err); + } else if (err) { + this.destroy(err); + } else if (!this.#readable && !this.#writable) { + this.destroy(); + } + } + + _destroy(err, callback) { + if (!err && this.#onClose !== null) { + var AbortError = class AbortError extends Error { + code: string; + name: string; + constructor(message = "The operation was aborted", options = void 0) { + if (options !== void 0 && typeof options !== "object") { + throw new Error(`Invalid AbortError options:\n\n${JSON.stringify(options, null, 2)}`); + } + super(message, options); + this.code = "ABORT_ERR"; + this.name = "AbortError"; + } + }; + err = new AbortError(); + } + + this.#onDrain = null; + this.#onFinish = null; + if (this.#onClose === null) { + callback(err); + } else { + this.#onClose = callback; + if (this.#writeStream) destroy(this.#writeStream, err); + if (this.#readStream) destroy(this.#readStream, err); + } + } + + _write(chunk, encoding, callback) { + if (!this.#writeStream) { + var { createWriteStream } = require("node:fs"); + var stream = (this.#writeStream = createWriteStream(this.#fdPath)); + + stream.on("finish", () => { + if (this.#onFinish) { + const cb = this.#onFinish; + this.#onFinish = null; + cb(); + } + }); + + stream.on("drain", () => { + if (this.#onDrain) { + const cb = this.#onDrain; + this.#onDrain = null; + cb(); + } + }); + + eos(stream, err => { + this.#writable = false; + if (err) { + destroy(stream, err); + } + this.#onFinished(err); + }); + } + if (stream.write(chunk, encoding)) { + callback(); + } else { + this.#onDrain = callback; + } + } + + _final(callback) { + this.#writeStream && this.#writeStream.end(); + this.#onFinish = callback; + } + + #loadReadStream() { + var { createReadStream } = require("node:fs"); + + var readStream = (this.#readStream = createReadStream(this.#fdPath)); + + readStream.on("readable", () => { + if (this.#onReadable) { + const cb = this.#onReadable; + this.#onReadable = null; + cb(); + } else { + this.read(); + } + }); + + readStream.on("end", () => { + this.push(null); + }); + + eos(readStream, err => { + this.#readable = false; + if (err) { + destroy(readStream, err); + } + this.#onFinished(err); + }); + return readStream; + } + + _read() { + var stream = this.#readStream; + if (!stream) { + stream = this.#loadReadStream(); + } + + while (true) { + const buf = stream.read(); + if (buf === null || !this.push(buf)) { + return; + } + } + } + }; + return new StdioWriteStream(fd_); + } + + var { EventEmitter } = require("node:events"); + + function isFastEncoding(encoding) { + if (!encoding) return true; + + var normalied = encoding.toLowerCase(); + return normalied === "utf8" || normalied === "utf-8" || normalied === "buffer" || normalied === "binary"; + } + + var readline; + + var FastStdioWriteStream = class StdioWriteStream extends EventEmitter { + #fd; + #innerStream; + #writer; + #isTTY; + + bytesWritten = 0; + + setDefaultEncoding(encoding) { + if (this.#innerStream || !isFastEncoding(encoding)) { + this.#ensureInnerStream(); + return this.#innerStream.setDefaultEncoding(encoding); + } + } + + #createWriter() { + switch (this.#fd) { + case 1: { + var writer = Bun.stdout.writer({ highWaterMark: 0 }); + writer.unref(); + return writer; + } + + case 2: { + var writer = Bun.stderr.writer({ highWaterMark: 0 }); + writer.unref(); + return writer; + } + default: { + throw new Error("Unsupported writer"); + } + } + } + + #getWriter() { + return (this.#writer ??= this.#createWriter()); + } + + constructor(fd_) { + super(); + this.#fd = fd_; + } + + get fd() { + return this.#fd; + } + + get isTTY() { + return (this.#isTTY ??= require("node:tty").isatty(this.#fd)); + } + + cursorTo(x, y, callback) { + return (readline ??= require("readline")).cursorTo(this, x, y, callback); + } + + moveCursor(dx, dy, callback) { + return (readline ??= require("readline")).moveCursor(this, dx, dy, callback); + } + + clearLine(dir, callback) { + return (readline ??= require("readline")).clearLine(this, dir, callback); + } + + clearScreenDown(callback) { + return (readline ??= require("readline")).clearScreenDown(this, callback); + } + + // TODO: once implemented this.columns and this.rows should be uncommented + // getWindowSize() { + // return [this.columns, this.rows]; + // } + + ref() { + this.#getWriter().ref(); + } + + unref() { + this.#getWriter().unref(); + } + + on(event, listener) { + if (event === "close" || event === "finish") { + this.#ensureInnerStream(); + return this.#innerStream.on(event, listener); + } + + if (event === "drain") { + return super.on("drain", listener); + } + + if (event === "error") { + return super.on("error", listener); + } + + return super.on(event, listener); + } + + get _writableState() { + this.#ensureInnerStream(); + return this.#innerStream._writableState; + } + + get _readableState() { + this.#ensureInnerStream(); + return this.#innerStream._readableState; + } + + pipe(destination) { + this.#ensureInnerStream(); + return this.#innerStream.pipe(destination); + } + + unpipe(destination) { + this.#ensureInnerStream(); + return this.#innerStream.unpipe(destination); + } + + #ensureInnerStream() { + if (this.#innerStream) return; + this.#innerStream = createStdioWriteStream(this.#fd); + const events = this.eventNames(); + for (const event of events) { + this.#innerStream.on(event, (...args) => { + this.emit(event, ...args); + }); + } + } + + #write1(chunk) { + var writer = this.#getWriter(); + const writeResult = writer.write(chunk); + this.bytesWritten += writeResult; + const flushResult = writer.flush(false); + return !!(writeResult || flushResult); + } + + #writeWithEncoding(chunk, encoding) { + if (!isFastEncoding(encoding)) { + this.#ensureInnerStream(); + return this.#innerStream.write(chunk, encoding); + } + + return this.#write1(chunk); + } + + #performCallback(cb, err?: any) { + if (err) { + this.emit("error", err); + } + + try { + cb(err ? err : null); + } catch (err2) { + this.emit("error", err2); + } + } + + #writeWithCallbackAndEncoding(chunk, encoding, callback) { + if (!isFastEncoding(encoding)) { + this.#ensureInnerStream(); + return this.#innerStream.write(chunk, encoding, callback); + } + + var writer = this.#getWriter(); + const writeResult = writer.write(chunk); + const flushResult = writer.flush(true); + if (flushResult?.then) { + flushResult.then( + () => { + this.#performCallback(callback); + this.emit("drain"); + }, + err => this.#performCallback(callback, err), + ); + return false; + } + + queueMicrotask(() => { + this.#performCallback(callback); + }); + + return !!(writeResult || flushResult); + } + + write(chunk, encoding, callback) { + const result = this._write(chunk, encoding, callback); + + if (result) { + this.emit("drain"); + } + + return result; + } + + get hasColors() { + return Bun.tty[this.#fd].hasColors; + } + + _write(chunk, encoding, callback) { + var inner = this.#innerStream; + if (inner) { + return inner.write(chunk, encoding, callback); + } + + switch (arguments.length) { + case 0: { + var error = new Error("Invalid arguments"); + error.code = "ERR_INVALID_ARG_TYPE"; + throw error; + } + case 1: { + return this.#write1(chunk); + } + case 2: { + if (typeof encoding === "function") { + return this.#writeWithCallbackAndEncoding(chunk, "", encoding); + } else if (typeof encoding === "string") { + return this.#writeWithEncoding(chunk, encoding); + } + } + default: { + if ( + (typeof encoding !== "undefined" && typeof encoding !== "string") || + (typeof callback !== "undefined" && typeof callback !== "function") + ) { + var error = new Error("Invalid arguments"); + error.code = "ERR_INVALID_ARG_TYPE"; + throw error; + } + + if (typeof callback === "undefined") { + return this.#writeWithEncoding(chunk, encoding); + } + + return this.#writeWithCallbackAndEncoding(chunk, encoding, callback); + } + } + } + + destroy() { + return this; + } + + end() { + return this; + } + }; + + return new FastStdioWriteStream(fd_); +} + +export function getStdinStream(fd_, rawRequire, Bun) { + var module = { path: "node:process", require: rawRequire }; + var require = path => module.require(path); + + var { Duplex, eos, destroy } = require("node:stream"); + + var StdinStream = class StdinStream extends Duplex { + #reader; + // TODO: investigate https://github.com/oven-sh/bun/issues/1607 + + #readRef; + #writeStream; + + #readable = true; + #unrefOnRead = false; + #writable = true; + + #onFinish; + #onClose; + #onDrain; + + get isTTY() { + return require("tty").isatty(fd_); + } + + get fd() { + return fd_; + } + + constructor() { + super({ readable: true, writable: true }); + } + + #onFinished(err?) { + const cb = this.#onClose; + this.#onClose = null; + + if (cb) { + cb(err); + } else if (err) { + this.destroy(err); + } else if (!this.#readable && !this.#writable) { + this.destroy(); + } + } + + _destroy(err, callback) { + if (!err && this.#onClose !== null) { + var AbortError = class AbortError extends Error { + constructor(message = "The operation was aborted", options = void 0) { + if (options !== void 0 && typeof options !== "object") { + throw new Error(`Invalid AbortError options:\n\n${JSON.stringify(options, null, 2)}`); + } + super(message, options); + this.code = "ABORT_ERR"; + this.name = "AbortError"; + } + }; + err = new AbortError(); + } + + if (this.#onClose === null) { + callback(err); + } else { + this.#onClose = callback; + if (this.#writeStream) destroy(this.#writeStream, err); + } + } + + setRawMode(mode) {} + + on(name, callback) { + // Streams don't generally required to present any data when only + // `readable` events are present, i.e. `readableFlowing === false` + // + // However, Node.js has a this quirk whereby `process.stdin.read()` + // blocks under TTY mode, thus looping `.read()` in this particular + // case would not result in truncation. + // + // Therefore the following hack is only specific to `process.stdin` + // and does not apply to the underlying Stream implementation. + if (name === "readable") { + this.ref(); + this.#unrefOnRead = true; + } + return super.on(name, callback); + } + + pause() { + this.unref(); + return super.pause(); + } + + resume() { + this.ref(); + return super.resume(); + } + + ref() { + this.#reader ??= Bun.stdin.stream().getReader(); + this.#readRef ??= setInterval(() => {}, 1 << 30); + } + + unref() { + if (this.#readRef) { + clearInterval(this.#readRef); + this.#readRef = null; + } + } + + async #readInternal() { + try { + var done, value; + const read = this.#reader.readMany(); + + // read same-tick if possible + if (!read?.then) { + ({ done, value } = read); + } else { + ({ done, value } = await read); + } + + if (!done) { + this.push(value[0]); + + // shouldn't actually happen, but just in case + const length = value.length; + for (let i = 1; i < length; i++) { + this.push(value[i]); + } + } else { + this.push(null); + this.pause(); + this.#readable = false; + this.#onFinished(); + } + } catch (err) { + this.#readable = false; + this.#onFinished(err); + } + } + + _read(size) { + if (this.#unrefOnRead) { + this.unref(); + this.#unrefOnRead = false; + } + this.#readInternal(); + } + + #constructWriteStream() { + var { createWriteStream } = require("node:fs"); + var writeStream = (this.#writeStream = createWriteStream("/dev/fd/0")); + + writeStream.on("finish", () => { + if (this.#onFinish) { + const cb = this.#onFinish; + this.#onFinish = null; + cb(); + } + }); + + writeStream.on("drain", () => { + if (this.#onDrain) { + const cb = this.#onDrain; + this.#onDrain = null; + cb(); + } + }); + + eos(writeStream, err => { + this.#writable = false; + if (err) { + destroy(writeStream, err); + } + this.#onFinished(err); + }); + + return writeStream; + } + + _write(chunk, encoding, callback) { + var writeStream = this.#writeStream; + if (!writeStream) { + writeStream = this.#constructWriteStream(); + } + + if (writeStream.write(chunk, encoding)) { + callback(); + } else { + this.#onDrain = callback; + } + } + + _final(callback) { + this.#writeStream.end(); + this.#onFinish = (...args) => callback(...args); + } + }; + + return new StdinStream(); +} diff --git a/src/bun.js/builtins/ts/ReadableByteStreamController.ts b/src/bun.js/builtins/ts/ReadableByteStreamController.ts new file mode 100644 index 000000000..888f241bc --- /dev/null +++ b/src/bun.js/builtins/ts/ReadableByteStreamController.ts @@ -0,0 +1,93 @@ +/* + * Copyright (C) 2016 Canon Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +export function initializeReadableByteStreamController(this, stream, underlyingByteSource, highWaterMark) { + if (arguments.length !== 4 && arguments[3] !== $isReadableStream) + throw new TypeError("ReadableByteStreamController constructor should not be called directly"); + + return $privateInitializeReadableByteStreamController.$call(this, stream, underlyingByteSource, highWaterMark); +} + +export function enqueue(this, chunk) { + if (!$isReadableByteStreamController(this)) throw $makeThisTypeError("ReadableByteStreamController", "enqueue"); + + if ($getByIdDirectPrivate(this, "closeRequested")) + throw new TypeError("ReadableByteStreamController is requested to close"); + + if ($getByIdDirectPrivate($getByIdDirectPrivate(this, "controlledReadableStream"), "state") !== $streamReadable) + throw new TypeError("ReadableStream is not readable"); + + if (!$isObject(chunk) || !ArrayBuffer.$isView(chunk)) throw new TypeError("Provided chunk is not a TypedArray"); + + return $readableByteStreamControllerEnqueue(this, chunk); +} + +export function error(this, error) { + if (!$isReadableByteStreamController(this)) throw $makeThisTypeError("ReadableByteStreamController", "error"); + + if ($getByIdDirectPrivate($getByIdDirectPrivate(this, "controlledReadableStream"), "state") !== $streamReadable) + throw new TypeError("ReadableStream is not readable"); + + $readableByteStreamControllerError(this, error); +} + +export function close(this) { + if (!$isReadableByteStreamController(this)) throw $makeThisTypeError("ReadableByteStreamController", "close"); + + if ($getByIdDirectPrivate(this, "closeRequested")) throw new TypeError("Close has already been requested"); + + if ($getByIdDirectPrivate($getByIdDirectPrivate(this, "controlledReadableStream"), "state") !== $streamReadable) + throw new TypeError("ReadableStream is not readable"); + + $readableByteStreamControllerClose(this); +} + +$getter; +export function byobRequest(this) { + if (!$isReadableByteStreamController(this)) throw $makeGetterTypeError("ReadableByteStreamController", "byobRequest"); + + var request = $getByIdDirectPrivate(this, "byobRequest"); + if (request === undefined) { + var pending = $getByIdDirectPrivate(this, "pendingPullIntos"); + const firstDescriptor = pending.peek(); + if (firstDescriptor) { + const view = new Uint8Array( + firstDescriptor.buffer, + firstDescriptor.byteOffset + firstDescriptor.bytesFilled, + firstDescriptor.byteLength - firstDescriptor.bytesFilled, + ); + $putByIdDirectPrivate(this, "byobRequest", new ReadableStreamBYOBRequest(this, view, $isReadableStream)); + } + } + + return $getByIdDirectPrivate(this, "byobRequest"); +} + +$getter; +export function desiredSize(this) { + if (!$isReadableByteStreamController(this)) throw $makeGetterTypeError("ReadableByteStreamController", "desiredSize"); + + return $readableByteStreamControllerGetDesiredSize(this); +} diff --git a/src/bun.js/builtins/ts/ReadableByteStreamInternals.ts b/src/bun.js/builtins/ts/ReadableByteStreamInternals.ts new file mode 100644 index 000000000..f44c385b4 --- /dev/null +++ b/src/bun.js/builtins/ts/ReadableByteStreamInternals.ts @@ -0,0 +1,656 @@ +/* + * Copyright (C) 2016 Canon Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +// @internal + +export function privateInitializeReadableByteStreamController(this, stream, underlyingByteSource, highWaterMark) { + if (!$isReadableStream(stream)) throw new TypeError("ReadableByteStreamController needs a ReadableStream"); + + // readableStreamController is initialized with null value. + if ($getByIdDirectPrivate(stream, "readableStreamController") !== null) + throw new TypeError("ReadableStream already has a controller"); + + $putByIdDirectPrivate(this, "controlledReadableStream", stream); + $putByIdDirectPrivate(this, "underlyingByteSource", underlyingByteSource); + $putByIdDirectPrivate(this, "pullAgain", false); + $putByIdDirectPrivate(this, "pulling", false); + $readableByteStreamControllerClearPendingPullIntos(this); + $putByIdDirectPrivate(this, "queue", $newQueue()); + $putByIdDirectPrivate(this, "started", 0); + $putByIdDirectPrivate(this, "closeRequested", false); + + let hwm = $toNumber(highWaterMark); + if (isNaN(hwm) || hwm < 0) throw new RangeError("highWaterMark value is negative or not a number"); + $putByIdDirectPrivate(this, "strategyHWM", hwm); + + let autoAllocateChunkSize = underlyingByteSource.autoAllocateChunkSize; + if (autoAllocateChunkSize !== undefined) { + autoAllocateChunkSize = $toNumber(autoAllocateChunkSize); + if (autoAllocateChunkSize <= 0 || autoAllocateChunkSize === Infinity || autoAllocateChunkSize === -Infinity) + throw new RangeError("autoAllocateChunkSize value is negative or equal to positive or negative infinity"); + } + $putByIdDirectPrivate(this, "autoAllocateChunkSize", autoAllocateChunkSize); + $putByIdDirectPrivate(this, "pendingPullIntos", $createFIFO()); + + const controller = this; + $promiseInvokeOrNoopNoCatch($getByIdDirectPrivate(controller, "underlyingByteSource"), "start", [controller]).$then( + () => { + $putByIdDirectPrivate(controller, "started", 1); + $assert(!$getByIdDirectPrivate(controller, "pulling")); + $assert(!$getByIdDirectPrivate(controller, "pullAgain")); + $readableByteStreamControllerCallPullIfNeeded(controller); + }, + error => { + if ($getByIdDirectPrivate(stream, "state") === $streamReadable) + $readableByteStreamControllerError(controller, error); + }, + ); + + $putByIdDirectPrivate(this, "cancel", $readableByteStreamControllerCancel); + $putByIdDirectPrivate(this, "pull", $readableByteStreamControllerPull); + + return this; +} + +export function readableStreamByteStreamControllerStart(this, controller) { + $putByIdDirectPrivate(controller, "start", undefined); +} + +export function privateInitializeReadableStreamBYOBRequest(this, controller, view) { + $putByIdDirectPrivate(this, "associatedReadableByteStreamController", controller); + $putByIdDirectPrivate(this, "view", view); +} + +export function isReadableByteStreamController(controller) { + // Same test mechanism as in isReadableStreamDefaultController (ReadableStreamInternals.js). + // See corresponding function for explanations. + return $isObject(controller) && !!$getByIdDirectPrivate(controller, "underlyingByteSource"); +} + +export function isReadableStreamBYOBRequest(byobRequest) { + // Same test mechanism as in isReadableStreamDefaultController (ReadableStreamInternals.js). + // See corresponding function for explanations. + return $isObject(byobRequest) && !!$getByIdDirectPrivate(byobRequest, "associatedReadableByteStreamController"); +} + +export function isReadableStreamBYOBReader(reader) { + // Spec tells to return true only if reader has a readIntoRequests internal slot. + // However, since it is a private slot, it cannot be checked using hasOwnProperty(). + // Since readIntoRequests is initialized with an empty array, the following test is ok. + return $isObject(reader) && !!$getByIdDirectPrivate(reader, "readIntoRequests"); +} + +export function readableByteStreamControllerCancel(controller, reason) { + var pendingPullIntos = $getByIdDirectPrivate(controller, "pendingPullIntos"); + var first = pendingPullIntos.peek(); + if (first) first.bytesFilled = 0; + + $putByIdDirectPrivate(controller, "queue", $newQueue()); + return $promiseInvokeOrNoop($getByIdDirectPrivate(controller, "underlyingByteSource"), "cancel", [reason]); +} + +export function readableByteStreamControllerError(controller, e) { + $assert( + $getByIdDirectPrivate($getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === $streamReadable, + ); + $readableByteStreamControllerClearPendingPullIntos(controller); + $putByIdDirectPrivate(controller, "queue", $newQueue()); + $readableStreamError($getByIdDirectPrivate(controller, "controlledReadableStream"), e); +} + +export function readableByteStreamControllerClose(controller) { + $assert(!$getByIdDirectPrivate(controller, "closeRequested")); + $assert( + $getByIdDirectPrivate($getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === $streamReadable, + ); + + if ($getByIdDirectPrivate(controller, "queue").size > 0) { + $putByIdDirectPrivate(controller, "closeRequested", true); + return; + } + + var first = $getByIdDirectPrivate(controller, "pendingPullIntos")?.peek(); + if (first) { + if (first.bytesFilled > 0) { + const e = $makeTypeError("Close requested while there remain pending bytes"); + $readableByteStreamControllerError(controller, e); + throw e; + } + } + + $readableStreamClose($getByIdDirectPrivate(controller, "controlledReadableStream")); +} + +export function readableByteStreamControllerClearPendingPullIntos(controller) { + $readableByteStreamControllerInvalidateBYOBRequest(controller); + var existing = $getByIdDirectPrivate(controller, "pendingPullIntos"); + if (existing !== undefined) { + existing.clear(); + } else { + $putByIdDirectPrivate(controller, "pendingPullIntos", $createFIFO()); + } +} + +export function readableByteStreamControllerGetDesiredSize(controller) { + const stream = $getByIdDirectPrivate(controller, "controlledReadableStream"); + const state = $getByIdDirectPrivate(stream, "state"); + + if (state === $streamErrored) return null; + if (state === $streamClosed) return 0; + + return $getByIdDirectPrivate(controller, "strategyHWM") - $getByIdDirectPrivate(controller, "queue").size; +} + +export function readableStreamHasBYOBReader(stream) { + const reader = $getByIdDirectPrivate(stream, "reader"); + return reader !== undefined && $isReadableStreamBYOBReader(reader); +} + +export function readableStreamHasDefaultReader(stream) { + const reader = $getByIdDirectPrivate(stream, "reader"); + return reader !== undefined && $isReadableStreamDefaultReader(reader); +} + +export function readableByteStreamControllerHandleQueueDrain(controller) { + $assert( + $getByIdDirectPrivate($getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === $streamReadable, + ); + if (!$getByIdDirectPrivate(controller, "queue").size && $getByIdDirectPrivate(controller, "closeRequested")) + $readableStreamClose($getByIdDirectPrivate(controller, "controlledReadableStream")); + else $readableByteStreamControllerCallPullIfNeeded(controller); +} + +export function readableByteStreamControllerPull(controller) { + const stream = $getByIdDirectPrivate(controller, "controlledReadableStream"); + $assert($readableStreamHasDefaultReader(stream)); + if ($getByIdDirectPrivate(controller, "queue").content?.isNotEmpty()) { + const entry = $getByIdDirectPrivate(controller, "queue").content.shift(); + $getByIdDirectPrivate(controller, "queue").size -= entry.byteLength; + $readableByteStreamControllerHandleQueueDrain(controller); + let view; + try { + view = new Uint8Array(entry.buffer, entry.byteOffset, entry.byteLength); + } catch (error) { + return Promise.$reject(error); + } + return $createFulfilledPromise({ value: view, done: false }); + } + + if ($getByIdDirectPrivate(controller, "autoAllocateChunkSize") !== undefined) { + let buffer; + try { + buffer = $createUninitializedArrayBuffer($getByIdDirectPrivate(controller, "autoAllocateChunkSize")); + } catch (error) { + return Promise.$reject(error); + } + const pullIntoDescriptor = { + buffer, + byteOffset: 0, + byteLength: $getByIdDirectPrivate(controller, "autoAllocateChunkSize"), + bytesFilled: 0, + elementSize: 1, + ctor: Uint8Array, + readerType: "default", + }; + $getByIdDirectPrivate(controller, "pendingPullIntos").push(pullIntoDescriptor); + } + + const promise = $readableStreamAddReadRequest(stream); + $readableByteStreamControllerCallPullIfNeeded(controller); + return promise; +} + +export function readableByteStreamControllerShouldCallPull(controller) { + const stream = $getByIdDirectPrivate(controller, "controlledReadableStream"); + + if ($getByIdDirectPrivate(stream, "state") !== $streamReadable) return false; + if ($getByIdDirectPrivate(controller, "closeRequested")) return false; + if (!($getByIdDirectPrivate(controller, "started") > 0)) return false; + const reader = $getByIdDirectPrivate(stream, "reader"); + + if ( + reader && + ($getByIdDirectPrivate(reader, "readRequests")?.isNotEmpty() || !!$getByIdDirectPrivate(reader, "bunNativePtr")) + ) + return true; + if ( + $readableStreamHasBYOBReader(stream) && + $getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readIntoRequests")?.isNotEmpty() + ) + return true; + if ($readableByteStreamControllerGetDesiredSize(controller) > 0) return true; + return false; +} + +export function readableByteStreamControllerCallPullIfNeeded(controller) { + if (!$readableByteStreamControllerShouldCallPull(controller)) return; + + if ($getByIdDirectPrivate(controller, "pulling")) { + $putByIdDirectPrivate(controller, "pullAgain", true); + return; + } + + $assert(!$getByIdDirectPrivate(controller, "pullAgain")); + $putByIdDirectPrivate(controller, "pulling", true); + $promiseInvokeOrNoop($getByIdDirectPrivate(controller, "underlyingByteSource"), "pull", [controller]).$then( + () => { + $putByIdDirectPrivate(controller, "pulling", false); + if ($getByIdDirectPrivate(controller, "pullAgain")) { + $putByIdDirectPrivate(controller, "pullAgain", false); + $readableByteStreamControllerCallPullIfNeeded(controller); + } + }, + error => { + if ( + $getByIdDirectPrivate($getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === + $streamReadable + ) + $readableByteStreamControllerError(controller, error); + }, + ); +} + +export function transferBufferToCurrentRealm(buffer) { + // FIXME: Determine what should be done here exactly (what is already existing in current + // codebase and what has to be added). According to spec, Transfer operation should be + // performed in order to transfer buffer to current realm. For the moment, simply return + // received buffer. + return buffer; +} + +export function readableStreamReaderKind(reader) { + if (!!$getByIdDirectPrivate(reader, "readRequests")) return $getByIdDirectPrivate(reader, "bunNativePtr") ? 3 : 1; + + if (!!$getByIdDirectPrivate(reader, "readIntoRequests")) return 2; + + return 0; +} + +export function readableByteStreamControllerEnqueue(controller, chunk) { + const stream = $getByIdDirectPrivate(controller, "controlledReadableStream"); + $assert(!$getByIdDirectPrivate(controller, "closeRequested")); + $assert($getByIdDirectPrivate(stream, "state") === $streamReadable); + + switch ( + $getByIdDirectPrivate(stream, "reader") ? $readableStreamReaderKind($getByIdDirectPrivate(stream, "reader")) : 0 + ) { + /* default reader */ + case 1: { + if (!$getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) + $readableByteStreamControllerEnqueueChunk( + controller, + $transferBufferToCurrentRealm(chunk.buffer), + chunk.byteOffset, + chunk.byteLength, + ); + else { + $assert(!$getByIdDirectPrivate(controller, "queue").content.size()); + const transferredView = + chunk.constructor === Uint8Array ? chunk : new Uint8Array(chunk.buffer, chunk.byteOffset, chunk.byteLength); + $readableStreamFulfillReadRequest(stream, transferredView, false); + } + break; + } + + /* BYOB */ + case 2: { + $readableByteStreamControllerEnqueueChunk( + controller, + $transferBufferToCurrentRealm(chunk.buffer), + chunk.byteOffset, + chunk.byteLength, + ); + $readableByteStreamControllerProcessPullDescriptors(controller); + break; + } + + /* NativeReader */ + case 3: { + // reader.$enqueueNative($getByIdDirectPrivate(reader, "bunNativePtr"), chunk); + + break; + } + + default: { + $assert(!$isReadableStreamLocked(stream)); + $readableByteStreamControllerEnqueueChunk( + controller, + $transferBufferToCurrentRealm(chunk.buffer), + chunk.byteOffset, + chunk.byteLength, + ); + break; + } + } +} + +// Spec name: readableByteStreamControllerEnqueueChunkToQueue. +export function readableByteStreamControllerEnqueueChunk(controller, buffer, byteOffset, byteLength) { + $getByIdDirectPrivate(controller, "queue").content.push({ + buffer: buffer, + byteOffset: byteOffset, + byteLength: byteLength, + }); + $getByIdDirectPrivate(controller, "queue").size += byteLength; +} + +export function readableByteStreamControllerRespondWithNewView(controller, view) { + $assert($getByIdDirectPrivate(controller, "pendingPullIntos").isNotEmpty()); + + let firstDescriptor = $getByIdDirectPrivate(controller, "pendingPullIntos").peek(); + + if (firstDescriptor.byteOffset + firstDescriptor.bytesFilled !== view.byteOffset) + throw new RangeError("Invalid value for view.byteOffset"); + + if (firstDescriptor.byteLength !== view.byteLength) throw new RangeError("Invalid value for view.byteLength"); + + firstDescriptor.buffer = view.buffer; + $readableByteStreamControllerRespondInternal(controller, view.byteLength); +} + +export function readableByteStreamControllerRespond(controller, bytesWritten) { + bytesWritten = $toNumber(bytesWritten); + + if (isNaN(bytesWritten) || bytesWritten === Infinity || bytesWritten < 0) + throw new RangeError("bytesWritten has an incorrect value"); + + $assert($getByIdDirectPrivate(controller, "pendingPullIntos").isNotEmpty()); + + $readableByteStreamControllerRespondInternal(controller, bytesWritten); +} + +export function readableByteStreamControllerRespondInternal(controller, bytesWritten) { + let firstDescriptor = $getByIdDirectPrivate(controller, "pendingPullIntos").peek(); + let stream = $getByIdDirectPrivate(controller, "controlledReadableStream"); + + if ($getByIdDirectPrivate(stream, "state") === $streamClosed) { + if (bytesWritten !== 0) throw new TypeError("bytesWritten is different from 0 even though stream is closed"); + $readableByteStreamControllerRespondInClosedState(controller, firstDescriptor); + } else { + $assert($getByIdDirectPrivate(stream, "state") === $streamReadable); + $readableByteStreamControllerRespondInReadableState(controller, bytesWritten, firstDescriptor); + } +} + +export function readableByteStreamControllerRespondInReadableState(controller, bytesWritten, pullIntoDescriptor) { + if (pullIntoDescriptor.bytesFilled + bytesWritten > pullIntoDescriptor.byteLength) + throw new RangeError("bytesWritten value is too great"); + + $assert( + $getByIdDirectPrivate(controller, "pendingPullIntos").isEmpty() || + $getByIdDirectPrivate(controller, "pendingPullIntos").peek() === pullIntoDescriptor, + ); + $readableByteStreamControllerInvalidateBYOBRequest(controller); + pullIntoDescriptor.bytesFilled += bytesWritten; + + if (pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize) return; + + $readableByteStreamControllerShiftPendingDescriptor(controller); + const remainderSize = pullIntoDescriptor.bytesFilled % pullIntoDescriptor.elementSize; + + if (remainderSize > 0) { + const end = pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled; + const remainder = $cloneArrayBuffer(pullIntoDescriptor.buffer, end - remainderSize, remainderSize); + $readableByteStreamControllerEnqueueChunk(controller, remainder, 0, remainder.byteLength); + } + + pullIntoDescriptor.buffer = $transferBufferToCurrentRealm(pullIntoDescriptor.buffer); + pullIntoDescriptor.bytesFilled -= remainderSize; + $readableByteStreamControllerCommitDescriptor( + $getByIdDirectPrivate(controller, "controlledReadableStream"), + pullIntoDescriptor, + ); + $readableByteStreamControllerProcessPullDescriptors(controller); +} + +export function readableByteStreamControllerRespondInClosedState(controller, firstDescriptor) { + firstDescriptor.buffer = $transferBufferToCurrentRealm(firstDescriptor.buffer); + $assert(firstDescriptor.bytesFilled === 0); + + if ($readableStreamHasBYOBReader($getByIdDirectPrivate(controller, "controlledReadableStream"))) { + while ( + $getByIdDirectPrivate( + $getByIdDirectPrivate($getByIdDirectPrivate(controller, "controlledReadableStream"), "reader"), + "readIntoRequests", + )?.isNotEmpty() + ) { + let pullIntoDescriptor = $readableByteStreamControllerShiftPendingDescriptor(controller); + $readableByteStreamControllerCommitDescriptor( + $getByIdDirectPrivate(controller, "controlledReadableStream"), + pullIntoDescriptor, + ); + } + } +} + +// Spec name: readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue (shortened for readability). +export function readableByteStreamControllerProcessPullDescriptors(controller) { + $assert(!$getByIdDirectPrivate(controller, "closeRequested")); + while ($getByIdDirectPrivate(controller, "pendingPullIntos").isNotEmpty()) { + if ($getByIdDirectPrivate(controller, "queue").size === 0) return; + let pullIntoDescriptor = $getByIdDirectPrivate(controller, "pendingPullIntos").peek(); + if ($readableByteStreamControllerFillDescriptorFromQueue(controller, pullIntoDescriptor)) { + $readableByteStreamControllerShiftPendingDescriptor(controller); + $readableByteStreamControllerCommitDescriptor( + $getByIdDirectPrivate(controller, "controlledReadableStream"), + pullIntoDescriptor, + ); + } + } +} + +// Spec name: readableByteStreamControllerFillPullIntoDescriptorFromQueue (shortened for readability). +export function readableByteStreamControllerFillDescriptorFromQueue(controller, pullIntoDescriptor) { + const currentAlignedBytes = + pullIntoDescriptor.bytesFilled - (pullIntoDescriptor.bytesFilled % pullIntoDescriptor.elementSize); + const maxBytesToCopy = + $getByIdDirectPrivate(controller, "queue").size < pullIntoDescriptor.byteLength - pullIntoDescriptor.bytesFilled + ? $getByIdDirectPrivate(controller, "queue").size + : pullIntoDescriptor.byteLength - pullIntoDescriptor.bytesFilled; + const maxBytesFilled = pullIntoDescriptor.bytesFilled + maxBytesToCopy; + const maxAlignedBytes = maxBytesFilled - (maxBytesFilled % pullIntoDescriptor.elementSize); + let totalBytesToCopyRemaining = maxBytesToCopy; + let ready = false; + + if (maxAlignedBytes > currentAlignedBytes) { + totalBytesToCopyRemaining = maxAlignedBytes - pullIntoDescriptor.bytesFilled; + ready = true; + } + + while (totalBytesToCopyRemaining > 0) { + let headOfQueue = $getByIdDirectPrivate(controller, "queue").content.peek(); + const bytesToCopy = + totalBytesToCopyRemaining < headOfQueue.byteLength ? totalBytesToCopyRemaining : headOfQueue.byteLength; + // Copy appropriate part of pullIntoDescriptor.buffer to headOfQueue.buffer. + // Remark: this implementation is not completely aligned on the definition of CopyDataBlockBytes + // operation of ECMAScript (the case of Shared Data Block is not considered here, but it doesn't seem to be an issue). + const destStart = pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled; + // FIXME: As indicated in comments of bug 172717, access to set is not safe. However, using prototype.$set.$call does + // not work ($set is undefined). A safe way to do that is needed. + new Uint8Array(pullIntoDescriptor.buffer).set( + new Uint8Array(headOfQueue.buffer, headOfQueue.byteOffset, bytesToCopy), + destStart, + ); + + if (headOfQueue.byteLength === bytesToCopy) $getByIdDirectPrivate(controller, "queue").content.shift(); + else { + headOfQueue.byteOffset += bytesToCopy; + headOfQueue.byteLength -= bytesToCopy; + } + + $getByIdDirectPrivate(controller, "queue").size -= bytesToCopy; + $assert( + $getByIdDirectPrivate(controller, "pendingPullIntos").isEmpty() || + $getByIdDirectPrivate(controller, "pendingPullIntos").peek() === pullIntoDescriptor, + ); + $readableByteStreamControllerInvalidateBYOBRequest(controller); + pullIntoDescriptor.bytesFilled += bytesToCopy; + totalBytesToCopyRemaining -= bytesToCopy; + } + + if (!ready) { + $assert($getByIdDirectPrivate(controller, "queue").size === 0); + $assert(pullIntoDescriptor.bytesFilled > 0); + $assert(pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize); + } + + return ready; +} + +// Spec name: readableByteStreamControllerShiftPendingPullInto (renamed for consistency). +export function readableByteStreamControllerShiftPendingDescriptor(controller) { + let descriptor = $getByIdDirectPrivate(controller, "pendingPullIntos").shift(); + $readableByteStreamControllerInvalidateBYOBRequest(controller); + return descriptor; +} + +export function readableByteStreamControllerInvalidateBYOBRequest(controller) { + if ($getByIdDirectPrivate(controller, "byobRequest") === undefined) return; + const byobRequest = $getByIdDirectPrivate(controller, "byobRequest"); + $putByIdDirectPrivate(byobRequest, "associatedReadableByteStreamController", undefined); + $putByIdDirectPrivate(byobRequest, "view", undefined); + $putByIdDirectPrivate(controller, "byobRequest", undefined); +} + +// Spec name: readableByteStreamControllerCommitPullIntoDescriptor (shortened for readability). +export function readableByteStreamControllerCommitDescriptor(stream, pullIntoDescriptor) { + $assert($getByIdDirectPrivate(stream, "state") !== $streamErrored); + let done = false; + if ($getByIdDirectPrivate(stream, "state") === $streamClosed) { + $assert(!pullIntoDescriptor.bytesFilled); + done = true; + } + let filledView = $readableByteStreamControllerConvertDescriptor(pullIntoDescriptor); + if (pullIntoDescriptor.readerType === "default") $readableStreamFulfillReadRequest(stream, filledView, done); + else { + $assert(pullIntoDescriptor.readerType === "byob"); + $readableStreamFulfillReadIntoRequest(stream, filledView, done); + } +} + +// Spec name: readableByteStreamControllerConvertPullIntoDescriptor (shortened for readability). +export function readableByteStreamControllerConvertDescriptor(pullIntoDescriptor) { + $assert(pullIntoDescriptor.bytesFilled <= pullIntoDescriptor.byteLength); + $assert(pullIntoDescriptor.bytesFilled % pullIntoDescriptor.elementSize === 0); + + return new pullIntoDescriptor.ctor( + pullIntoDescriptor.buffer, + pullIntoDescriptor.byteOffset, + pullIntoDescriptor.bytesFilled / pullIntoDescriptor.elementSize, + ); +} + +export function readableStreamFulfillReadIntoRequest(stream, chunk, done) { + const readIntoRequest = $getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readIntoRequests").shift(); + $fulfillPromise(readIntoRequest, { value: chunk, done: done }); +} + +export function readableStreamBYOBReaderRead(reader, view) { + const stream = $getByIdDirectPrivate(reader, "ownerReadableStream"); + $assert(!!stream); + + $putByIdDirectPrivate(stream, "disturbed", true); + if ($getByIdDirectPrivate(stream, "state") === $streamErrored) + return Promise.$reject($getByIdDirectPrivate(stream, "storedError")); + + return $readableByteStreamControllerPullInto($getByIdDirectPrivate(stream, "readableStreamController"), view); +} + +export function readableByteStreamControllerPullInto(controller, view) { + const stream = $getByIdDirectPrivate(controller, "controlledReadableStream"); + let elementSize = 1; + // Spec describes that in the case where view is a TypedArray, elementSize + // should be set to the size of an element (e.g. 2 for UInt16Array). For + // DataView, BYTES_PER_ELEMENT is undefined, contrary to the same property + // for TypedArrays. + // FIXME: Getting BYTES_PER_ELEMENT like this is not safe (property is read-only + // but can be modified if the prototype is redefined). A safe way of getting + // it would be to determine which type of ArrayBufferView view is an instance + // of based on typed arrays private variables. However, this is not possible due + // to bug 167697, which prevents access to typed arrays through their private + // names unless public name has already been met before. + if (view.BYTES_PER_ELEMENT !== undefined) elementSize = view.BYTES_PER_ELEMENT; + + // FIXME: Getting constructor like this is not safe. A safe way of getting + // it would be to determine which type of ArrayBufferView view is an instance + // of, and to assign appropriate constructor based on this (e.g. ctor = + // $Uint8Array). However, this is not possible due to bug 167697, which + // prevents access to typed arrays through their private names unless public + // name has already been met before. + const ctor = view.constructor; + + const pullIntoDescriptor = { + buffer: view.buffer, + byteOffset: view.byteOffset, + byteLength: view.byteLength, + bytesFilled: 0, + elementSize, + ctor, + readerType: "byob", + }; + + var pending = $getByIdDirectPrivate(controller, "pendingPullIntos"); + if (pending?.isNotEmpty()) { + pullIntoDescriptor.buffer = $transferBufferToCurrentRealm(pullIntoDescriptor.buffer); + pending.push(pullIntoDescriptor); + return $readableStreamAddReadIntoRequest(stream); + } + + if ($getByIdDirectPrivate(stream, "state") === $streamClosed) { + const emptyView = new ctor(pullIntoDescriptor.buffer, pullIntoDescriptor.byteOffset, 0); + return $createFulfilledPromise({ value: emptyView, done: true }); + } + + if ($getByIdDirectPrivate(controller, "queue").size > 0) { + if ($readableByteStreamControllerFillDescriptorFromQueue(controller, pullIntoDescriptor)) { + const filledView = $readableByteStreamControllerConvertDescriptor(pullIntoDescriptor); + $readableByteStreamControllerHandleQueueDrain(controller); + return $createFulfilledPromise({ value: filledView, done: false }); + } + if ($getByIdDirectPrivate(controller, "closeRequested")) { + const e = $makeTypeError("Closing stream has been requested"); + $readableByteStreamControllerError(controller, e); + return Promise.$reject(e); + } + } + + pullIntoDescriptor.buffer = $transferBufferToCurrentRealm(pullIntoDescriptor.buffer); + $getByIdDirectPrivate(controller, "pendingPullIntos").push(pullIntoDescriptor); + const promise = $readableStreamAddReadIntoRequest(stream); + $readableByteStreamControllerCallPullIfNeeded(controller); + return promise; +} + +export function readableStreamAddReadIntoRequest(stream) { + $assert($isReadableStreamBYOBReader($getByIdDirectPrivate(stream, "reader"))); + $assert( + $getByIdDirectPrivate(stream, "state") === $streamReadable || + $getByIdDirectPrivate(stream, "state") === $streamClosed, + ); + + const readRequest = $newPromise(); + $getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readIntoRequests").push(readRequest); + + return readRequest; +} diff --git a/src/bun.js/builtins/ts/ReadableStream.ts b/src/bun.js/builtins/ts/ReadableStream.ts new file mode 100644 index 000000000..22453403d --- /dev/null +++ b/src/bun.js/builtins/ts/ReadableStream.ts @@ -0,0 +1,416 @@ +/* + * Copyright (C) 2015 Canon Inc. + * Copyright (C) 2015 Igalia. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +export function initializeReadableStream(this: any, underlyingSource: UnderlyingSource, strategy: any) { + if (underlyingSource === undefined) + underlyingSource = { $bunNativeType: 0, $bunNativePtr: 0, $lazy: false } as UnderlyingSource; + if (strategy === undefined) strategy = {}; + + if (!$isObject(underlyingSource)) throw new TypeError("ReadableStream constructor takes an object as first argument"); + + if (strategy !== undefined && !$isObject(strategy)) + throw new TypeError("ReadableStream constructor takes an object as second argument, if any"); + + $putByIdDirectPrivate(this, "state", $streamReadable); + + $putByIdDirectPrivate(this, "reader", undefined); + + $putByIdDirectPrivate(this, "storedError", undefined); + + $putByIdDirectPrivate(this, "disturbed", false); + + // Initialized with null value to enable distinction with undefined case. + $putByIdDirectPrivate(this, "readableStreamController", null); + $putByIdDirectPrivate(this, "bunNativeType", $getByIdDirectPrivate(underlyingSource, "bunNativeType") ?? 0); + $putByIdDirectPrivate(this, "bunNativePtr", $getByIdDirectPrivate(underlyingSource, "bunNativePtr") ?? 0); + + const isDirect = underlyingSource.type === "direct"; + // direct streams are always lazy + const isUnderlyingSourceLazy = !!underlyingSource.$lazy; + const isLazy = isDirect || isUnderlyingSourceLazy; + + // FIXME: We should introduce https://streams.spec.whatwg.org/#create-readable-stream. + // For now, we emulate this with underlyingSource with private properties. + if ($getByIdDirectPrivate(underlyingSource, "pull") !== undefined && !isLazy) { + const size = $getByIdDirectPrivate(strategy, "size"); + const highWaterMark = $getByIdDirectPrivate(strategy, "highWaterMark"); + $putByIdDirectPrivate(this, "highWaterMark", highWaterMark); + $putByIdDirectPrivate(this, "underlyingSource", undefined); + $setupReadableStreamDefaultController( + this, + underlyingSource, + size, + highWaterMark !== undefined ? highWaterMark : 1, + $getByIdDirectPrivate(underlyingSource, "start"), + $getByIdDirectPrivate(underlyingSource, "pull"), + $getByIdDirectPrivate(underlyingSource, "cancel"), + ); + + return this; + } + if (isDirect) { + $putByIdDirectPrivate(this, "underlyingSource", underlyingSource); + $putByIdDirectPrivate(this, "highWaterMark", $getByIdDirectPrivate(strategy, "highWaterMark")); + $putByIdDirectPrivate(this, "start", () => $createReadableStreamController(this, underlyingSource, strategy)); + } else if (isLazy) { + const autoAllocateChunkSize = underlyingSource.autoAllocateChunkSize; + $putByIdDirectPrivate(this, "highWaterMark", undefined); + $putByIdDirectPrivate(this, "underlyingSource", undefined); + $putByIdDirectPrivate( + this, + "highWaterMark", + autoAllocateChunkSize || $getByIdDirectPrivate(strategy, "highWaterMark"), + ); + + $putByIdDirectPrivate(this, "start", () => { + const instance = $lazyLoadStream(this, autoAllocateChunkSize); + if (instance) { + $createReadableStreamController(this, instance, strategy); + } + }); + } else { + $putByIdDirectPrivate(this, "underlyingSource", undefined); + $putByIdDirectPrivate(this, "highWaterMark", $getByIdDirectPrivate(strategy, "highWaterMark")); + $putByIdDirectPrivate(this, "start", undefined); + $createReadableStreamController(this, underlyingSource, strategy); + } + + return this; +} + +$linkTimeConstant; +export function readableStreamToArray(stream) { + // this is a direct stream + var underlyingSource = $getByIdDirectPrivate(stream, "underlyingSource"); + if (underlyingSource !== undefined) { + return $readableStreamToArrayDirect(stream, underlyingSource); + } + + return $readableStreamIntoArray(stream); +} + +$linkTimeConstant; +export function readableStreamToText(stream) { + // this is a direct stream + var underlyingSource = $getByIdDirectPrivate(stream, "underlyingSource"); + if (underlyingSource !== undefined) { + return $readableStreamToTextDirect(stream, underlyingSource); + } + + return $readableStreamIntoText(stream); +} + +$linkTimeConstant; +export function readableStreamToArrayBuffer(stream) { + // this is a direct stream + var underlyingSource = $getByIdDirectPrivate(stream, "underlyingSource"); + + if (underlyingSource !== undefined) { + return $readableStreamToArrayBufferDirect(stream, underlyingSource); + } + + return (Bun.readableStreamToArray(stream) as Promise<any>).$then(Bun.concatArrayBuffers); +} + +$linkTimeConstant; +export function readableStreamToJSON(stream) { + return Bun.readableStreamToText(stream).$then(globalThis.JSON.parse); +} + +$linkTimeConstant; +export function readableStreamToBlob(stream) { + return Promise.resolve(Bun.readableStreamToArray(stream)).$then(array => new Blob(array)); +} + +$linkTimeConstant; +export function consumeReadableStream(nativePtr, nativeType, inputStream) { + const symbol = globalThis.Symbol.for("Bun.consumeReadableStreamPrototype"); + var cached = globalThis[symbol]; + if (!cached) { + cached = globalThis[symbol] = []; + } + var Prototype = cached[nativeType]; + if (Prototype === undefined) { + var [doRead, doError, doReadMany, doClose, onClose, deinit] = + globalThis[globalThis.Symbol.for("Bun.lazy")](nativeType); + + Prototype = class NativeReadableStreamSink { + handleError: any; + handleClosed: any; + processResult: any; + + constructor(reader, ptr) { + this.#ptr = ptr; + this.#reader = reader; + this.#didClose = false; + + this.handleError = this._handleError.bind(this); + this.handleClosed = this._handleClosed.bind(this); + this.processResult = this._processResult.bind(this); + + reader.closed.then(this.handleClosed, this.handleError); + } + + _handleClosed() { + if (this.#didClose) return; + this.#didClose = true; + var ptr = this.#ptr; + this.#ptr = 0; + doClose(ptr); + deinit(ptr); + } + + _handleError(error) { + if (this.#didClose) return; + this.#didClose = true; + var ptr = this.#ptr; + this.#ptr = 0; + doError(ptr, error); + deinit(ptr); + } + + #ptr; + #didClose = false; + #reader; + + _handleReadMany({ value, done, size }) { + if (done) { + this.handleClosed(); + return; + } + + if (this.#didClose) return; + + doReadMany(this.#ptr, value, done, size); + } + + read() { + if (!this.#ptr) return $throwTypeError("ReadableStreamSink is already closed"); + + return this.processResult(this.#reader.read()); + } + + _processResult(result) { + if (result && $isPromise(result)) { + const flags = $getPromiseInternalField(result, $promiseFieldFlags); + if (flags & $promiseStateFulfilled) { + const fulfilledValue = $getPromiseInternalField(result, $promiseFieldReactionsOrResult); + if (fulfilledValue) { + result = fulfilledValue; + } + } + } + + if (result && $isPromise(result)) { + result.then(this.processResult, this.handleError); + return null; + } + + if (result.done) { + this.handleClosed(); + return 0; + } else if (result.value) { + return result.value; + } else { + return -1; + } + } + + readMany() { + if (!this.#ptr) return $throwTypeError("ReadableStreamSink is already closed"); + return this.processResult(this.#reader.readMany()); + } + }; + + const minlength = nativeType + 1; + if (cached.length < minlength) { + cached.length = minlength; + } + $putByValDirect(cached, nativeType, Prototype); + } + + if ($isReadableStreamLocked(inputStream)) { + throw new TypeError("Cannot start reading from a locked stream"); + } + + return new Prototype(inputStream.getReader(), nativePtr); +} + +$linkTimeConstant; +export function createEmptyReadableStream() { + var stream = new ReadableStream({ + pull() {}, + } as any); + $readableStreamClose(stream); + return stream; +} + +$linkTimeConstant; +export function createNativeReadableStream(nativePtr, nativeType, autoAllocateChunkSize) { + return new ReadableStream({ + $lazy: true, + $bunNativeType: nativeType, + $bunNativePtr: nativePtr, + autoAllocateChunkSize: autoAllocateChunkSize, + }); +} + +export function cancel(this, reason) { + if (!$isReadableStream(this)) return Promise.$reject($makeThisTypeError("ReadableStream", "cancel")); + + if ($isReadableStreamLocked(this)) return Promise.$reject($makeTypeError("ReadableStream is locked")); + + return $readableStreamCancel(this, reason); +} + +export function getReader(this, options) { + if (!$isReadableStream(this)) throw $makeThisTypeError("ReadableStream", "getReader"); + + const mode = $toDictionary(options, {}, "ReadableStream.getReader takes an object as first argument").mode; + if (mode === undefined) { + var start_ = $getByIdDirectPrivate(this, "start"); + if (start_) { + $putByIdDirectPrivate(this, "start", undefined); + start_(); + } + + return new ReadableStreamDefaultReader(this); + } + // String conversion is required by spec, hence double equals. + if (mode == "byob") { + return new ReadableStreamBYOBReader(this); + } + + throw new TypeError("Invalid mode is specified"); +} + +export function pipeThrough(this, streams, options) { + const transforms = streams; + + const readable = transforms["readable"]; + if (!$isReadableStream(readable)) throw $makeTypeError("readable should be ReadableStream"); + + const writable = transforms["writable"]; + const internalWritable = $getInternalWritableStream(writable); + if (!$isWritableStream(internalWritable)) throw $makeTypeError("writable should be WritableStream"); + + let preventClose = false; + let preventAbort = false; + let preventCancel = false; + let signal; + if (!$isUndefinedOrNull(options)) { + if (!$isObject(options)) throw $makeTypeError("options must be an object"); + + preventAbort = !!options["preventAbort"]; + preventCancel = !!options["preventCancel"]; + preventClose = !!options["preventClose"]; + + signal = options["signal"]; + if (signal !== undefined && !$isAbortSignal(signal)) throw $makeTypeError("options.signal must be AbortSignal"); + } + + if (!$isReadableStream(this)) throw $makeThisTypeError("ReadableStream", "pipeThrough"); + + if ($isReadableStreamLocked(this)) throw $makeTypeError("ReadableStream is locked"); + + if ($isWritableStreamLocked(internalWritable)) throw $makeTypeError("WritableStream is locked"); + + $readableStreamPipeToWritableStream(this, internalWritable, preventClose, preventAbort, preventCancel, signal); + + return readable; +} + +export function pipeTo(this, destination) { + if (!$isReadableStream(this)) return Promise.$reject($makeThisTypeError("ReadableStream", "pipeTo")); + + if ($isReadableStreamLocked(this)) return Promise.$reject($makeTypeError("ReadableStream is locked")); + + // FIXME: https://bugs.webkit.org/show_bug.cgi?id=159869. + // Built-in generator should be able to parse function signature to compute the function length correctly. + let options = $argument(1); + + let preventClose = false; + let preventAbort = false; + let preventCancel = false; + let signal; + if (!$isUndefinedOrNull(options)) { + if (!$isObject(options)) return Promise.$reject($makeTypeError("options must be an object")); + + try { + preventAbort = !!options["preventAbort"]; + preventCancel = !!options["preventCancel"]; + preventClose = !!options["preventClose"]; + + signal = options["signal"]; + } catch (e) { + return Promise.$reject(e); + } + + if (signal !== undefined && !$isAbortSignal(signal)) + return Promise.$reject(new TypeError("options.signal must be AbortSignal")); + } + + const internalDestination = $getInternalWritableStream(destination); + if (!$isWritableStream(internalDestination)) + return Promise.$reject(new TypeError("ReadableStream pipeTo requires a WritableStream")); + + if ($isWritableStreamLocked(internalDestination)) return Promise.$reject(new TypeError("WritableStream is locked")); + + return $readableStreamPipeToWritableStream( + this, + internalDestination, + preventClose, + preventAbort, + preventCancel, + signal, + ); +} + +export function tee(this) { + if (!$isReadableStream(this)) throw $makeThisTypeError("ReadableStream", "tee"); + + return $readableStreamTee(this, false); +} + +$getter; +export function locked(this) { + if (!$isReadableStream(this)) throw $makeGetterTypeError("ReadableStream", "locked"); + + return $isReadableStreamLocked(this); +} + +export function values(this, options) { + var prototype = ReadableStream.prototype; + $readableStreamDefineLazyIterators(prototype); + return prototype.values.$call(this, options); +} + +$linkTimeConstant; +export function lazyAsyncIterator(this) { + var prototype = ReadableStream.prototype; + $readableStreamDefineLazyIterators(prototype); + return prototype[globalThis.Symbol.asyncIterator].$call(this); +} diff --git a/src/bun.js/builtins/ts/ReadableStreamBYOBReader.ts b/src/bun.js/builtins/ts/ReadableStreamBYOBReader.ts new file mode 100644 index 000000000..5ebfddb19 --- /dev/null +++ b/src/bun.js/builtins/ts/ReadableStreamBYOBReader.ts @@ -0,0 +1,80 @@ +/* + * Copyright (C) 2017 Canon Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY CANON INC. AND ITS CONTRIBUTORS "AS IS" AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL CANON INC. AND ITS CONTRIBUTORS BE LIABLE FOR + * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +export function initializeReadableStreamBYOBReader(this, stream) { + if (!$isReadableStream(stream)) throw new TypeError("ReadableStreamBYOBReader needs a ReadableStream"); + if (!$isReadableByteStreamController($getByIdDirectPrivate(stream, "readableStreamController"))) + throw new TypeError("ReadableStreamBYOBReader needs a ReadableByteStreamController"); + if ($isReadableStreamLocked(stream)) throw new TypeError("ReadableStream is locked"); + + $readableStreamReaderGenericInitialize(this, stream); + $putByIdDirectPrivate(this, "readIntoRequests", $createFIFO()); + + return this; +} + +export function cancel(this, reason) { + if (!$isReadableStreamBYOBReader(this)) + return Promise.$reject($makeThisTypeError("ReadableStreamBYOBReader", "cancel")); + + if (!$getByIdDirectPrivate(this, "ownerReadableStream")) + return Promise.$reject($makeTypeError("cancel() called on a reader owned by no readable stream")); + + return $readableStreamReaderGenericCancel(this, reason); +} + +export function read(this, view: DataView) { + if (!$isReadableStreamBYOBReader(this)) + return Promise.$reject($makeThisTypeError("ReadableStreamBYOBReader", "read")); + + if (!$getByIdDirectPrivate(this, "ownerReadableStream")) + return Promise.$reject($makeTypeError("read() called on a reader owned by no readable stream")); + + if (!$isObject(view)) return Promise.$reject($makeTypeError("Provided view is not an object")); + + if (!ArrayBuffer.$isView(view)) return Promise.$reject($makeTypeError("Provided view is not an ArrayBufferView")); + + if (view.byteLength === 0) return Promise.$reject($makeTypeError("Provided view cannot have a 0 byteLength")); + + return $readableStreamBYOBReaderRead(this, view); +} + +export function releaseLock(this) { + if (!$isReadableStreamBYOBReader(this)) throw $makeThisTypeError("ReadableStreamBYOBReader", "releaseLock"); + + if (!$getByIdDirectPrivate(this, "ownerReadableStream")) return; + + if ($getByIdDirectPrivate(this, "readIntoRequests")?.isNotEmpty()) + throw new TypeError("There are still pending read requests, cannot release the lock"); + + $readableStreamReaderGenericRelease(this); +} + +$getter; +export function closed(this) { + if (!$isReadableStreamBYOBReader(this)) + return Promise.$reject($makeGetterTypeError("ReadableStreamBYOBReader", "closed")); + + return $getByIdDirectPrivate(this, "closedPromiseCapability").$promise; +} diff --git a/src/bun.js/builtins/ts/ReadableStreamBYOBRequest.ts b/src/bun.js/builtins/ts/ReadableStreamBYOBRequest.ts new file mode 100644 index 000000000..1354f9349 --- /dev/null +++ b/src/bun.js/builtins/ts/ReadableStreamBYOBRequest.ts @@ -0,0 +1,66 @@ +/* + * Copyright (C) 2017 Canon Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +export function initializeReadableStreamBYOBRequest(this, controller, view) { + if (arguments.length !== 3 && arguments[2] !== $isReadableStream) + throw new TypeError("ReadableStreamBYOBRequest constructor should not be called directly"); + + return $privateInitializeReadableStreamBYOBRequest.$call(this, controller, view); +} + +export function respond(this, bytesWritten) { + if (!$isReadableStreamBYOBRequest(this)) throw $makeThisTypeError("ReadableStreamBYOBRequest", "respond"); + + if ($getByIdDirectPrivate(this, "associatedReadableByteStreamController") === undefined) + throw new TypeError("ReadableStreamBYOBRequest.associatedReadableByteStreamController is undefined"); + + return $readableByteStreamControllerRespond( + $getByIdDirectPrivate(this, "associatedReadableByteStreamController"), + bytesWritten, + ); +} + +export function respondWithNewView(this, view) { + if (!$isReadableStreamBYOBRequest(this)) throw $makeThisTypeError("ReadableStreamBYOBRequest", "respond"); + + if ($getByIdDirectPrivate(this, "associatedReadableByteStreamController") === undefined) + throw new TypeError("ReadableStreamBYOBRequest.associatedReadableByteStreamController is undefined"); + + if (!$isObject(view)) throw new TypeError("Provided view is not an object"); + + if (!ArrayBuffer.$isView(view)) throw new TypeError("Provided view is not an ArrayBufferView"); + + return $readableByteStreamControllerRespondWithNewView( + $getByIdDirectPrivate(this, "associatedReadableByteStreamController"), + view, + ); +} + +$getter; +export function view(this) { + if (!$isReadableStreamBYOBRequest(this)) throw $makeGetterTypeError("ReadableStreamBYOBRequest", "view"); + + return $getByIdDirectPrivate(this, "view"); +} diff --git a/src/bun.js/builtins/ts/ReadableStreamDefaultController.ts b/src/bun.js/builtins/ts/ReadableStreamDefaultController.ts new file mode 100644 index 000000000..912cd1acb --- /dev/null +++ b/src/bun.js/builtins/ts/ReadableStreamDefaultController.ts @@ -0,0 +1,63 @@ +/* + * Copyright (C) 2015 Canon Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +export function initializeReadableStreamDefaultController(this, stream, underlyingSource, size, highWaterMark) { + if (arguments.length !== 5 && arguments[4] !== $isReadableStream) + throw new TypeError("ReadableStreamDefaultController constructor should not be called directly"); + + return $privateInitializeReadableStreamDefaultController.$call(this, stream, underlyingSource, size, highWaterMark); +} + +export function enqueue(this, chunk) { + if (!$isReadableStreamDefaultController(this)) throw $makeThisTypeError("ReadableStreamDefaultController", "enqueue"); + + if (!$readableStreamDefaultControllerCanCloseOrEnqueue(this)) + throw new TypeError("ReadableStreamDefaultController is not in a state where chunk can be enqueued"); + + return $readableStreamDefaultControllerEnqueue(this, chunk); +} + +export function error(this, err) { + if (!$isReadableStreamDefaultController(this)) throw $makeThisTypeError("ReadableStreamDefaultController", "error"); + + $readableStreamDefaultControllerError(this, err); +} + +export function close(this) { + if (!$isReadableStreamDefaultController(this)) throw $makeThisTypeError("ReadableStreamDefaultController", "close"); + + if (!$readableStreamDefaultControllerCanCloseOrEnqueue(this)) + throw new TypeError("ReadableStreamDefaultController is not in a state where it can be closed"); + + $readableStreamDefaultControllerClose(this); +} + +$getter; +export function desiredSize(this) { + if (!$isReadableStreamDefaultController(this)) + throw $makeGetterTypeError("ReadableStreamDefaultController", "desiredSize"); + + return $readableStreamDefaultControllerGetDesiredSize(this); +} diff --git a/src/bun.js/builtins/ts/ReadableStreamDefaultReader.ts b/src/bun.js/builtins/ts/ReadableStreamDefaultReader.ts new file mode 100644 index 000000000..ecd553ed5 --- /dev/null +++ b/src/bun.js/builtins/ts/ReadableStreamDefaultReader.ts @@ -0,0 +1,185 @@ +/* + * Copyright (C) 2015 Canon Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +export function initializeReadableStreamDefaultReader(this, stream) { + if (!$isReadableStream(stream)) throw new TypeError("ReadableStreamDefaultReader needs a ReadableStream"); + if ($isReadableStreamLocked(stream)) throw new TypeError("ReadableStream is locked"); + + $readableStreamReaderGenericInitialize(this, stream); + $putByIdDirectPrivate(this, "readRequests", $createFIFO()); + + return this; +} + +export function cancel(this, reason) { + if (!$isReadableStreamDefaultReader(this)) + return Promise.$reject($makeThisTypeError("ReadableStreamDefaultReader", "cancel")); + + if (!$getByIdDirectPrivate(this, "ownerReadableStream")) + return Promise.$reject(new TypeError("cancel() called on a reader owned by no readable stream")); + + return $readableStreamReaderGenericCancel(this, reason); +} + +export function readMany(this) { + if (!$isReadableStreamDefaultReader(this)) + throw new TypeError("ReadableStreamDefaultReader.readMany() should not be called directly"); + + const stream = $getByIdDirectPrivate(this, "ownerReadableStream"); + if (!stream) throw new TypeError("readMany() called on a reader owned by no readable stream"); + + const state = $getByIdDirectPrivate(stream, "state"); + $putByIdDirectPrivate(stream, "disturbed", true); + if (state === $streamClosed) return { value: [], size: 0, done: true }; + else if (state === $streamErrored) { + throw $getByIdDirectPrivate(stream, "storedError"); + } + + var controller = $getByIdDirectPrivate(stream, "readableStreamController"); + var queue = $getByIdDirectPrivate(controller, "queue"); + + if (!queue) { + // This is a ReadableStream direct controller implemented in JS + // It hasn't been started yet. + return controller.$pull(controller).$then(function ({ done, value }) { + return done ? { done: true, value: [], size: 0 } : { value: [value], size: 1, done: false }; + }); + } + + const content = queue.content; + var size = queue.size; + var values = content.toArray(false); + + var length = values.length; + + if (length > 0) { + var outValues = $newArrayWithSize(length); + if ($isReadableByteStreamController(controller)) { + { + const buf = values[0]; + if (!(ArrayBuffer.$isView(buf) || buf instanceof ArrayBuffer)) { + $putByValDirect(outValues, 0, new Uint8Array(buf.buffer, buf.byteOffset, buf.byteLength)); + } else { + $putByValDirect(outValues, 0, buf); + } + } + + for (var i = 1; i < length; i++) { + const buf = values[i]; + if (!(ArrayBuffer.$isView(buf) || buf instanceof ArrayBuffer)) { + $putByValDirect(outValues, i, new Uint8Array(buf.buffer, buf.byteOffset, buf.byteLength)); + } else { + $putByValDirect(outValues, i, buf); + } + } + } else { + $putByValDirect(outValues, 0, values[0].value); + for (var i = 1; i < length; i++) { + $putByValDirect(outValues, i, values[i].value); + } + } + + $resetQueue($getByIdDirectPrivate(controller, "queue")); + + if ($getByIdDirectPrivate(controller, "closeRequested")) + $readableStreamClose($getByIdDirectPrivate(controller, "controlledReadableStream")); + else if ($isReadableStreamDefaultController(controller)) + $readableStreamDefaultControllerCallPullIfNeeded(controller); + else if ($isReadableByteStreamController(controller)) $readableByteStreamControllerCallPullIfNeeded(controller); + + return { value: outValues, size, done: false }; + } + + var onPullMany = result => { + if (result.done) { + return { value: [], size: 0, done: true }; + } + var controller = $getByIdDirectPrivate(stream, "readableStreamController"); + + var queue = $getByIdDirectPrivate(controller, "queue"); + var value = [result.value].concat(queue.content.toArray(false)); + var length = value.length; + + if ($isReadableByteStreamController(controller)) { + for (var i = 0; i < length; i++) { + const buf = value[i]; + if (!(ArrayBuffer.$isView(buf) || buf instanceof ArrayBuffer)) { + const { buffer, byteOffset, byteLength } = buf; + $putByValDirect(value, i, new Uint8Array(buffer, byteOffset, byteLength)); + } + } + } else { + for (var i = 1; i < length; i++) { + $putByValDirect(value, i, value[i].value); + } + } + + var size = queue.size; + $resetQueue(queue); + + if ($getByIdDirectPrivate(controller, "closeRequested")) + $readableStreamClose($getByIdDirectPrivate(controller, "controlledReadableStream")); + else if ($isReadableStreamDefaultController(controller)) + $readableStreamDefaultControllerCallPullIfNeeded(controller); + else if ($isReadableByteStreamController(controller)) $readableByteStreamControllerCallPullIfNeeded(controller); + + return { value: value, size: size, done: false }; + }; + + var pullResult = controller.$pull(controller); + if (pullResult && $isPromise(pullResult)) { + return pullResult.$then(onPullMany); + } + + return onPullMany(pullResult); +} + +export function read(this) { + if (!$isReadableStreamDefaultReader(this)) + return Promise.$reject($makeThisTypeError("ReadableStreamDefaultReader", "read")); + if (!$getByIdDirectPrivate(this, "ownerReadableStream")) + return Promise.$reject(new TypeError("read() called on a reader owned by no readable stream")); + + return $readableStreamDefaultReaderRead(this); +} + +export function releaseLock(this) { + if (!$isReadableStreamDefaultReader(this)) throw $makeThisTypeError("ReadableStreamDefaultReader", "releaseLock"); + + if (!$getByIdDirectPrivate(this, "ownerReadableStream")) return; + + if ($getByIdDirectPrivate(this, "readRequests")?.isNotEmpty()) + throw new TypeError("There are still pending read requests, cannot release the lock"); + + $readableStreamReaderGenericRelease(this); +} + +$getter; +export function closed(this) { + if (!$isReadableStreamDefaultReader(this)) + return Promise.$reject($makeGetterTypeError("ReadableStreamDefaultReader", "closed")); + + return $getByIdDirectPrivate(this, "closedPromiseCapability").$promise; +} diff --git a/src/bun.js/builtins/ts/ReadableStreamInternals.ts b/src/bun.js/builtins/ts/ReadableStreamInternals.ts new file mode 100644 index 000000000..c0867445f --- /dev/null +++ b/src/bun.js/builtins/ts/ReadableStreamInternals.ts @@ -0,0 +1,1801 @@ +/* + * Copyright (C) 2015 Canon Inc. All rights reserved. + * Copyright (C) 2015 Igalia. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +// @internal + +export function readableStreamReaderGenericInitialize(reader, stream) { + $putByIdDirectPrivate(reader, "ownerReadableStream", stream); + $putByIdDirectPrivate(stream, "reader", reader); + if ($getByIdDirectPrivate(stream, "state") === $streamReadable) + $putByIdDirectPrivate(reader, "closedPromiseCapability", $newPromiseCapability(Promise)); + else if ($getByIdDirectPrivate(stream, "state") === $streamClosed) + $putByIdDirectPrivate(reader, "closedPromiseCapability", { + $promise: Promise.$resolve(), + }); + else { + $assert($getByIdDirectPrivate(stream, "state") === $streamErrored); + $putByIdDirectPrivate(reader, "closedPromiseCapability", { + $promise: $newHandledRejectedPromise($getByIdDirectPrivate(stream, "storedError")), + }); + } +} + +export function privateInitializeReadableStreamDefaultController(this, stream, underlyingSource, size, highWaterMark) { + if (!$isReadableStream(stream)) throw new TypeError("ReadableStreamDefaultController needs a ReadableStream"); + + // readableStreamController is initialized with null value. + if ($getByIdDirectPrivate(stream, "readableStreamController") !== null) + throw new TypeError("ReadableStream already has a controller"); + + $putByIdDirectPrivate(this, "controlledReadableStream", stream); + $putByIdDirectPrivate(this, "underlyingSource", underlyingSource); + $putByIdDirectPrivate(this, "queue", $newQueue()); + $putByIdDirectPrivate(this, "started", -1); + $putByIdDirectPrivate(this, "closeRequested", false); + $putByIdDirectPrivate(this, "pullAgain", false); + $putByIdDirectPrivate(this, "pulling", false); + $putByIdDirectPrivate(this, "strategy", $validateAndNormalizeQueuingStrategy(size, highWaterMark)); + + return this; +} + +export function readableStreamDefaultControllerError(controller, error) { + const stream = $getByIdDirectPrivate(controller, "controlledReadableStream"); + if ($getByIdDirectPrivate(stream, "state") !== $streamReadable) return; + $putByIdDirectPrivate(controller, "queue", $newQueue()); + + $readableStreamError(stream, error); +} + +export function readableStreamPipeTo(stream, sink) { + $assert($isReadableStream(stream)); + + const reader = new ReadableStreamDefaultReader(stream); + + $getByIdDirectPrivate(reader, "closedPromiseCapability").$promise.$then( + () => {}, + e => { + sink.error(e); + }, + ); + + function doPipe() { + $readableStreamDefaultReaderRead(reader).$then( + function (result) { + if (result.done) { + sink.close(); + return; + } + try { + sink.enqueue(result.value); + } catch (e) { + sink.error("ReadableStream chunk enqueueing in the sink failed"); + return; + } + doPipe(); + }, + function (e) { + sink.error(e); + }, + ); + } + doPipe(); +} + +export function acquireReadableStreamDefaultReader(stream) { + var start = $getByIdDirectPrivate(stream, "start"); + if (start) { + start.$call(stream); + } + + return new ReadableStreamDefaultReader(stream); +} + +// https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller, starting from step 6. +// The other part is implemented in privateInitializeReadableStreamDefaultController. +export function setupReadableStreamDefaultController( + stream, + underlyingSource, + size, + highWaterMark, + startMethod, + pullMethod, + cancelMethod, +) { + const controller = new ReadableStreamDefaultController( + stream, + underlyingSource, + size, + highWaterMark, + $isReadableStream, + ); + + const pullAlgorithm = () => $promiseInvokeOrNoopMethod(underlyingSource, pullMethod, [controller]); + const cancelAlgorithm = reason => $promiseInvokeOrNoopMethod(underlyingSource, cancelMethod, [reason]); + + $putByIdDirectPrivate(controller, "pullAlgorithm", pullAlgorithm); + $putByIdDirectPrivate(controller, "cancelAlgorithm", cancelAlgorithm); + $putByIdDirectPrivate(controller, "pull", $readableStreamDefaultControllerPull); + $putByIdDirectPrivate(controller, "cancel", $readableStreamDefaultControllerCancel); + $putByIdDirectPrivate(stream, "readableStreamController", controller); + + $readableStreamDefaultControllerStart(controller); +} + +export function createReadableStreamController(stream, underlyingSource, strategy) { + const type = underlyingSource.type; + const typeString = $toString(type); + + if (typeString === "bytes") { + // if (!$readableByteStreamAPIEnabled()) + // $throwTypeError("ReadableByteStreamController is not implemented"); + + if (strategy.highWaterMark === undefined) strategy.highWaterMark = 0; + if (strategy.size !== undefined) $throwRangeError("Strategy for a ReadableByteStreamController cannot have a size"); + + $putByIdDirectPrivate( + stream, + "readableStreamController", + new ReadableByteStreamController(stream, underlyingSource, strategy.highWaterMark, $isReadableStream), + ); + } else if (typeString === "direct") { + var highWaterMark = strategy?.highWaterMark; + $initializeArrayBufferStream.$call(stream, underlyingSource, highWaterMark); + } else if (type === undefined) { + if (strategy.highWaterMark === undefined) strategy.highWaterMark = 1; + + $setupReadableStreamDefaultController( + stream, + underlyingSource, + strategy.size, + strategy.highWaterMark, + underlyingSource.start, + underlyingSource.pull, + underlyingSource.cancel, + ); + } else throw new RangeError("Invalid type for underlying source"); +} + +export function readableStreamDefaultControllerStart(controller) { + if ($getByIdDirectPrivate(controller, "started") !== -1) return; + + const underlyingSource = $getByIdDirectPrivate(controller, "underlyingSource"); + const startMethod = underlyingSource.start; + $putByIdDirectPrivate(controller, "started", 0); + + $promiseInvokeOrNoopMethodNoCatch(underlyingSource, startMethod, [controller]).$then( + () => { + $putByIdDirectPrivate(controller, "started", 1); + $assert(!$getByIdDirectPrivate(controller, "pulling")); + $assert(!$getByIdDirectPrivate(controller, "pullAgain")); + $readableStreamDefaultControllerCallPullIfNeeded(controller); + }, + error => { + $readableStreamDefaultControllerError(controller, error); + }, + ); +} + +// FIXME: Replace readableStreamPipeTo by below function. +// This method implements the latest https://streams.spec.whatwg.org/#readable-stream-pipe-to. +export function readableStreamPipeToWritableStream( + source, + destination, + preventClose, + preventAbort, + preventCancel, + signal, +) { + // const isDirectStream = !!$getByIdDirectPrivate(source, "start"); + + $assert($isReadableStream(source)); + $assert($isWritableStream(destination)); + $assert(!$isReadableStreamLocked(source)); + $assert(!$isWritableStreamLocked(destination)); + $assert(signal === undefined || $isAbortSignal(signal)); + + if ($getByIdDirectPrivate(source, "underlyingByteSource") !== undefined) + return Promise.$reject("Piping to a readable bytestream is not supported"); + + let pipeState: any = { + source: source, + destination: destination, + preventAbort: preventAbort, + preventCancel: preventCancel, + preventClose: preventClose, + signal: signal, + }; + + pipeState.reader = $acquireReadableStreamDefaultReader(source); + pipeState.writer = $acquireWritableStreamDefaultWriter(destination); + + $putByIdDirectPrivate(source, "disturbed", true); + + pipeState.finalized = false; + pipeState.shuttingDown = false; + pipeState.promiseCapability = $newPromiseCapability(Promise); + pipeState.pendingReadPromiseCapability = $newPromiseCapability(Promise); + pipeState.pendingReadPromiseCapability.$resolve.$call(); + pipeState.pendingWritePromise = Promise.$resolve(); + + if (signal !== undefined) { + const algorithm = reason => { + if (pipeState.finalized) return; + + $pipeToShutdownWithAction( + pipeState, + () => { + const shouldAbortDestination = + !pipeState.preventAbort && $getByIdDirectPrivate(pipeState.destination, "state") === "writable"; + const promiseDestination = shouldAbortDestination + ? $writableStreamAbort(pipeState.destination, reason) + : Promise.$resolve(); + + const shouldAbortSource = + !pipeState.preventCancel && $getByIdDirectPrivate(pipeState.source, "state") === $streamReadable; + const promiseSource = shouldAbortSource + ? $readableStreamCancel(pipeState.source, reason) + : Promise.$resolve(); + + let promiseCapability = $newPromiseCapability(Promise); + let shouldWait = true; + let handleResolvedPromise = () => { + if (shouldWait) { + shouldWait = false; + return; + } + promiseCapability.$resolve.$call(); + }; + let handleRejectedPromise = e => { + promiseCapability.$reject.$call(undefined, e); + }; + promiseDestination.$then(handleResolvedPromise, handleRejectedPromise); + promiseSource.$then(handleResolvedPromise, handleRejectedPromise); + return promiseCapability.$promise; + }, + reason, + ); + }; + if ($whenSignalAborted(signal, algorithm)) return pipeState.promiseCapability.$promise; + } + + $pipeToErrorsMustBePropagatedForward(pipeState); + $pipeToErrorsMustBePropagatedBackward(pipeState); + $pipeToClosingMustBePropagatedForward(pipeState); + $pipeToClosingMustBePropagatedBackward(pipeState); + + $pipeToLoop(pipeState); + + return pipeState.promiseCapability.$promise; +} + +export function pipeToLoop(pipeState) { + if (pipeState.shuttingDown) return; + + $pipeToDoReadWrite(pipeState).$then(result => { + if (result) $pipeToLoop(pipeState); + }); +} + +export function pipeToDoReadWrite(pipeState) { + $assert(!pipeState.shuttingDown); + + pipeState.pendingReadPromiseCapability = $newPromiseCapability(Promise); + $getByIdDirectPrivate(pipeState.writer, "readyPromise").$promise.$then( + () => { + if (pipeState.shuttingDown) { + pipeState.pendingReadPromiseCapability.$resolve.$call(undefined, false); + return; + } + + $readableStreamDefaultReaderRead(pipeState.reader).$then( + result => { + const canWrite = !result.done && $getByIdDirectPrivate(pipeState.writer, "stream") !== undefined; + pipeState.pendingReadPromiseCapability.$resolve.$call(undefined, canWrite); + if (!canWrite) return; + + pipeState.pendingWritePromise = $writableStreamDefaultWriterWrite(pipeState.writer, result.value); + }, + e => { + pipeState.pendingReadPromiseCapability.$resolve.$call(undefined, false); + }, + ); + }, + e => { + pipeState.pendingReadPromiseCapability.$resolve.$call(undefined, false); + }, + ); + return pipeState.pendingReadPromiseCapability.$promise; +} + +export function pipeToErrorsMustBePropagatedForward(pipeState) { + const action = () => { + pipeState.pendingReadPromiseCapability.$resolve.$call(undefined, false); + const error = $getByIdDirectPrivate(pipeState.source, "storedError"); + if (!pipeState.preventAbort) { + $pipeToShutdownWithAction(pipeState, () => $writableStreamAbort(pipeState.destination, error), error); + return; + } + $pipeToShutdown(pipeState, error); + }; + + if ($getByIdDirectPrivate(pipeState.source, "state") === $streamErrored) { + action(); + return; + } + + $getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").$promise.$then(undefined, action); +} + +export function pipeToErrorsMustBePropagatedBackward(pipeState) { + const action = () => { + const error = $getByIdDirectPrivate(pipeState.destination, "storedError"); + if (!pipeState.preventCancel) { + $pipeToShutdownWithAction(pipeState, () => $readableStreamCancel(pipeState.source, error), error); + return; + } + $pipeToShutdown(pipeState, error); + }; + if ($getByIdDirectPrivate(pipeState.destination, "state") === "errored") { + action(); + return; + } + $getByIdDirectPrivate(pipeState.writer, "closedPromise").$promise.$then(undefined, action); +} + +export function pipeToClosingMustBePropagatedForward(pipeState) { + const action = () => { + pipeState.pendingReadPromiseCapability.$resolve.$call(undefined, false); + // const error = $getByIdDirectPrivate(pipeState.source, "storedError"); + if (!pipeState.preventClose) { + $pipeToShutdownWithAction(pipeState, () => + $writableStreamDefaultWriterCloseWithErrorPropagation(pipeState.writer), + ); + return; + } + $pipeToShutdown(pipeState); + }; + if ($getByIdDirectPrivate(pipeState.source, "state") === $streamClosed) { + action(); + return; + } + $getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").$promise.$then(action, undefined); +} + +export function pipeToClosingMustBePropagatedBackward(pipeState) { + if ( + !$writableStreamCloseQueuedOrInFlight(pipeState.destination) && + $getByIdDirectPrivate(pipeState.destination, "state") !== "closed" + ) + return; + + // $assert no chunks have been read/written + + const error = new TypeError("closing is propagated backward"); + if (!pipeState.preventCancel) { + $pipeToShutdownWithAction(pipeState, () => $readableStreamCancel(pipeState.source, error), error); + return; + } + $pipeToShutdown(pipeState, error); +} + +export function pipeToShutdownWithAction(pipeState, action) { + if (pipeState.shuttingDown) return; + + pipeState.shuttingDown = true; + + const hasError = arguments.length > 2; + const error = arguments[2]; + const finalize = () => { + const promise = action(); + promise.$then( + () => { + if (hasError) $pipeToFinalize(pipeState, error); + else $pipeToFinalize(pipeState); + }, + e => { + $pipeToFinalize(pipeState, e); + }, + ); + }; + + if ( + $getByIdDirectPrivate(pipeState.destination, "state") === "writable" && + !$writableStreamCloseQueuedOrInFlight(pipeState.destination) + ) { + pipeState.pendingReadPromiseCapability.$promise.$then( + () => { + pipeState.pendingWritePromise.$then(finalize, finalize); + }, + e => $pipeToFinalize(pipeState, e), + ); + return; + } + + finalize(); +} + +export function pipeToShutdown(pipeState) { + if (pipeState.shuttingDown) return; + + pipeState.shuttingDown = true; + + const hasError = arguments.length > 1; + const error = arguments[1]; + const finalize = () => { + if (hasError) $pipeToFinalize(pipeState, error); + else $pipeToFinalize(pipeState); + }; + + if ( + $getByIdDirectPrivate(pipeState.destination, "state") === "writable" && + !$writableStreamCloseQueuedOrInFlight(pipeState.destination) + ) { + pipeState.pendingReadPromiseCapability.$promise.$then( + () => { + pipeState.pendingWritePromise.$then(finalize, finalize); + }, + e => $pipeToFinalize(pipeState, e), + ); + return; + } + finalize(); +} + +export function pipeToFinalize(pipeState) { + $writableStreamDefaultWriterRelease(pipeState.writer); + $readableStreamReaderGenericRelease(pipeState.reader); + + // Instead of removing the abort algorithm as per spec, we make it a no-op which is equivalent. + pipeState.finalized = true; + + if (arguments.length > 1) pipeState.promiseCapability.$reject.$call(undefined, arguments[1]); + else pipeState.promiseCapability.$resolve.$call(); +} + +export function readableStreamTee(stream, shouldClone) { + $assert($isReadableStream(stream)); + $assert(typeof shouldClone === "boolean"); + + var start_ = $getByIdDirectPrivate(stream, "start"); + if (start_) { + $putByIdDirectPrivate(stream, "start", undefined); + start_(); + } + + const reader = new $ReadableStreamDefaultReader(stream); + + const teeState = { + closedOrErrored: false, + canceled1: false, + canceled2: false, + reason1: undefined, + reason2: undefined, + }; + + teeState.cancelPromiseCapability = $newPromiseCapability(Promise); + + const pullFunction = $readableStreamTeePullFunction(teeState, reader, shouldClone); + + const branch1Source = {}; + $putByIdDirectPrivate(branch1Source, "pull", pullFunction); + $putByIdDirectPrivate(branch1Source, "cancel", $readableStreamTeeBranch1CancelFunction(teeState, stream)); + + const branch2Source = {}; + $putByIdDirectPrivate(branch2Source, "pull", pullFunction); + $putByIdDirectPrivate(branch2Source, "cancel", $readableStreamTeeBranch2CancelFunction(teeState, stream)); + + const branch1 = new $ReadableStream(branch1Source); + const branch2 = new $ReadableStream(branch2Source); + + $getByIdDirectPrivate(reader, "closedPromiseCapability").$promise.$then(undefined, function (e) { + if (teeState.closedOrErrored) return; + $readableStreamDefaultControllerError(branch1.$readableStreamController, e); + $readableStreamDefaultControllerError(branch2.$readableStreamController, e); + teeState.closedOrErrored = true; + if (!teeState.canceled1 || !teeState.canceled2) teeState.cancelPromiseCapability.$resolve.$call(); + }); + + // Additional fields compared to the spec, as they are needed within pull/cancel functions. + teeState.branch1 = branch1; + teeState.branch2 = branch2; + + return [branch1, branch2]; +} + +export function readableStreamTeePullFunction(teeState, reader, shouldClone) { + return function () { + Promise.prototype.$then.$call($readableStreamDefaultReaderRead(reader), function (result) { + $assert($isObject(result)); + $assert(typeof result.done === "boolean"); + if (result.done && !teeState.closedOrErrored) { + if (!teeState.canceled1) $readableStreamDefaultControllerClose(teeState.branch1.$readableStreamController); + if (!teeState.canceled2) $readableStreamDefaultControllerClose(teeState.branch2.$readableStreamController); + teeState.closedOrErrored = true; + if (!teeState.canceled1 || !teeState.canceled2) teeState.cancelPromiseCapability.$resolve.$call(); + } + if (teeState.closedOrErrored) return; + if (!teeState.canceled1) + $readableStreamDefaultControllerEnqueue(teeState.branch1.$readableStreamController, result.value); + if (!teeState.canceled2) + $readableStreamDefaultControllerEnqueue( + teeState.branch2.$readableStreamController, + shouldClone ? $structuredCloneForStream(result.value) : result.value, + ); + }); + }; +} + +export function readableStreamTeeBranch1CancelFunction(teeState, stream) { + return function (r) { + teeState.canceled1 = true; + teeState.reason1 = r; + if (teeState.canceled2) { + $readableStreamCancel(stream, [teeState.reason1, teeState.reason2]).$then( + teeState.cancelPromiseCapability.$resolve, + teeState.cancelPromiseCapability.$reject, + ); + } + return teeState.cancelPromiseCapability.$promise; + }; +} + +export function readableStreamTeeBranch2CancelFunction(teeState, stream) { + return function (r) { + teeState.canceled2 = true; + teeState.reason2 = r; + if (teeState.canceled1) { + $readableStreamCancel(stream, [teeState.reason1, teeState.reason2]).$then( + teeState.cancelPromiseCapability.$resolve, + teeState.cancelPromiseCapability.$reject, + ); + } + return teeState.cancelPromiseCapability.$promise; + }; +} + +export function isReadableStream(stream) { + // Spec tells to return true only if stream has a readableStreamController internal slot. + // However, since it is a private slot, it cannot be checked using hasOwnProperty(). + // Therefore, readableStreamController is initialized with null value. + return $isObject(stream) && $getByIdDirectPrivate(stream, "readableStreamController") !== undefined; +} + +export function isReadableStreamDefaultReader(reader) { + // Spec tells to return true only if reader has a readRequests internal slot. + // However, since it is a private slot, it cannot be checked using hasOwnProperty(). + // Since readRequests is initialized with an empty array, the following test is ok. + return $isObject(reader) && !!$getByIdDirectPrivate(reader, "readRequests"); +} + +export function isReadableStreamDefaultController(controller) { + // Spec tells to return true only if controller has an underlyingSource internal slot. + // However, since it is a private slot, it cannot be checked using hasOwnProperty(). + // underlyingSource is obtained in ReadableStream constructor: if undefined, it is set + // to an empty object. Therefore, following test is ok. + return $isObject(controller) && !!$getByIdDirectPrivate(controller, "underlyingSource"); +} + +export function readDirectStream(stream, sink, underlyingSource) { + $putByIdDirectPrivate(stream, "underlyingSource", undefined); + $putByIdDirectPrivate(stream, "start", undefined); + + function close(stream, reason) { + if (reason && underlyingSource?.cancel) { + try { + var prom = underlyingSource.cancel(reason); + $markPromiseAsHandled(prom); + } catch (e) {} + + underlyingSource = undefined; + } + + if (stream) { + $putByIdDirectPrivate(stream, "readableStreamController", undefined); + $putByIdDirectPrivate(stream, "reader", undefined); + if (reason) { + $putByIdDirectPrivate(stream, "state", $streamErrored); + $putByIdDirectPrivate(stream, "storedError", reason); + } else { + $putByIdDirectPrivate(stream, "state", $streamClosed); + } + stream = undefined; + } + } + + if (!underlyingSource.pull) { + close(); + return; + } + + if (!$isCallable(underlyingSource.pull)) { + close(); + $throwTypeError("pull is not a function"); + return; + } + + $putByIdDirectPrivate(stream, "readableStreamController", sink); + const highWaterMark = $getByIdDirectPrivate(stream, "highWaterMark"); + + sink.start({ + highWaterMark: !highWaterMark || highWaterMark < 64 ? 64 : highWaterMark, + }); + + $startDirectStream.$call(sink, stream, underlyingSource.pull, close); + $putByIdDirectPrivate(stream, "reader", {}); + + var maybePromise = underlyingSource.pull(sink); + sink = undefined; + if (maybePromise && $isPromise(maybePromise)) { + return maybePromise.$then(() => {}); + } +} + +$linkTimeConstant; +export function assignToStream(stream, sink) { + // The stream is either a direct stream or a "default" JS stream + var underlyingSource = $getByIdDirectPrivate(stream, "underlyingSource"); + + // we know it's a direct stream when $underlyingSource is set + if (underlyingSource) { + try { + return $readDirectStream(stream, sink, underlyingSource); + } catch (e) { + throw e; + } finally { + underlyingSource = undefined; + stream = undefined; + sink = undefined; + } + } + + return $readStreamIntoSink(stream, sink, true); +} + +export async function readStreamIntoSink(stream, sink, isNative) { + var didClose = false; + var didThrow = false; + try { + var reader = stream.getReader(); + var many = reader.readMany(); + if (many && $isPromise(many)) { + many = await many; + } + if (many.done) { + didClose = true; + return sink.end(); + } + var wroteCount = many.value.length; + const highWaterMark = $getByIdDirectPrivate(stream, "highWaterMark"); + if (isNative) + $startDirectStream.$call(sink, stream, undefined, () => !didThrow && $markPromiseAsHandled(stream.cancel())); + + sink.start({ highWaterMark: highWaterMark || 0 }); + + for (var i = 0, values = many.value, length = many.value.length; i < length; i++) { + sink.write(values[i]); + } + + var streamState = $getByIdDirectPrivate(stream, "state"); + if (streamState === $streamClosed) { + didClose = true; + return sink.end(); + } + + while (true) { + var { value, done } = await reader.read(); + if (done) { + didClose = true; + return sink.end(); + } + + sink.write(value); + } + } catch (e) { + didThrow = true; + + try { + reader = undefined; + const prom = stream.cancel(e); + $markPromiseAsHandled(prom); + } catch (j) {} + + if (sink && !didClose) { + didClose = true; + try { + sink.close(e); + } catch (j) { + throw new globalThis.AggregateError([e, j]); + } + } + + throw e; + } finally { + if (reader) { + try { + reader.releaseLock(); + } catch (e) {} + reader = undefined; + } + sink = undefined; + var streamState = $getByIdDirectPrivate(stream, "state"); + if (stream) { + // make it easy for this to be GC'd + // but don't do property transitions + var readableStreamController = $getByIdDirectPrivate(stream, "readableStreamController"); + if (readableStreamController) { + if ($getByIdDirectPrivate(readableStreamController, "underlyingSource")) + $putByIdDirectPrivate(readableStreamController, "underlyingSource", undefined); + if ($getByIdDirectPrivate(readableStreamController, "controlledReadableStream")) + $putByIdDirectPrivate(readableStreamController, "controlledReadableStream", undefined); + + $putByIdDirectPrivate(stream, "readableStreamController", null); + if ($getByIdDirectPrivate(stream, "underlyingSource")) + $putByIdDirectPrivate(stream, "underlyingSource", undefined); + readableStreamController = undefined; + } + + if (!didThrow && streamState !== $streamClosed && streamState !== $streamErrored) { + $readableStreamClose(stream); + } + stream = undefined; + } + } +} + +export function handleDirectStreamError(e) { + var controller = this; + var sink = controller.$sink; + if (sink) { + $putByIdDirectPrivate(controller, "sink", undefined); + try { + sink.close(e); + } catch (f) {} + } + + this.error = this.flush = this.write = this.close = this.end = $onReadableStreamDirectControllerClosed; + + if (typeof this.$underlyingSource.close === "function") { + try { + this.$underlyingSource.close.$call(this.$underlyingSource, e); + } catch (e) {} + } + + try { + var pend = controller._pendingRead; + if (pend) { + controller._pendingRead = undefined; + $rejectPromise(pend, e); + } + } catch (f) {} + var stream = controller.$controlledReadableStream; + if (stream) $readableStreamError(stream, e); +} + +export function handleDirectStreamErrorReject(e) { + $handleDirectStreamError.$call(this, e); + return Promise.$reject(e); +} + +export function onPullDirectStream(controller) { + var stream = controller.$controlledReadableStream; + if (!stream || $getByIdDirectPrivate(stream, "state") !== $streamReadable) return; + + // pull is in progress + // this is a recursive call + // ignore it + if (controller._deferClose === -1) { + return; + } + + controller._deferClose = -1; + controller._deferFlush = -1; + var deferClose; + var deferFlush; + + // Direct streams allow $pull to be called multiple times, unlike the spec. + // Backpressure is handled by the destination, not by the underlying source. + // In this case, we rely on the heuristic that repeatedly draining in the same tick + // is bad for performance + // this code is only run when consuming a direct stream from JS + // without the HTTP server or anything else + try { + var result = controller.$underlyingSource.pull(controller); + + if (result && $isPromise(result)) { + if (controller._handleError === undefined) { + controller._handleError = $handleDirectStreamErrorReject.bind(controller); + } + + Promise.prototype.catch.$call(result, controller._handleError); + } + } catch (e) { + return $handleDirectStreamErrorReject.$call(controller, e); + } finally { + deferClose = controller._deferClose; + deferFlush = controller._deferFlush; + controller._deferFlush = controller._deferClose = 0; + } + + var promiseToReturn; + + if (controller._pendingRead === undefined) { + controller._pendingRead = promiseToReturn = $newPromise(); + } else { + promiseToReturn = $readableStreamAddReadRequest(stream); + } + + // they called close during $pull() + // we delay that + if (deferClose === 1) { + var reason = controller._deferCloseReason; + controller._deferCloseReason = undefined; + $onCloseDirectStream.$call(controller, reason); + return promiseToReturn; + } + + // not done, but they called flush() + if (deferFlush === 1) { + $onFlushDirectStream.$call(controller); + } + + return promiseToReturn; +} + +export function noopDoneFunction() { + return Promise.$resolve({ value: undefined, done: true }); +} + +export function onReadableStreamDirectControllerClosed(reason) { + $throwTypeError("ReadableStreamDirectController is now closed"); +} + +export function onCloseDirectStream(reason) { + var stream = this.$controlledReadableStream; + if (!stream || $getByIdDirectPrivate(stream, "state") !== $streamReadable) return; + + if (this._deferClose !== 0) { + this._deferClose = 1; + this._deferCloseReason = reason; + return; + } + + $putByIdDirectPrivate(stream, "state", $streamClosing); + if (typeof this.$underlyingSource.close === "function") { + try { + this.$underlyingSource.close.$call(this.$underlyingSource, reason); + } catch (e) {} + } + + var flushed; + try { + flushed = this.$sink.end(); + $putByIdDirectPrivate(this, "sink", undefined); + } catch (e) { + if (this._pendingRead) { + var read = this._pendingRead; + this._pendingRead = undefined; + $rejectPromise(read, e); + } + $readableStreamError(stream, e); + return; + } + + this.error = this.flush = this.write = this.close = this.end = $onReadableStreamDirectControllerClosed; + + var reader = $getByIdDirectPrivate(stream, "reader"); + + if (reader && $isReadableStreamDefaultReader(reader)) { + var _pendingRead = this._pendingRead; + if (_pendingRead && $isPromise(_pendingRead) && flushed?.byteLength) { + this._pendingRead = undefined; + $fulfillPromise(_pendingRead, { value: flushed, done: false }); + $readableStreamClose(stream); + return; + } + } + + if (flushed?.byteLength) { + var requests = $getByIdDirectPrivate(reader, "readRequests"); + if (requests?.isNotEmpty()) { + $readableStreamFulfillReadRequest(stream, flushed, false); + $readableStreamClose(stream); + return; + } + + $putByIdDirectPrivate(stream, "state", $streamReadable); + this.$pull = () => { + var thisResult = $createFulfilledPromise({ + value: flushed, + done: false, + }); + flushed = undefined; + $readableStreamClose(stream); + stream = undefined; + return thisResult; + }; + } else if (this._pendingRead) { + var read = this._pendingRead; + this._pendingRead = undefined; + $putByIdDirectPrivate(this, "pull", $noopDoneFunction); + $fulfillPromise(read, { value: undefined, done: true }); + } + + $readableStreamClose(stream); +} + +export function onFlushDirectStream() { + var stream = this.$controlledReadableStream; + var reader = $getByIdDirectPrivate(stream, "reader"); + if (!reader || !$isReadableStreamDefaultReader(reader)) { + return; + } + + var _pendingRead = this._pendingRead; + this._pendingRead = undefined; + if (_pendingRead && $isPromise(_pendingRead)) { + var flushed = this.$sink.flush(); + if (flushed?.byteLength) { + this._pendingRead = $getByIdDirectPrivate(stream, "readRequests")?.shift(); + $fulfillPromise(_pendingRead, { value: flushed, done: false }); + } else { + this._pendingRead = _pendingRead; + } + } else if ($getByIdDirectPrivate(stream, "readRequests")?.isNotEmpty()) { + var flushed = this.$sink.flush(); + if (flushed?.byteLength) { + $readableStreamFulfillReadRequest(stream, flushed, false); + } + } else if (this._deferFlush === -1) { + this._deferFlush = 1; + } +} + +export function createTextStream(highWaterMark) { + var sink; + var array = []; + var hasString = false; + var hasBuffer = false; + var rope = ""; + var estimatedLength = $toLength(0); + var capability = $newPromiseCapability(Promise); + var calledDone = false; + + sink = { + start() {}, + write(chunk) { + if (typeof chunk === "string") { + var chunkLength = $toLength(chunk.length); + if (chunkLength > 0) { + rope += chunk; + hasString = true; + // TODO: utf16 byte length + estimatedLength += chunkLength; + } + + return chunkLength; + } + + if (!chunk || !($ArrayBuffer.$isView(chunk) || chunk instanceof $ArrayBuffer)) { + $throwTypeError("Expected text, ArrayBuffer or ArrayBufferView"); + } + + const byteLength = $toLength(chunk.byteLength); + if (byteLength > 0) { + hasBuffer = true; + if (rope.length > 0) { + $arrayPush(array, rope, chunk); + rope = ""; + } else { + $arrayPush(array, chunk); + } + } + estimatedLength += byteLength; + return byteLength; + }, + + flush() { + return 0; + }, + + end() { + if (calledDone) { + return ""; + } + return sink.fulfill(); + }, + + fulfill() { + calledDone = true; + const result = sink.finishInternal(); + + $fulfillPromise(capability.$promise, result); + return result; + }, + + finishInternal() { + if (!hasString && !hasBuffer) { + return ""; + } + + if (hasString && !hasBuffer) { + return rope; + } + + if (hasBuffer && !hasString) { + return new globalThis.TextDecoder().decode($Bun.concatArrayBuffers(array)); + } + + // worst case: mixed content + + var arrayBufferSink = new $Bun.ArrayBufferSink(); + arrayBufferSink.start({ + highWaterMark: estimatedLength, + asUint8Array: true, + }); + for (let item of array) { + arrayBufferSink.write(item); + } + array.length = 0; + if (rope.length > 0) { + arrayBufferSink.write(rope); + rope = ""; + } + + // TODO: use builtin + return new globalThis.TextDecoder().decode(arrayBufferSink.end()); + }, + + close() { + try { + if (!calledDone) { + calledDone = true; + sink.fulfill(); + } + } catch (e) {} + }, + }; + + return [sink, capability]; +} + +export function initializeTextStream(underlyingSource, highWaterMark) { + var [sink, closingPromise] = $createTextStream(highWaterMark); + + var controller = { + $underlyingSource: underlyingSource, + $pull: $onPullDirectStream, + $controlledReadableStream: this, + $sink: sink, + close: $onCloseDirectStream, + write: sink.write, + error: $handleDirectStreamError, + end: $onCloseDirectStream, + $close: $onCloseDirectStream, + flush: $onFlushDirectStream, + _pendingRead: undefined, + _deferClose: 0, + _deferFlush: 0, + _deferCloseReason: undefined, + _handleError: undefined, + }; + + $putByIdDirectPrivate(this, "readableStreamController", controller); + $putByIdDirectPrivate(this, "underlyingSource", undefined); + $putByIdDirectPrivate(this, "start", undefined); + return closingPromise; +} + +export function initializeArrayStream(underlyingSource, highWaterMark) { + var array = []; + var closingPromise = $newPromiseCapability(Promise); + var calledDone = false; + + function fulfill() { + calledDone = true; + closingPromise.$resolve.$call(undefined, array); + return array; + } + + var sink = { + start() {}, + write(chunk) { + $arrayPush(array, chunk); + return chunk.byteLength || chunk.length; + }, + + flush() { + return 0; + }, + + end() { + if (calledDone) { + return []; + } + return fulfill(); + }, + + close() { + if (!calledDone) { + fulfill(); + } + }, + }; + + var controller = { + $underlyingSource: underlyingSource, + $pull: $onPullDirectStream, + $controlledReadableStream: this, + $sink: sink, + close: $onCloseDirectStream, + write: sink.write, + error: $handleDirectStreamError, + end: $onCloseDirectStream, + $close: $onCloseDirectStream, + flush: $onFlushDirectStream, + _pendingRead: undefined, + _deferClose: 0, + _deferFlush: 0, + _deferCloseReason: undefined, + _handleError: undefined, + }; + + $putByIdDirectPrivate(this, "readableStreamController", controller); + $putByIdDirectPrivate(this, "underlyingSource", undefined); + $putByIdDirectPrivate(this, "start", undefined); + return closingPromise; +} + +export function initializeArrayBufferStream(underlyingSource, highWaterMark) { + // This is the fallback implementation for direct streams + // When we don't know what the destination type is + // We assume it is a Uint8Array. + + var opts = + highWaterMark && typeof highWaterMark === "number" + ? { highWaterMark, stream: true, asUint8Array: true } + : { stream: true, asUint8Array: true }; + var sink = new $Bun.ArrayBufferSink(); + sink.start(opts); + + var controller = { + $underlyingSource: underlyingSource, + $pull: $onPullDirectStream, + $controlledReadableStream: this, + $sink: sink, + close: $onCloseDirectStream, + write: sink.write.bind(sink), + error: $handleDirectStreamError, + end: $onCloseDirectStream, + $close: $onCloseDirectStream, + flush: $onFlushDirectStream, + _pendingRead: undefined, + _deferClose: 0, + _deferFlush: 0, + _deferCloseReason: undefined, + _handleError: undefined, + }; + + $putByIdDirectPrivate(this, "readableStreamController", controller); + $putByIdDirectPrivate(this, "underlyingSource", undefined); + $putByIdDirectPrivate(this, "start", undefined); +} + +export function readableStreamError(stream, error) { + $assert($isReadableStream(stream)); + $assert($getByIdDirectPrivate(stream, "state") === $streamReadable); + $putByIdDirectPrivate(stream, "state", $streamErrored); + $putByIdDirectPrivate(stream, "storedError", error); + + const reader = $getByIdDirectPrivate(stream, "reader"); + + if (!reader) return; + + if ($isReadableStreamDefaultReader(reader)) { + const requests = $getByIdDirectPrivate(reader, "readRequests"); + $putByIdDirectPrivate(reader, "readRequests", $createFIFO()); + for (var request = requests.shift(); request; request = requests.shift()) $rejectPromise(request, error); + } else { + $assert($isReadableStreamBYOBReader(reader)); + const requests = $getByIdDirectPrivate(reader, "readIntoRequests"); + $putByIdDirectPrivate(reader, "readIntoRequests", $createFIFO()); + for (var request = requests.shift(); request; request = requests.shift()) $rejectPromise(request, error); + } + + $getByIdDirectPrivate(reader, "closedPromiseCapability").$reject.$call(undefined, error); + const promise = $getByIdDirectPrivate(reader, "closedPromiseCapability").$promise; + $markPromiseAsHandled(promise); +} + +export function readableStreamDefaultControllerShouldCallPull(controller) { + const stream = $getByIdDirectPrivate(controller, "controlledReadableStream"); + + if (!$readableStreamDefaultControllerCanCloseOrEnqueue(controller)) return false; + if (!($getByIdDirectPrivate(controller, "started") === 1)) return false; + if ( + (!$isReadableStreamLocked(stream) || + !$getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) && + $readableStreamDefaultControllerGetDesiredSize(controller) <= 0 + ) + return false; + const desiredSize = $readableStreamDefaultControllerGetDesiredSize(controller); + $assert(desiredSize !== null); + return desiredSize > 0; +} + +export function readableStreamDefaultControllerCallPullIfNeeded(controller) { + // FIXME: use $readableStreamDefaultControllerShouldCallPull + const stream = $getByIdDirectPrivate(controller, "controlledReadableStream"); + + if (!$readableStreamDefaultControllerCanCloseOrEnqueue(controller)) return; + if (!($getByIdDirectPrivate(controller, "started") === 1)) return; + if ( + (!$isReadableStreamLocked(stream) || + !$getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) && + $readableStreamDefaultControllerGetDesiredSize(controller) <= 0 + ) + return; + + if ($getByIdDirectPrivate(controller, "pulling")) { + $putByIdDirectPrivate(controller, "pullAgain", true); + return; + } + + $assert(!$getByIdDirectPrivate(controller, "pullAgain")); + $putByIdDirectPrivate(controller, "pulling", true); + + $getByIdDirectPrivate(controller, "pullAlgorithm") + .$call(undefined) + .$then( + function () { + $putByIdDirectPrivate(controller, "pulling", false); + if ($getByIdDirectPrivate(controller, "pullAgain")) { + $putByIdDirectPrivate(controller, "pullAgain", false); + + $readableStreamDefaultControllerCallPullIfNeeded(controller); + } + }, + function (error) { + $readableStreamDefaultControllerError(controller, error); + }, + ); +} + +export function isReadableStreamLocked(stream) { + $assert($isReadableStream(stream)); + return !!$getByIdDirectPrivate(stream, "reader"); +} + +export function readableStreamDefaultControllerGetDesiredSize(controller) { + const stream = $getByIdDirectPrivate(controller, "controlledReadableStream"); + const state = $getByIdDirectPrivate(stream, "state"); + + if (state === $streamErrored) return null; + if (state === $streamClosed) return 0; + + return $getByIdDirectPrivate(controller, "strategy").highWaterMark - $getByIdDirectPrivate(controller, "queue").size; +} + +export function readableStreamReaderGenericCancel(reader, reason) { + const stream = $getByIdDirectPrivate(reader, "ownerReadableStream"); + $assert(!!stream); + return $readableStreamCancel(stream, reason); +} + +export function readableStreamCancel(stream, reason) { + $putByIdDirectPrivate(stream, "disturbed", true); + const state = $getByIdDirectPrivate(stream, "state"); + if (state === $streamClosed) return Promise.$resolve(); + if (state === $streamErrored) return Promise.$reject($getByIdDirectPrivate(stream, "storedError")); + $readableStreamClose(stream); + + var controller = $getByIdDirectPrivate(stream, "readableStreamController"); + var cancel = controller.$cancel; + if (cancel) { + return cancel(controller, reason).$then(function () {}); + } + + var close = controller.close; + if (close) { + return Promise.$resolve(controller.close(reason)); + } + + $throwTypeError("ReadableStreamController has no cancel or close method"); +} + +export function readableStreamDefaultControllerCancel(controller, reason) { + $putByIdDirectPrivate(controller, "queue", $newQueue()); + return $getByIdDirectPrivate(controller, "cancelAlgorithm").$call(undefined, reason); +} + +export function readableStreamDefaultControllerPull(controller) { + var queue = $getByIdDirectPrivate(controller, "queue"); + if (queue.content.isNotEmpty()) { + const chunk = $dequeueValue(queue); + if ($getByIdDirectPrivate(controller, "closeRequested") && queue.content.isEmpty()) + $readableStreamClose($getByIdDirectPrivate(controller, "controlledReadableStream")); + else $readableStreamDefaultControllerCallPullIfNeeded(controller); + + return $createFulfilledPromise({ value: chunk, done: false }); + } + const pendingPromise = $readableStreamAddReadRequest($getByIdDirectPrivate(controller, "controlledReadableStream")); + $readableStreamDefaultControllerCallPullIfNeeded(controller); + return pendingPromise; +} + +export function readableStreamDefaultControllerClose(controller) { + $assert($readableStreamDefaultControllerCanCloseOrEnqueue(controller)); + $putByIdDirectPrivate(controller, "closeRequested", true); + if ($getByIdDirectPrivate(controller, "queue")?.content?.isEmpty()) + $readableStreamClose($getByIdDirectPrivate(controller, "controlledReadableStream")); +} + +export function readableStreamClose(stream) { + $assert($getByIdDirectPrivate(stream, "state") === $streamReadable); + $putByIdDirectPrivate(stream, "state", $streamClosed); + if (!$getByIdDirectPrivate(stream, "reader")) return; + + if ($isReadableStreamDefaultReader($getByIdDirectPrivate(stream, "reader"))) { + const requests = $getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests"); + if (requests.isNotEmpty()) { + $putByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests", $createFIFO()); + + for (var request = requests.shift(); request; request = requests.shift()) + $fulfillPromise(request, { value: undefined, done: true }); + } + } + + $getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "closedPromiseCapability").$resolve.$call(); +} + +export function readableStreamFulfillReadRequest(stream, chunk, done) { + const readRequest = $getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests").shift(); + $fulfillPromise(readRequest, { value: chunk, done: done }); +} + +export function readableStreamDefaultControllerEnqueue(controller, chunk) { + const stream = $getByIdDirectPrivate(controller, "controlledReadableStream"); + // this is checked by callers + $assert($readableStreamDefaultControllerCanCloseOrEnqueue(controller)); + + if ( + $isReadableStreamLocked(stream) && + $getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty() + ) { + $readableStreamFulfillReadRequest(stream, chunk, false); + $readableStreamDefaultControllerCallPullIfNeeded(controller); + return; + } + + try { + let chunkSize = 1; + if ($getByIdDirectPrivate(controller, "strategy").size !== undefined) + chunkSize = $getByIdDirectPrivate(controller, "strategy").size(chunk); + $enqueueValueWithSize($getByIdDirectPrivate(controller, "queue"), chunk, chunkSize); + } catch (error) { + $readableStreamDefaultControllerError(controller, error); + throw error; + } + $readableStreamDefaultControllerCallPullIfNeeded(controller); +} + +export function readableStreamDefaultReaderRead(reader) { + const stream = $getByIdDirectPrivate(reader, "ownerReadableStream"); + $assert(!!stream); + const state = $getByIdDirectPrivate(stream, "state"); + + $putByIdDirectPrivate(stream, "disturbed", true); + if (state === $streamClosed) return $createFulfilledPromise({ value: undefined, done: true }); + if (state === $streamErrored) return Promise.$reject($getByIdDirectPrivate(stream, "storedError")); + $assert(state === $streamReadable); + + return $getByIdDirectPrivate(stream, "readableStreamController").$pull( + $getByIdDirectPrivate(stream, "readableStreamController"), + ); +} + +export function readableStreamAddReadRequest(stream) { + $assert($isReadableStreamDefaultReader($getByIdDirectPrivate(stream, "reader"))); + $assert($getByIdDirectPrivate(stream, "state") == $streamReadable); + + const readRequest = $newPromise(); + + $getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests").push(readRequest); + + return readRequest; +} + +export function isReadableStreamDisturbed(stream) { + $assert($isReadableStream(stream)); + return $getByIdDirectPrivate(stream, "disturbed"); +} + +export function readableStreamReaderGenericRelease(reader) { + $assert(!!$getByIdDirectPrivate(reader, "ownerReadableStream")); + $assert($getByIdDirectPrivate($getByIdDirectPrivate(reader, "ownerReadableStream"), "reader") === reader); + + if ($getByIdDirectPrivate($getByIdDirectPrivate(reader, "ownerReadableStream"), "state") === $streamReadable) + $getByIdDirectPrivate(reader, "closedPromiseCapability").$reject.$call( + undefined, + $makeTypeError("releasing lock of reader whose stream is still in readable state"), + ); + else + $putByIdDirectPrivate(reader, "closedPromiseCapability", { + $promise: $newHandledRejectedPromise($makeTypeError("reader released lock")), + }); + + const promise = $getByIdDirectPrivate(reader, "closedPromiseCapability").$promise; + $markPromiseAsHandled(promise); + $putByIdDirectPrivate($getByIdDirectPrivate(reader, "ownerReadableStream"), "reader", undefined); + $putByIdDirectPrivate(reader, "ownerReadableStream", undefined); +} + +export function readableStreamDefaultControllerCanCloseOrEnqueue(controller) { + return ( + !$getByIdDirectPrivate(controller, "closeRequested") && + $getByIdDirectPrivate($getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === $streamReadable + ); +} + +export function lazyLoadStream(stream, autoAllocateChunkSize) { + var nativeType = $getByIdDirectPrivate(stream, "bunNativeType"); + var nativePtr = $getByIdDirectPrivate(stream, "bunNativePtr"); + var Prototype = $lazyStreamPrototypeMap.$get(nativeType); + if (Prototype === undefined) { + var [pull, start, cancel, setClose, deinit, setRefOrUnref, drain] = $lazyLoad(nativeType); + var closer = [false]; + var handleResult; + function handleNativeReadableStreamPromiseResult(val) { + var { c, v } = this; + this.c = undefined; + this.v = undefined; + handleResult(val, c, v); + } + + function callClose(controller) { + try { + controller.close(); + } catch (e) { + globalThis.reportError(e); + } + } + + handleResult = function handleResult(result, controller, view) { + if (result && $isPromise(result)) { + return result.then( + handleNativeReadableStreamPromiseResult.bind({ + c: controller, + v: view, + }), + err => controller.error(err), + ); + } else if (typeof result === "number") { + if (view && view.byteLength === result && view.buffer === controller.byobRequest?.view?.buffer) { + controller.byobRequest.respondWithNewView(view); + } else { + controller.byobRequest.respond(result); + } + } else if (result.constructor === $Uint8Array) { + controller.enqueue(result); + } + + if (closer[0] || result === false) { + $enqueueJob(callClose, controller); + closer[0] = false; + } + }; + + function createResult(tag, controller, view, closer) { + closer[0] = false; + + var result; + try { + result = pull(tag, view, closer); + } catch (err) { + return controller.error(err); + } + + return handleResult(result, controller, view); + } + + const registry = deinit ? new FinalizationRegistry(deinit) : null; + Prototype = class NativeReadableStreamSource { + constructor(tag, autoAllocateChunkSize, drainValue) { + this.#tag = tag; + this.#cancellationToken = {}; + this.pull = this.#pull.bind(this); + this.cancel = this.#cancel.bind(this); + this.autoAllocateChunkSize = autoAllocateChunkSize; + + if (drainValue !== undefined) { + this.start = controller => { + controller.enqueue(drainValue); + }; + } + + if (registry) { + registry.register(this, tag, this.#cancellationToken); + } + } + + #cancellationToken; + pull; + cancel; + start; + + #tag; + type = "bytes"; + autoAllocateChunkSize = 0; + + static startSync = start; + + #pull(controller) { + var tag = this.#tag; + + if (!tag) { + controller.close(); + return; + } + + createResult(tag, controller, controller.byobRequest.view, closer); + } + + #cancel(reason) { + var tag = this.#tag; + + registry && registry.unregister(this.#cancellationToken); + setRefOrUnref && setRefOrUnref(tag, false); + cancel(tag, reason); + } + static deinit = deinit; + static drain = drain; + }; + $lazyStreamPrototypeMap.$set(nativeType, Prototype); + } + + const chunkSize = Prototype.startSync(nativePtr, autoAllocateChunkSize); + var drainValue; + const { drain: drainFn, deinit: deinitFn } = Prototype; + if (drainFn) { + drainValue = drainFn(nativePtr); + } + + // empty file, no need for native back-and-forth on this + if (chunkSize === 0) { + deinit && nativePtr && $enqueueJob(deinit, nativePtr); + + if ((drainValue?.byteLength ?? 0) > 0) { + return { + start(controller) { + controller.enqueue(drainValue); + controller.close(); + }, + type: "bytes", + }; + } + + return { + start(controller) { + controller.close(); + }, + type: "bytes", + }; + } + + return new Prototype(nativePtr, chunkSize, drainValue); +} + +export function readableStreamIntoArray(stream) { + var reader = stream.getReader(); + var manyResult = reader.readMany(); + + async function processManyResult(result) { + if (result.done) { + return []; + } + + var chunks = result.value || []; + + while (true) { + var thisResult = await reader.read(); + if (thisResult.done) { + break; + } + chunks = chunks.concat(thisResult.value); + } + + return chunks; + } + + if (manyResult && $isPromise(manyResult)) { + return manyResult.$then(processManyResult); + } + + return processManyResult(manyResult); +} + +export function readableStreamIntoText(stream) { + const [textStream, closer] = $createTextStream($getByIdDirectPrivate(stream, "highWaterMark")); + const prom = $readStreamIntoSink(stream, textStream, false); + if (prom && $isPromise(prom)) { + return Promise.$resolve(prom).$then(closer.$promise); + } + return closer.$promise; +} + +export function readableStreamToArrayBufferDirect(stream, underlyingSource) { + var sink = new $Bun.ArrayBufferSink(); + $putByIdDirectPrivate(stream, "underlyingSource", undefined); + var highWaterMark = $getByIdDirectPrivate(stream, "highWaterMark"); + sink.start(highWaterMark ? { highWaterMark } : {}); + var capability = $newPromiseCapability(Promise); + var ended = false; + var pull = underlyingSource.pull; + var close = underlyingSource.close; + + var controller = { + start() {}, + close(reason) { + if (!ended) { + ended = true; + if (close) { + close(); + } + + $fulfillPromise(capability.$promise, sink.end()); + } + }, + end() { + if (!ended) { + ended = true; + if (close) { + close(); + } + $fulfillPromise(capability.$promise, sink.end()); + } + }, + flush() { + return 0; + }, + write: sink.write.bind(sink), + }; + + var didError = false; + try { + const firstPull = pull(controller); + if (firstPull && $isObject(firstPull) && $isPromise(firstPull)) { + return (async function (controller, promise, pull) { + while (!ended) { + await pull(controller); + } + return await promise; + })(controller, promise, pull); + } + + return capability.$promise; + } catch (e) { + didError = true; + $readableStreamError(stream, e); + return Promise.$reject(e); + } finally { + if (!didError && stream) $readableStreamClose(stream); + controller = close = sink = pull = stream = undefined; + } +} + +export async function readableStreamToTextDirect(stream, underlyingSource) { + const capability = $initializeTextStream.$call(stream, underlyingSource, undefined); + var reader = stream.getReader(); + + while ($getByIdDirectPrivate(stream, "state") === $streamReadable) { + var thisResult = await reader.read(); + if (thisResult.done) { + break; + } + } + + try { + reader.releaseLock(); + } catch (e) {} + reader = undefined; + stream = undefined; + + return capability.$promise; +} + +export async function readableStreamToArrayDirect(stream, underlyingSource) { + const capability = $initializeArrayStream.$call(stream, underlyingSource, undefined); + underlyingSource = undefined; + var reader = stream.getReader(); + try { + while ($getByIdDirectPrivate(stream, "state") === $streamReadable) { + var thisResult = await reader.read(); + if (thisResult.done) { + break; + } + } + + try { + reader.releaseLock(); + } catch (e) {} + reader = undefined; + + return Promise.$resolve(capability.$promise); + } catch (e) { + throw e; + } finally { + stream = undefined; + reader = undefined; + } + + return capability.$promise; +} + +export function readableStreamDefineLazyIterators(prototype) { + var asyncIterator = globalThis.Symbol.asyncIterator; + + var ReadableStreamAsyncIterator = async function* ReadableStreamAsyncIterator(stream, preventCancel) { + var reader = stream.getReader(); + var deferredError; + try { + while (true) { + var done, value; + const firstResult = reader.readMany(); + if ($isPromise(firstResult)) { + ({ done, value } = await firstResult); + } else { + ({ done, value } = firstResult); + } + + if (done) { + return; + } + yield* value; + } + } catch (e) { + deferredError = e; + } finally { + reader.releaseLock(); + + if (!preventCancel) { + stream.cancel(deferredError); + } + + if (deferredError) { + throw deferredError; + } + } + }; + var createAsyncIterator = function asyncIterator() { + return ReadableStreamAsyncIterator(this, false); + }; + var createValues = function values({ preventCancel = false } = { preventCancel: false }) { + return ReadableStreamAsyncIterator(this, preventCancel); + }; + $Object.$defineProperty(prototype, asyncIterator, { value: createAsyncIterator }); + $Object.$defineProperty(prototype, "values", { value: createValues }); + return prototype; +} diff --git a/src/bun.js/builtins/ts/StreamInternals.ts b/src/bun.js/builtins/ts/StreamInternals.ts new file mode 100644 index 000000000..b42dc2f57 --- /dev/null +++ b/src/bun.js/builtins/ts/StreamInternals.ts @@ -0,0 +1,268 @@ +/* + * Copyright (C) 2015 Canon Inc. + * Copyright (C) 2015 Igalia. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +// @internal + +export function markPromiseAsHandled(promise: Promise<unknown>) { + $assert($isPromise(promise)); + $putPromiseInternalField( + promise, + $promiseFieldFlags, + $getPromiseInternalField(promise, $promiseFieldFlags) | $promiseFlagsIsHandled, + ); +} + +export function shieldingPromiseResolve(result) { + const promise = Promise.$resolve(result); + if (promise.$then === undefined) promise.$then = Promise.prototype.$then; + return promise; +} + +export function promiseInvokeOrNoopMethodNoCatch(object, method, args) { + if (method === undefined) return Promise.$resolve(); + return $shieldingPromiseResolve(method.$apply(object, args)); +} + +export function promiseInvokeOrNoopNoCatch(object, key, args) { + return $promiseInvokeOrNoopMethodNoCatch(object, object[key], args); +} + +export function promiseInvokeOrNoopMethod(object, method, args) { + try { + return $promiseInvokeOrNoopMethodNoCatch(object, method, args); + } catch (error) { + return Promise.$reject(error); + } +} + +export function promiseInvokeOrNoop(object, key, args) { + try { + return $promiseInvokeOrNoopNoCatch(object, key, args); + } catch (error) { + return Promise.$reject(error); + } +} + +export function promiseInvokeOrFallbackOrNoop(object, key1, args1, key2, args2) { + try { + const method = object[key1]; + if (method === undefined) return $promiseInvokeOrNoopNoCatch(object, key2, args2); + return $shieldingPromiseResolve(method.$apply(object, args1)); + } catch (error) { + return Promise.$reject(error); + } +} + +export function validateAndNormalizeQueuingStrategy(size, highWaterMark) { + if (size !== undefined && typeof size !== "function") throw new TypeError("size parameter must be a function"); + + const newHighWaterMark = $toNumber(highWaterMark); + + if (isNaN(newHighWaterMark) || newHighWaterMark < 0) + throw new RangeError("highWaterMark value is negative or not a number"); + + return { size: size, highWaterMark: newHighWaterMark }; +} + +$linkTimeConstant; +export function createFIFO() { + var slice = Array.prototype.slice; + + class Denqueue { + constructor() { + this._head = 0; + this._tail = 0; + // this._capacity = 0; + this._capacityMask = 0x3; + this._list = $newArrayWithSize(4); + } + + _head; + _tail; + _capacityMask; + _list; + + size() { + if (this._head === this._tail) return 0; + if (this._head < this._tail) return this._tail - this._head; + else return this._capacityMask + 1 - (this._head - this._tail); + } + + isEmpty() { + return this.size() == 0; + } + + isNotEmpty() { + return this.size() > 0; + } + + shift() { + var { _head: head, _tail, _list, _capacityMask } = this; + if (head === _tail) return undefined; + var item = _list[head]; + $putByValDirect(_list, head, undefined); + head = this._head = (head + 1) & _capacityMask; + if (head < 2 && _tail > 10000 && _tail <= _list.length >>> 2) this._shrinkArray(); + return item; + } + + peek() { + if (this._head === this._tail) return undefined; + return this._list[this._head]; + } + + push(item) { + var tail = this._tail; + $putByValDirect(this._list, tail, item); + this._tail = (tail + 1) & this._capacityMask; + if (this._tail === this._head) { + this._growArray(); + } + // if (this._capacity && this.size() > this._capacity) { + // this.shift(); + // } + } + + toArray(fullCopy) { + var list = this._list; + var len = $toLength(list.length); + + if (fullCopy || this._head > this._tail) { + var _head = $toLength(this._head); + var _tail = $toLength(this._tail); + var total = $toLength(len - _head + _tail); + var array = $newArrayWithSize(total); + var j = 0; + for (var i = _head; i < len; i++) $putByValDirect(array, j++, list[i]); + for (var i = 0; i < _tail; i++) $putByValDirect(array, j++, list[i]); + return array; + } else { + return slice.$call(list, this._head, this._tail); + } + } + + clear() { + this._head = 0; + this._tail = 0; + this._list.fill(undefined); + } + + _growArray() { + if (this._head) { + // copy existing data, head to end, then beginning to tail. + this._list = this.toArray(true); + this._head = 0; + } + + // head is at 0 and array is now full, safe to extend + this._tail = $toLength(this._list.length); + + this._list.length <<= 1; + this._capacityMask = (this._capacityMask << 1) | 1; + } + + shrinkArray() { + this._list.length >>>= 1; + this._capacityMask >>>= 1; + } + } + + return new Denqueue(); +} + +export function newQueue() { + return { content: $createFIFO(), size: 0 }; +} + +export function dequeueValue(queue) { + const record = queue.content.shift(); + queue.size -= record.size; + // As described by spec, below case may occur due to rounding errors. + if (queue.size < 0) queue.size = 0; + return record.value; +} + +export function enqueueValueWithSize(queue, value, size) { + size = $toNumber(size); + if (!isFinite(size) || size < 0) throw new RangeError("size has an incorrect value"); + + queue.content.push({ value, size }); + queue.size += size; +} + +export function peekQueueValue(queue) { + return queue.content.peek()?.value; +} + +export function resetQueue(queue) { + $assert("content" in queue); + $assert("size" in queue); + queue.content.clear(); + queue.size = 0; +} + +export function extractSizeAlgorithm(strategy) { + const sizeAlgorithm = strategy.size; + + if (sizeAlgorithm === undefined) return () => 1; + + if (typeof sizeAlgorithm !== "function") throw new TypeError("strategy.size must be a function"); + + return chunk => { + return sizeAlgorithm(chunk); + }; +} + +export function extractHighWaterMark(strategy, defaultHWM) { + const highWaterMark = strategy.highWaterMark; + + if (highWaterMark === undefined) return defaultHWM; + + if (isNaN(highWaterMark) || highWaterMark < 0) + throw new RangeError("highWaterMark value is negative or not a number"); + + return $toNumber(highWaterMark); +} + +export function extractHighWaterMarkFromQueuingStrategyInit(init: { highWaterMark?: number }) { + if (!$isObject(init)) throw new TypeError("QueuingStrategyInit argument must be an object."); + const { highWaterMark } = init; + if (highWaterMark === undefined) throw new TypeError("QueuingStrategyInit.highWaterMark member is required."); + + return $toNumber(highWaterMark); +} + +export function createFulfilledPromise(value) { + const promise = $newPromise(); + $fulfillPromise(promise, value); + return promise; +} + +export function toDictionary(value, defaultValue, errorMessage) { + if (value === undefined || value === null) return defaultValue; + if (!$isObject(value)) throw new TypeError(errorMessage); + return value; +} diff --git a/src/bun.js/builtins/ts/TransformStream.ts b/src/bun.js/builtins/ts/TransformStream.ts new file mode 100644 index 000000000..54467db39 --- /dev/null +++ b/src/bun.js/builtins/ts/TransformStream.ts @@ -0,0 +1,106 @@ +/* + * Copyright (C) 2020 Apple Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS'' + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS + * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + * THE POSSIBILITY OF SUCH DAMAGE. + */ + +export function initializeTransformStream(this) { + let transformer = arguments[0]; + + // This is the path for CreateTransformStream. + if ($isObject(transformer) && $getByIdDirectPrivate(transformer, "TransformStream")) return this; + + let writableStrategy = arguments[1]; + let readableStrategy = arguments[2]; + + if (transformer === undefined) transformer = null; + + if (readableStrategy === undefined) readableStrategy = {}; + + if (writableStrategy === undefined) writableStrategy = {}; + + let transformerDict = {}; + if (transformer !== null) { + if ("start" in transformer) { + transformerDict["start"] = transformer["start"]; + if (typeof transformerDict["start"] !== "function") $throwTypeError("transformer.start should be a function"); + } + if ("transform" in transformer) { + transformerDict["transform"] = transformer["transform"]; + if (typeof transformerDict["transform"] !== "function") + $throwTypeError("transformer.transform should be a function"); + } + if ("flush" in transformer) { + transformerDict["flush"] = transformer["flush"]; + if (typeof transformerDict["flush"] !== "function") $throwTypeError("transformer.flush should be a function"); + } + + if ("readableType" in transformer) throw new RangeError("TransformStream transformer has a readableType"); + if ("writableType" in transformer) throw new RangeError("TransformStream transformer has a writableType"); + } + + const readableHighWaterMark = $extractHighWaterMark(readableStrategy, 0); + const readableSizeAlgorithm = $extractSizeAlgorithm(readableStrategy); + + const writableHighWaterMark = $extractHighWaterMark(writableStrategy, 1); + const writableSizeAlgorithm = $extractSizeAlgorithm(writableStrategy); + + const startPromiseCapability = $newPromiseCapability(Promise); + $initializeTransformStream( + this, + startPromiseCapability.$promise, + writableHighWaterMark, + writableSizeAlgorithm, + readableHighWaterMark, + readableSizeAlgorithm, + ); + $setUpTransformStreamDefaultControllerFromTransformer(this, transformer, transformerDict); + + if ("start" in transformerDict) { + const controller = $getByIdDirectPrivate(this, "controller"); + const startAlgorithm = () => $promiseInvokeOrNoopMethodNoCatch(transformer, transformerDict["start"], [controller]); + startAlgorithm().$then( + () => { + // FIXME: We probably need to resolve start promise with the result of the start algorithm. + startPromiseCapability.$resolve.$call(); + }, + error => { + startPromiseCapability.$reject.$call(undefined, error); + }, + ); + } else startPromiseCapability.$resolve.$call(); + + return this; +} + +$getter; +export function readable() { + if (!$isTransformStream(this)) throw $makeThisTypeError("TransformStream", "readable"); + + return $getByIdDirectPrivate(this, "readable"); +} + +export function writable() { + if (!$isTransformStream(this)) throw $makeThisTypeError("TransformStream", "writable"); + + return $getByIdDirectPrivate(this, "writable"); +} diff --git a/src/bun.js/builtins/ts/TransformStreamDefaultController.ts b/src/bun.js/builtins/ts/TransformStreamDefaultController.ts new file mode 100644 index 000000000..1045498b8 --- /dev/null +++ b/src/bun.js/builtins/ts/TransformStreamDefaultController.ts @@ -0,0 +1,60 @@ +/* + * Copyright (C) 2020 Apple Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS'' + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS + * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + * THE POSSIBILITY OF SUCH DAMAGE. + */ + +export function initializeTransformStreamDefaultController(this) { + return this; +} + +$getter; +export function desiredSize(this) { + if (!$isTransformStreamDefaultController(this)) + throw $makeThisTypeError("TransformStreamDefaultController", "enqueue"); + + const stream = $getByIdDirectPrivate(this, "stream"); + const readable = $getByIdDirectPrivate(stream, "readable"); + const readableController = $getByIdDirectPrivate(readable, "readableStreamController"); + + return $readableStreamDefaultControllerGetDesiredSize(readableController); +} + +export function enqueue(this, chunk) { + if (!$isTransformStreamDefaultController(this)) + throw $makeThisTypeError("TransformStreamDefaultController", "enqueue"); + + $transformStreamDefaultControllerEnqueue(this, chunk); +} + +export function error(this, e) { + if (!$isTransformStreamDefaultController(this)) throw $makeThisTypeError("TransformStreamDefaultController", "error"); + + $transformStreamDefaultControllerError(this, e); +} + +export function terminate(this) { + if (!$isTransformStreamDefaultController(this)) + throw $makeThisTypeError("TransformStreamDefaultController", "terminate"); + + $transformStreamDefaultControllerTerminate(this); +} diff --git a/src/bun.js/builtins/ts/TransformStreamInternals.ts b/src/bun.js/builtins/ts/TransformStreamInternals.ts new file mode 100644 index 000000000..9994d1282 --- /dev/null +++ b/src/bun.js/builtins/ts/TransformStreamInternals.ts @@ -0,0 +1,348 @@ +/* + * Copyright (C) 2020 Apple Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS'' + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS + * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + * THE POSSIBILITY OF SUCH DAMAGE. + */ + +// @internal + +export function isTransformStream(stream) { + return $isObject(stream) && !!$getByIdDirectPrivate(stream, "readable"); +} + +export function isTransformStreamDefaultController(controller) { + return $isObject(controller) && !!$getByIdDirectPrivate(controller, "transformAlgorithm"); +} + +export function createTransformStream( + startAlgorithm, + transformAlgorithm, + flushAlgorithm, + writableHighWaterMark, + writableSizeAlgorithm, + readableHighWaterMark, + readableSizeAlgorithm, +) { + if (writableHighWaterMark === undefined) writableHighWaterMark = 1; + if (writableSizeAlgorithm === undefined) writableSizeAlgorithm = () => 1; + if (readableHighWaterMark === undefined) readableHighWaterMark = 0; + if (readableSizeAlgorithm === undefined) readableSizeAlgorithm = () => 1; + $assert(writableHighWaterMark >= 0); + $assert(readableHighWaterMark >= 0); + + const transform = {}; + $putByIdDirectPrivate(transform, "TransformStream", true); + + const stream = new TransformStream(transform); + const startPromiseCapability = $newPromiseCapability(Promise); + $initializeTransformStream( + stream, + startPromiseCapability.$promise, + writableHighWaterMark, + writableSizeAlgorithm, + readableHighWaterMark, + readableSizeAlgorithm, + ); + + const controller = new TransformStreamDefaultController(); + $setUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm); + + startAlgorithm().$then( + () => { + startPromiseCapability.$resolve.$call(); + }, + error => { + startPromiseCapability.$reject.$call(undefined, error); + }, + ); + + return stream; +} + +export function initializeTransformStream( + stream, + startPromise, + writableHighWaterMark, + writableSizeAlgorithm, + readableHighWaterMark, + readableSizeAlgorithm, +) { + const startAlgorithm = () => { + return startPromise; + }; + const writeAlgorithm = chunk => { + return $transformStreamDefaultSinkWriteAlgorithm(stream, chunk); + }; + const abortAlgorithm = reason => { + return $transformStreamDefaultSinkAbortAlgorithm(stream, reason); + }; + const closeAlgorithm = () => { + return $transformStreamDefaultSinkCloseAlgorithm(stream); + }; + const writable = $createWritableStream( + startAlgorithm, + writeAlgorithm, + closeAlgorithm, + abortAlgorithm, + writableHighWaterMark, + writableSizeAlgorithm, + ); + + const pullAlgorithm = () => { + return $transformStreamDefaultSourcePullAlgorithm(stream); + }; + const cancelAlgorithm = reason => { + $transformStreamErrorWritableAndUnblockWrite(stream, reason); + return Promise.$resolve(); + }; + const underlyingSource = {}; + $putByIdDirectPrivate(underlyingSource, "start", startAlgorithm); + $putByIdDirectPrivate(underlyingSource, "pull", pullAlgorithm); + $putByIdDirectPrivate(underlyingSource, "cancel", cancelAlgorithm); + const options = {}; + $putByIdDirectPrivate(options, "size", readableSizeAlgorithm); + $putByIdDirectPrivate(options, "highWaterMark", readableHighWaterMark); + const readable = new ReadableStream(underlyingSource, options); + + // The writable to expose to JS through writable getter. + $putByIdDirectPrivate(stream, "writable", writable); + // The writable to use for the actual transform algorithms. + $putByIdDirectPrivate(stream, "internalWritable", $getInternalWritableStream(writable)); + + $putByIdDirectPrivate(stream, "readable", readable); + $putByIdDirectPrivate(stream, "backpressure", undefined); + $putByIdDirectPrivate(stream, "backpressureChangePromise", undefined); + + $transformStreamSetBackpressure(stream, true); + $putByIdDirectPrivate(stream, "controller", undefined); +} + +export function transformStreamError(stream, e) { + const readable = $getByIdDirectPrivate(stream, "readable"); + const readableController = $getByIdDirectPrivate(readable, "readableStreamController"); + $readableStreamDefaultControllerError(readableController, e); + + $transformStreamErrorWritableAndUnblockWrite(stream, e); +} + +export function transformStreamErrorWritableAndUnblockWrite(stream, e) { + $transformStreamDefaultControllerClearAlgorithms($getByIdDirectPrivate(stream, "controller")); + + const writable = $getByIdDirectPrivate(stream, "internalWritable"); + $writableStreamDefaultControllerErrorIfNeeded($getByIdDirectPrivate(writable, "controller"), e); + + if ($getByIdDirectPrivate(stream, "backpressure")) $transformStreamSetBackpressure(stream, false); +} + +export function transformStreamSetBackpressure(stream, backpressure) { + $assert($getByIdDirectPrivate(stream, "backpressure") !== backpressure); + + const backpressureChangePromise = $getByIdDirectPrivate(stream, "backpressureChangePromise"); + if (backpressureChangePromise !== undefined) backpressureChangePromise.$resolve.$call(); + + $putByIdDirectPrivate(stream, "backpressureChangePromise", $newPromiseCapability(Promise)); + $putByIdDirectPrivate(stream, "backpressure", backpressure); +} + +export function setUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm) { + $assert($isTransformStream(stream)); + $assert($getByIdDirectPrivate(stream, "controller") === undefined); + + $putByIdDirectPrivate(controller, "stream", stream); + $putByIdDirectPrivate(stream, "controller", controller); + $putByIdDirectPrivate(controller, "transformAlgorithm", transformAlgorithm); + $putByIdDirectPrivate(controller, "flushAlgorithm", flushAlgorithm); +} + +export function setUpTransformStreamDefaultControllerFromTransformer(stream, transformer, transformerDict) { + const controller = new TransformStreamDefaultController(); + let transformAlgorithm = chunk => { + try { + $transformStreamDefaultControllerEnqueue(controller, chunk); + } catch (e) { + return Promise.$reject(e); + } + return Promise.$resolve(); + }; + let flushAlgorithm = () => { + return Promise.$resolve(); + }; + + if ("transform" in transformerDict) + transformAlgorithm = chunk => { + return $promiseInvokeOrNoopMethod(transformer, transformerDict["transform"], [chunk, controller]); + }; + + if ("flush" in transformerDict) { + flushAlgorithm = () => { + return $promiseInvokeOrNoopMethod(transformer, transformerDict["flush"], [controller]); + }; + } + + $setUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm); +} + +export function transformStreamDefaultControllerClearAlgorithms(controller) { + // We set transformAlgorithm to true to allow GC but keep the isTransformStreamDefaultController check. + $putByIdDirectPrivate(controller, "transformAlgorithm", true); + $putByIdDirectPrivate(controller, "flushAlgorithm", undefined); +} + +export function transformStreamDefaultControllerEnqueue(controller, chunk) { + const stream = $getByIdDirectPrivate(controller, "stream"); + const readable = $getByIdDirectPrivate(stream, "readable"); + const readableController = $getByIdDirectPrivate(readable, "readableStreamController"); + + $assert(readableController !== undefined); + if (!$readableStreamDefaultControllerCanCloseOrEnqueue(readableController)) + $throwTypeError("TransformStream.readable cannot close or enqueue"); + + try { + $readableStreamDefaultControllerEnqueue(readableController, chunk); + } catch (e) { + $transformStreamErrorWritableAndUnblockWrite(stream, e); + throw $getByIdDirectPrivate(readable, "storedError"); + } + + const backpressure = !$readableStreamDefaultControllerShouldCallPull(readableController); + if (backpressure !== $getByIdDirectPrivate(stream, "backpressure")) { + $assert(backpressure); + $transformStreamSetBackpressure(stream, true); + } +} + +export function transformStreamDefaultControllerError(controller, e) { + $transformStreamError($getByIdDirectPrivate(controller, "stream"), e); +} + +export function transformStreamDefaultControllerPerformTransform(controller, chunk) { + const promiseCapability = $newPromiseCapability(Promise); + + const transformPromise = $getByIdDirectPrivate(controller, "transformAlgorithm").$call(undefined, chunk); + transformPromise.$then( + () => { + promiseCapability.$resolve(); + }, + r => { + $transformStreamError($getByIdDirectPrivate(controller, "stream"), r); + promiseCapability.$reject.$call(undefined, r); + }, + ); + return promiseCapability.$promise; +} + +export function transformStreamDefaultControllerTerminate(controller) { + const stream = $getByIdDirectPrivate(controller, "stream"); + const readable = $getByIdDirectPrivate(stream, "readable"); + const readableController = $getByIdDirectPrivate(readable, "readableStreamController"); + + // FIXME: Update readableStreamDefaultControllerClose to make this check. + if ($readableStreamDefaultControllerCanCloseOrEnqueue(readableController)) + $readableStreamDefaultControllerClose(readableController); + const error = $makeTypeError("the stream has been terminated"); + $transformStreamErrorWritableAndUnblockWrite(stream, error); +} + +export function transformStreamDefaultSinkWriteAlgorithm(stream, chunk) { + const writable = $getByIdDirectPrivate(stream, "internalWritable"); + + $assert($getByIdDirectPrivate(writable, "state") === "writable"); + + const controller = $getByIdDirectPrivate(stream, "controller"); + + if ($getByIdDirectPrivate(stream, "backpressure")) { + const promiseCapability = $newPromiseCapability(Promise); + + const backpressureChangePromise = $getByIdDirectPrivate(stream, "backpressureChangePromise"); + $assert(backpressureChangePromise !== undefined); + backpressureChangePromise.$promise.$then( + () => { + const state = $getByIdDirectPrivate(writable, "state"); + if (state === "erroring") { + promiseCapability.$reject.$call(undefined, $getByIdDirectPrivate(writable, "storedError")); + return; + } + + $assert(state === "writable"); + $transformStreamDefaultControllerPerformTransform(controller, chunk).$then( + () => { + promiseCapability.$resolve(); + }, + e => { + promiseCapability.$reject.$call(undefined, e); + }, + ); + }, + e => { + promiseCapability.$reject.$call(undefined, e); + }, + ); + + return promiseCapability.$promise; + } + return $transformStreamDefaultControllerPerformTransform(controller, chunk); +} + +export function transformStreamDefaultSinkAbortAlgorithm(stream, reason) { + $transformStreamError(stream, reason); + return Promise.$resolve(); +} + +export function transformStreamDefaultSinkCloseAlgorithm(stream) { + const readable = $getByIdDirectPrivate(stream, "readable"); + const controller = $getByIdDirectPrivate(stream, "controller"); + const readableController = $getByIdDirectPrivate(readable, "readableStreamController"); + + const flushAlgorithm = $getByIdDirectPrivate(controller, "flushAlgorithm"); + $assert(flushAlgorithm !== undefined); + const flushPromise = $getByIdDirectPrivate(controller, "flushAlgorithm").$call(); + $transformStreamDefaultControllerClearAlgorithms(controller); + + const promiseCapability = $newPromiseCapability(Promise); + flushPromise.$then( + () => { + if ($getByIdDirectPrivate(readable, "state") === $streamErrored) { + promiseCapability.$reject.$call(undefined, $getByIdDirectPrivate(readable, "storedError")); + return; + } + + // FIXME: Update readableStreamDefaultControllerClose to make this check. + if ($readableStreamDefaultControllerCanCloseOrEnqueue(readableController)) + $readableStreamDefaultControllerClose(readableController); + promiseCapability.$resolve(); + }, + r => { + $transformStreamError($getByIdDirectPrivate(controller, "stream"), r); + promiseCapability.$reject.$call(undefined, $getByIdDirectPrivate(readable, "storedError")); + }, + ); + return promiseCapability.$promise; +} + +export function transformStreamDefaultSourcePullAlgorithm(stream) { + $assert($getByIdDirectPrivate(stream, "backpressure")); + $assert($getByIdDirectPrivate(stream, "backpressureChangePromise") !== undefined); + + $transformStreamSetBackpressure(stream, false); + + return $getByIdDirectPrivate(stream, "backpressureChangePromise").$promise; +} diff --git a/src/bun.js/builtins/ts/WritableStreamDefaultController.ts b/src/bun.js/builtins/ts/WritableStreamDefaultController.ts new file mode 100644 index 000000000..1a3ddc290 --- /dev/null +++ b/src/bun.js/builtins/ts/WritableStreamDefaultController.ts @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2020 Apple Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS'' + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS + * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + * THE POSSIBILITY OF SUCH DAMAGE. + */ + +export function initializeWritableStreamDefaultController(this) { + $putByIdDirectPrivate(this, "queue", $newQueue()); + $putByIdDirectPrivate(this, "abortSteps", reason => { + const result = $getByIdDirectPrivate(this, "abortAlgorithm").$call(undefined, reason); + $writableStreamDefaultControllerClearAlgorithms(this); + return result; + }); + + $putByIdDirectPrivate(this, "errorSteps", () => { + $resetQueue($getByIdDirectPrivate(this, "queue")); + }); + + return this; +} + +export function error(this, e) { + if ($getByIdDirectPrivate(this, "abortSteps") === undefined) + throw $makeThisTypeError("WritableStreamDefaultController", "error"); + + const stream = $getByIdDirectPrivate(this, "stream"); + if ($getByIdDirectPrivate(stream, "state") !== "writable") return; + $writableStreamDefaultControllerError(this, e); +} diff --git a/src/bun.js/builtins/ts/WritableStreamDefaultWriter.ts b/src/bun.js/builtins/ts/WritableStreamDefaultWriter.ts new file mode 100644 index 000000000..795b43892 --- /dev/null +++ b/src/bun.js/builtins/ts/WritableStreamDefaultWriter.ts @@ -0,0 +1,104 @@ +/* + * Copyright (C) 2020 Apple Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS'' + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS + * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + * THE POSSIBILITY OF SUCH DAMAGE. + */ + +export function initializeWritableStreamDefaultWriter(stream) { + // stream can be a WritableStream if WritableStreamDefaultWriter constructor is called directly from JS + // or an InternalWritableStream in other code paths. + const internalStream = $getInternalWritableStream(stream); + if (internalStream) stream = internalStream; + + if (!$isWritableStream(stream)) $throwTypeError("WritableStreamDefaultWriter constructor takes a WritableStream"); + + $setUpWritableStreamDefaultWriter(this, stream); + return this; +} + +$getter; +export function closed() { + if (!$isWritableStreamDefaultWriter(this)) + return Promise.$reject($makeGetterTypeError("WritableStreamDefaultWriter", "closed")); + + return $getByIdDirectPrivate(this, "closedPromise").$promise; +} + +$getter; +export function desiredSize() { + if (!$isWritableStreamDefaultWriter(this)) throw $makeThisTypeError("WritableStreamDefaultWriter", "desiredSize"); + + if ($getByIdDirectPrivate(this, "stream") === undefined) $throwTypeError("WritableStreamDefaultWriter has no stream"); + + return $writableStreamDefaultWriterGetDesiredSize(this); +} + +$getter; +export function ready() { + if (!$isWritableStreamDefaultWriter(this)) + return Promise.$reject($makeThisTypeError("WritableStreamDefaultWriter", "ready")); + + return $getByIdDirectPrivate(this, "readyPromise").$promise; +} + +export function abort(reason) { + if (!$isWritableStreamDefaultWriter(this)) + return Promise.$reject($makeThisTypeError("WritableStreamDefaultWriter", "abort")); + + if ($getByIdDirectPrivate(this, "stream") === undefined) + return Promise.$reject($makeTypeError("WritableStreamDefaultWriter has no stream")); + + return $writableStreamDefaultWriterAbort(this, reason); +} + +export function close() { + if (!$isWritableStreamDefaultWriter(this)) + return Promise.$reject($makeThisTypeError("WritableStreamDefaultWriter", "close")); + + const stream = $getByIdDirectPrivate(this, "stream"); + if (stream === undefined) return Promise.$reject($makeTypeError("WritableStreamDefaultWriter has no stream")); + + if ($writableStreamCloseQueuedOrInFlight(stream)) + return Promise.$reject($makeTypeError("WritableStreamDefaultWriter is being closed")); + + return $writableStreamDefaultWriterClose(this); +} + +export function releaseLock() { + if (!$isWritableStreamDefaultWriter(this)) throw $makeThisTypeError("WritableStreamDefaultWriter", "releaseLock"); + + const stream = $getByIdDirectPrivate(this, "stream"); + if (stream === undefined) return; + + $assert($getByIdDirectPrivate(stream, "writer") !== undefined); + $writableStreamDefaultWriterRelease(this); +} + +export function write(chunk) { + if (!$isWritableStreamDefaultWriter(this)) + return Promise.$reject($makeThisTypeError("WritableStreamDefaultWriter", "write")); + + if ($getByIdDirectPrivate(this, "stream") === undefined) + return Promise.$reject($makeTypeError("WritableStreamDefaultWriter has no stream")); + + return $writableStreamDefaultWriterWrite(this, chunk); +} diff --git a/src/bun.js/builtins/ts/WritableStreamInternals.ts b/src/bun.js/builtins/ts/WritableStreamInternals.ts new file mode 100644 index 000000000..f436a285e --- /dev/null +++ b/src/bun.js/builtins/ts/WritableStreamInternals.ts @@ -0,0 +1,790 @@ +/* + * Copyright (C) 2015 Canon Inc. + * Copyright (C) 2015 Igalia + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +// @internal + +export function isWritableStream(stream) { + return $isObject(stream) && !!$getByIdDirectPrivate(stream, "underlyingSink"); +} + +export function isWritableStreamDefaultWriter(writer) { + return $isObject(writer) && !!$getByIdDirectPrivate(writer, "closedPromise"); +} + +export function acquireWritableStreamDefaultWriter(stream) { + return new WritableStreamDefaultWriter(stream); +} + +// https://streams.spec.whatwg.org/#create-writable-stream +export function createWritableStream( + startAlgorithm, + writeAlgorithm, + closeAlgorithm, + abortAlgorithm, + highWaterMark, + sizeAlgorithm, +) { + $assert(typeof highWaterMark === "number" && !isNaN(highWaterMark) && highWaterMark >= 0); + + const internalStream = {}; + $initializeWritableStreamSlots(internalStream, {}); + const controller = new WritableStreamDefaultController(); + + $setUpWritableStreamDefaultController( + internalStream, + controller, + startAlgorithm, + writeAlgorithm, + closeAlgorithm, + abortAlgorithm, + highWaterMark, + sizeAlgorithm, + ); + + return $createWritableStreamFromInternal(internalStream); +} + +export function createInternalWritableStreamFromUnderlyingSink(underlyingSink, strategy) { + const stream = {}; + + if (underlyingSink === undefined) underlyingSink = {}; + + if (strategy === undefined) strategy = {}; + + if (!$isObject(underlyingSink)) $throwTypeError("WritableStream constructor takes an object as first argument"); + + if ("type" in underlyingSink) $throwRangeError("Invalid type is specified"); + + const sizeAlgorithm = $extractSizeAlgorithm(strategy); + const highWaterMark = $extractHighWaterMark(strategy, 1); + + const underlyingSinkDict = {}; + if ("start" in underlyingSink) { + underlyingSinkDict["start"] = underlyingSink["start"]; + if (typeof underlyingSinkDict["start"] !== "function") $throwTypeError("underlyingSink.start should be a function"); + } + if ("write" in underlyingSink) { + underlyingSinkDict["write"] = underlyingSink["write"]; + if (typeof underlyingSinkDict["write"] !== "function") $throwTypeError("underlyingSink.write should be a function"); + } + if ("close" in underlyingSink) { + underlyingSinkDict["close"] = underlyingSink["close"]; + if (typeof underlyingSinkDict["close"] !== "function") $throwTypeError("underlyingSink.close should be a function"); + } + if ("abort" in underlyingSink) { + underlyingSinkDict["abort"] = underlyingSink["abort"]; + if (typeof underlyingSinkDict["abort"] !== "function") $throwTypeError("underlyingSink.abort should be a function"); + } + + $initializeWritableStreamSlots(stream, underlyingSink); + $setUpWritableStreamDefaultControllerFromUnderlyingSink( + stream, + underlyingSink, + underlyingSinkDict, + highWaterMark, + sizeAlgorithm, + ); + + return stream; +} + +export function initializeWritableStreamSlots(stream, underlyingSink) { + $putByIdDirectPrivate(stream, "state", "writable"); + $putByIdDirectPrivate(stream, "storedError", undefined); + $putByIdDirectPrivate(stream, "writer", undefined); + $putByIdDirectPrivate(stream, "controller", undefined); + $putByIdDirectPrivate(stream, "inFlightWriteRequest", undefined); + $putByIdDirectPrivate(stream, "closeRequest", undefined); + $putByIdDirectPrivate(stream, "inFlightCloseRequest", undefined); + $putByIdDirectPrivate(stream, "pendingAbortRequest", undefined); + $putByIdDirectPrivate(stream, "writeRequests", $createFIFO()); + $putByIdDirectPrivate(stream, "backpressure", false); + $putByIdDirectPrivate(stream, "underlyingSink", underlyingSink); +} + +export function writableStreamCloseForBindings(stream) { + if ($isWritableStreamLocked(stream)) + return Promise.$reject($makeTypeError("WritableStream.close method can only be used on non locked WritableStream")); + + if ($writableStreamCloseQueuedOrInFlight(stream)) + return Promise.$reject( + $makeTypeError("WritableStream.close method can only be used on a being close WritableStream"), + ); + + return $writableStreamClose(stream); +} + +export function writableStreamAbortForBindings(stream, reason) { + if ($isWritableStreamLocked(stream)) + return Promise.$reject($makeTypeError("WritableStream.abort method can only be used on non locked WritableStream")); + + return $writableStreamAbort(stream, reason); +} + +export function isWritableStreamLocked(stream) { + return $getByIdDirectPrivate(stream, "writer") !== undefined; +} + +export function setUpWritableStreamDefaultWriter(writer, stream) { + if ($isWritableStreamLocked(stream)) $throwTypeError("WritableStream is locked"); + + $putByIdDirectPrivate(writer, "stream", stream); + $putByIdDirectPrivate(stream, "writer", writer); + + const readyPromiseCapability = $newPromiseCapability(Promise); + const closedPromiseCapability = $newPromiseCapability(Promise); + $putByIdDirectPrivate(writer, "readyPromise", readyPromiseCapability); + $putByIdDirectPrivate(writer, "closedPromise", closedPromiseCapability); + + const state = $getByIdDirectPrivate(stream, "state"); + if (state === "writable") { + if ($writableStreamCloseQueuedOrInFlight(stream) || !$getByIdDirectPrivate(stream, "backpressure")) + readyPromiseCapability.$resolve.$call(); + } else if (state === "erroring") { + readyPromiseCapability.$reject.$call(undefined, $getByIdDirectPrivate(stream, "storedError")); + $markPromiseAsHandled(readyPromiseCapability.$promise); + } else if (state === "closed") { + readyPromiseCapability.$resolve.$call(); + closedPromiseCapability.$resolve.$call(); + } else { + $assert(state === "errored"); + const storedError = $getByIdDirectPrivate(stream, "storedError"); + readyPromiseCapability.$reject.$call(undefined, storedError); + $markPromiseAsHandled(readyPromiseCapability.$promise); + closedPromiseCapability.$reject.$call(undefined, storedError); + $markPromiseAsHandled(closedPromiseCapability.$promise); + } +} + +export function writableStreamAbort(stream, reason) { + const state = $getByIdDirectPrivate(stream, "state"); + if (state === "closed" || state === "errored") return Promise.$resolve(); + + const pendingAbortRequest = $getByIdDirectPrivate(stream, "pendingAbortRequest"); + if (pendingAbortRequest !== undefined) return pendingAbortRequest.promise.$promise; + + $assert(state === "writable" || state === "erroring"); + let wasAlreadyErroring = false; + if (state === "erroring") { + wasAlreadyErroring = true; + reason = undefined; + } + + const abortPromiseCapability = $newPromiseCapability(Promise); + $putByIdDirectPrivate(stream, "pendingAbortRequest", { + promise: abortPromiseCapability, + reason: reason, + wasAlreadyErroring: wasAlreadyErroring, + }); + + if (!wasAlreadyErroring) $writableStreamStartErroring(stream, reason); + return abortPromiseCapability.$promise; +} + +export function writableStreamClose(stream) { + const state = $getByIdDirectPrivate(stream, "state"); + if (state === "closed" || state === "errored") + return Promise.$reject($makeTypeError("Cannot close a writable stream that is closed or errored")); + + $assert(state === "writable" || state === "erroring"); + $assert(!$writableStreamCloseQueuedOrInFlight(stream)); + + const closePromiseCapability = $newPromiseCapability(Promise); + $putByIdDirectPrivate(stream, "closeRequest", closePromiseCapability); + + const writer = $getByIdDirectPrivate(stream, "writer"); + if (writer !== undefined && $getByIdDirectPrivate(stream, "backpressure") && state === "writable") + $getByIdDirectPrivate(writer, "readyPromise").$resolve.$call(); + + $writableStreamDefaultControllerClose($getByIdDirectPrivate(stream, "controller")); + + return closePromiseCapability.$promise; +} + +export function writableStreamAddWriteRequest(stream) { + $assert($isWritableStreamLocked(stream)); + $assert($getByIdDirectPrivate(stream, "state") === "writable"); + + const writePromiseCapability = $newPromiseCapability(Promise); + const writeRequests = $getByIdDirectPrivate(stream, "writeRequests"); + writeRequests.push(writePromiseCapability); + return writePromiseCapability.$promise; +} + +export function writableStreamCloseQueuedOrInFlight(stream) { + return ( + $getByIdDirectPrivate(stream, "closeRequest") !== undefined || + $getByIdDirectPrivate(stream, "inFlightCloseRequest") !== undefined + ); +} + +export function writableStreamDealWithRejection(stream, error) { + const state = $getByIdDirectPrivate(stream, "state"); + if (state === "writable") { + $writableStreamStartErroring(stream, error); + return; + } + + $assert(state === "erroring"); + $writableStreamFinishErroring(stream); +} + +export function writableStreamFinishErroring(stream) { + $assert($getByIdDirectPrivate(stream, "state") === "erroring"); + $assert(!$writableStreamHasOperationMarkedInFlight(stream)); + + $putByIdDirectPrivate(stream, "state", "errored"); + + const controller = $getByIdDirectPrivate(stream, "controller"); + $getByIdDirectPrivate(controller, "errorSteps").$call(); + + const storedError = $getByIdDirectPrivate(stream, "storedError"); + const requests = $getByIdDirectPrivate(stream, "writeRequests"); + for (var request = requests.shift(); request; request = requests.shift()) + request.$reject.$call(undefined, storedError); + + // TODO: is this still necessary? + $putByIdDirectPrivate(stream, "writeRequests", $createFIFO()); + + const abortRequest = $getByIdDirectPrivate(stream, "pendingAbortRequest"); + if (abortRequest === undefined) { + $writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); + return; + } + + $putByIdDirectPrivate(stream, "pendingAbortRequest", undefined); + if (abortRequest.wasAlreadyErroring) { + abortRequest.promise.$reject.$call(undefined, storedError); + $writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); + return; + } + + $getByIdDirectPrivate(controller, "abortSteps") + .$call(undefined, abortRequest.reason) + .$then( + () => { + abortRequest.promise.$resolve.$call(); + $writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); + }, + reason => { + abortRequest.promise.$reject.$call(undefined, reason); + $writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); + }, + ); +} + +export function writableStreamFinishInFlightClose(stream) { + const inFlightCloseRequest = $getByIdDirectPrivate(stream, "inFlightCloseRequest"); + inFlightCloseRequest.$resolve.$call(); + + $putByIdDirectPrivate(stream, "inFlightCloseRequest", undefined); + + const state = $getByIdDirectPrivate(stream, "state"); + $assert(state === "writable" || state === "erroring"); + + if (state === "erroring") { + $putByIdDirectPrivate(stream, "storedError", undefined); + const abortRequest = $getByIdDirectPrivate(stream, "pendingAbortRequest"); + if (abortRequest !== undefined) { + abortRequest.promise.$resolve.$call(); + $putByIdDirectPrivate(stream, "pendingAbortRequest", undefined); + } + } + + $putByIdDirectPrivate(stream, "state", "closed"); + + const writer = $getByIdDirectPrivate(stream, "writer"); + if (writer !== undefined) $getByIdDirectPrivate(writer, "closedPromise").$resolve.$call(); + + $assert($getByIdDirectPrivate(stream, "pendingAbortRequest") === undefined); + $assert($getByIdDirectPrivate(stream, "storedError") === undefined); +} + +export function writableStreamFinishInFlightCloseWithError(stream, error) { + const inFlightCloseRequest = $getByIdDirectPrivate(stream, "inFlightCloseRequest"); + $assert(inFlightCloseRequest !== undefined); + inFlightCloseRequest.$reject.$call(undefined, error); + + $putByIdDirectPrivate(stream, "inFlightCloseRequest", undefined); + + const state = $getByIdDirectPrivate(stream, "state"); + $assert(state === "writable" || state === "erroring"); + + const abortRequest = $getByIdDirectPrivate(stream, "pendingAbortRequest"); + if (abortRequest !== undefined) { + abortRequest.promise.$reject.$call(undefined, error); + $putByIdDirectPrivate(stream, "pendingAbortRequest", undefined); + } + + $writableStreamDealWithRejection(stream, error); +} + +export function writableStreamFinishInFlightWrite(stream) { + const inFlightWriteRequest = $getByIdDirectPrivate(stream, "inFlightWriteRequest"); + $assert(inFlightWriteRequest !== undefined); + inFlightWriteRequest.$resolve.$call(); + + $putByIdDirectPrivate(stream, "inFlightWriteRequest", undefined); +} + +export function writableStreamFinishInFlightWriteWithError(stream, error) { + const inFlightWriteRequest = $getByIdDirectPrivate(stream, "inFlightWriteRequest"); + $assert(inFlightWriteRequest !== undefined); + inFlightWriteRequest.$reject.$call(undefined, error); + + $putByIdDirectPrivate(stream, "inFlightWriteRequest", undefined); + + const state = $getByIdDirectPrivate(stream, "state"); + $assert(state === "writable" || state === "erroring"); + + $writableStreamDealWithRejection(stream, error); +} + +export function writableStreamHasOperationMarkedInFlight(stream) { + return ( + $getByIdDirectPrivate(stream, "inFlightWriteRequest") !== undefined || + $getByIdDirectPrivate(stream, "inFlightCloseRequest") !== undefined + ); +} + +export function writableStreamMarkCloseRequestInFlight(stream) { + const closeRequest = $getByIdDirectPrivate(stream, "closeRequest"); + $assert($getByIdDirectPrivate(stream, "inFlightCloseRequest") === undefined); + $assert(closeRequest !== undefined); + + $putByIdDirectPrivate(stream, "inFlightCloseRequest", closeRequest); + $putByIdDirectPrivate(stream, "closeRequest", undefined); +} + +export function writableStreamMarkFirstWriteRequestInFlight(stream) { + const writeRequests = $getByIdDirectPrivate(stream, "writeRequests"); + $assert($getByIdDirectPrivate(stream, "inFlightWriteRequest") === undefined); + $assert(writeRequests.isNotEmpty()); + + const writeRequest = writeRequests.shift(); + $putByIdDirectPrivate(stream, "inFlightWriteRequest", writeRequest); +} + +export function writableStreamRejectCloseAndClosedPromiseIfNeeded(stream) { + $assert($getByIdDirectPrivate(stream, "state") === "errored"); + + const storedError = $getByIdDirectPrivate(stream, "storedError"); + + const closeRequest = $getByIdDirectPrivate(stream, "closeRequest"); + if (closeRequest !== undefined) { + $assert($getByIdDirectPrivate(stream, "inFlightCloseRequest") === undefined); + closeRequest.$reject.$call(undefined, storedError); + $putByIdDirectPrivate(stream, "closeRequest", undefined); + } + + const writer = $getByIdDirectPrivate(stream, "writer"); + if (writer !== undefined) { + const closedPromise = $getByIdDirectPrivate(writer, "closedPromise"); + closedPromise.$reject.$call(undefined, storedError); + $markPromiseAsHandled(closedPromise.$promise); + } +} + +export function writableStreamStartErroring(stream, reason) { + $assert($getByIdDirectPrivate(stream, "storedError") === undefined); + $assert($getByIdDirectPrivate(stream, "state") === "writable"); + + const controller = $getByIdDirectPrivate(stream, "controller"); + $assert(controller !== undefined); + + $putByIdDirectPrivate(stream, "state", "erroring"); + $putByIdDirectPrivate(stream, "storedError", reason); + + const writer = $getByIdDirectPrivate(stream, "writer"); + if (writer !== undefined) $writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason); + + if (!$writableStreamHasOperationMarkedInFlight(stream) && $getByIdDirectPrivate(controller, "started") === 1) + $writableStreamFinishErroring(stream); +} + +export function writableStreamUpdateBackpressure(stream, backpressure) { + $assert($getByIdDirectPrivate(stream, "state") === "writable"); + $assert(!$writableStreamCloseQueuedOrInFlight(stream)); + + const writer = $getByIdDirectPrivate(stream, "writer"); + if (writer !== undefined && backpressure !== $getByIdDirectPrivate(stream, "backpressure")) { + if (backpressure) $putByIdDirectPrivate(writer, "readyPromise", $newPromiseCapability(Promise)); + else $getByIdDirectPrivate(writer, "readyPromise").$resolve.$call(); + } + $putByIdDirectPrivate(stream, "backpressure", backpressure); +} + +export function writableStreamDefaultWriterAbort(writer, reason) { + const stream = $getByIdDirectPrivate(writer, "stream"); + $assert(stream !== undefined); + return $writableStreamAbort(stream, reason); +} + +export function writableStreamDefaultWriterClose(writer) { + const stream = $getByIdDirectPrivate(writer, "stream"); + $assert(stream !== undefined); + return $writableStreamClose(stream); +} + +export function writableStreamDefaultWriterCloseWithErrorPropagation(writer) { + const stream = $getByIdDirectPrivate(writer, "stream"); + $assert(stream !== undefined); + + const state = $getByIdDirectPrivate(stream, "state"); + + if ($writableStreamCloseQueuedOrInFlight(stream) || state === "closed") return Promise.$resolve(); + + if (state === "errored") return Promise.$reject($getByIdDirectPrivate(stream, "storedError")); + + $assert(state === "writable" || state === "erroring"); + return $writableStreamDefaultWriterClose(writer); +} + +export function writableStreamDefaultWriterEnsureClosedPromiseRejected(writer, error) { + let closedPromiseCapability = $getByIdDirectPrivate(writer, "closedPromise"); + let closedPromise = closedPromiseCapability.$promise; + + if (($getPromiseInternalField(closedPromise, $promiseFieldFlags) & $promiseStateMask) !== $promiseStatePending) { + closedPromiseCapability = $newPromiseCapability(Promise); + closedPromise = closedPromiseCapability.$promise; + $putByIdDirectPrivate(writer, "closedPromise", closedPromiseCapability); + } + + closedPromiseCapability.$reject.$call(undefined, error); + $markPromiseAsHandled(closedPromise); +} + +export function writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, error) { + let readyPromiseCapability = $getByIdDirectPrivate(writer, "readyPromise"); + let readyPromise = readyPromiseCapability.$promise; + + if (($getPromiseInternalField(readyPromise, $promiseFieldFlags) & $promiseStateMask) !== $promiseStatePending) { + readyPromiseCapability = $newPromiseCapability(Promise); + readyPromise = readyPromiseCapability.$promise; + $putByIdDirectPrivate(writer, "readyPromise", readyPromiseCapability); + } + + readyPromiseCapability.$reject.$call(undefined, error); + $markPromiseAsHandled(readyPromise); +} + +export function writableStreamDefaultWriterGetDesiredSize(writer) { + const stream = $getByIdDirectPrivate(writer, "stream"); + $assert(stream !== undefined); + + const state = $getByIdDirectPrivate(stream, "state"); + + if (state === "errored" || state === "erroring") return null; + + if (state === "closed") return 0; + + return $writableStreamDefaultControllerGetDesiredSize($getByIdDirectPrivate(stream, "controller")); +} + +export function writableStreamDefaultWriterRelease(writer) { + const stream = $getByIdDirectPrivate(writer, "stream"); + $assert(stream !== undefined); + $assert($getByIdDirectPrivate(stream, "writer") === writer); + + const releasedError = $makeTypeError("writableStreamDefaultWriterRelease"); + + $writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError); + $writableStreamDefaultWriterEnsureClosedPromiseRejected(writer, releasedError); + + $putByIdDirectPrivate(stream, "writer", undefined); + $putByIdDirectPrivate(writer, "stream", undefined); +} + +export function writableStreamDefaultWriterWrite(writer, chunk) { + const stream = $getByIdDirectPrivate(writer, "stream"); + $assert(stream !== undefined); + + const controller = $getByIdDirectPrivate(stream, "controller"); + $assert(controller !== undefined); + const chunkSize = $writableStreamDefaultControllerGetChunkSize(controller, chunk); + + if (stream !== $getByIdDirectPrivate(writer, "stream")) + return Promise.$reject($makeTypeError("writer is not stream's writer")); + + const state = $getByIdDirectPrivate(stream, "state"); + if (state === "errored") return Promise.$reject($getByIdDirectPrivate(stream, "storedError")); + + if ($writableStreamCloseQueuedOrInFlight(stream) || state === "closed") + return Promise.$reject($makeTypeError("stream is closing or closed")); + + if ($writableStreamCloseQueuedOrInFlight(stream) || state === "closed") + return Promise.$reject($makeTypeError("stream is closing or closed")); + + if (state === "erroring") return Promise.$reject($getByIdDirectPrivate(stream, "storedError")); + + $assert(state === "writable"); + + const promise = $writableStreamAddWriteRequest(stream); + $writableStreamDefaultControllerWrite(controller, chunk, chunkSize); + return promise; +} + +export function setUpWritableStreamDefaultController( + stream, + controller, + startAlgorithm, + writeAlgorithm, + closeAlgorithm, + abortAlgorithm, + highWaterMark, + sizeAlgorithm, +) { + $assert($isWritableStream(stream)); + $assert($getByIdDirectPrivate(stream, "controller") === undefined); + + $putByIdDirectPrivate(controller, "stream", stream); + $putByIdDirectPrivate(stream, "controller", controller); + + $resetQueue($getByIdDirectPrivate(controller, "queue")); + + $putByIdDirectPrivate(controller, "started", -1); + $putByIdDirectPrivate(controller, "startAlgorithm", startAlgorithm); + $putByIdDirectPrivate(controller, "strategySizeAlgorithm", sizeAlgorithm); + $putByIdDirectPrivate(controller, "strategyHWM", highWaterMark); + $putByIdDirectPrivate(controller, "writeAlgorithm", writeAlgorithm); + $putByIdDirectPrivate(controller, "closeAlgorithm", closeAlgorithm); + $putByIdDirectPrivate(controller, "abortAlgorithm", abortAlgorithm); + + const backpressure = $writableStreamDefaultControllerGetBackpressure(controller); + $writableStreamUpdateBackpressure(stream, backpressure); + + $writableStreamDefaultControllerStart(controller); +} + +export function writableStreamDefaultControllerStart(controller) { + if ($getByIdDirectPrivate(controller, "started") !== -1) return; + + $putByIdDirectPrivate(controller, "started", 0); + + const startAlgorithm = $getByIdDirectPrivate(controller, "startAlgorithm"); + $putByIdDirectPrivate(controller, "startAlgorithm", undefined); + const stream = $getByIdDirectPrivate(controller, "stream"); + return Promise.$resolve(startAlgorithm.$call()).$then( + () => { + const state = $getByIdDirectPrivate(stream, "state"); + $assert(state === "writable" || state === "erroring"); + $putByIdDirectPrivate(controller, "started", 1); + $writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); + }, + error => { + const state = $getByIdDirectPrivate(stream, "state"); + $assert(state === "writable" || state === "erroring"); + $putByIdDirectPrivate(controller, "started", 1); + $writableStreamDealWithRejection(stream, error); + }, + ); +} + +export function setUpWritableStreamDefaultControllerFromUnderlyingSink( + stream, + underlyingSink, + underlyingSinkDict, + highWaterMark, + sizeAlgorithm, +) { + const controller = new $WritableStreamDefaultController(); + + let startAlgorithm = () => {}; + let writeAlgorithm = () => { + return Promise.$resolve(); + }; + let closeAlgorithm = () => { + return Promise.$resolve(); + }; + let abortAlgorithm = () => { + return Promise.$resolve(); + }; + + if ("start" in underlyingSinkDict) { + const startMethod = underlyingSinkDict["start"]; + startAlgorithm = () => $promiseInvokeOrNoopMethodNoCatch(underlyingSink, startMethod, [controller]); + } + if ("write" in underlyingSinkDict) { + const writeMethod = underlyingSinkDict["write"]; + writeAlgorithm = chunk => $promiseInvokeOrNoopMethod(underlyingSink, writeMethod, [chunk, controller]); + } + if ("close" in underlyingSinkDict) { + const closeMethod = underlyingSinkDict["close"]; + closeAlgorithm = () => $promiseInvokeOrNoopMethod(underlyingSink, closeMethod, []); + } + if ("abort" in underlyingSinkDict) { + const abortMethod = underlyingSinkDict["abort"]; + abortAlgorithm = reason => $promiseInvokeOrNoopMethod(underlyingSink, abortMethod, [reason]); + } + + $setUpWritableStreamDefaultController( + stream, + controller, + startAlgorithm, + writeAlgorithm, + closeAlgorithm, + abortAlgorithm, + highWaterMark, + sizeAlgorithm, + ); +} + +export function writableStreamDefaultControllerAdvanceQueueIfNeeded(controller) { + const stream = $getByIdDirectPrivate(controller, "stream"); + + if ($getByIdDirectPrivate(controller, "started") !== 1) return; + + $assert(stream !== undefined); + if ($getByIdDirectPrivate(stream, "inFlightWriteRequest") !== undefined) return; + + const state = $getByIdDirectPrivate(stream, "state"); + $assert(state !== "closed" || state !== "errored"); + if (state === "erroring") { + $writableStreamFinishErroring(stream); + return; + } + + const queue = $getByIdDirectPrivate(controller, "queue"); + + if (queue.content?.isEmpty() ?? false) return; + + const value = $peekQueueValue(queue); + if (value === $isCloseSentinel) $writableStreamDefaultControllerProcessClose(controller); + else $writableStreamDefaultControllerProcessWrite(controller, value); +} + +export function isCloseSentinel() {} + +export function writableStreamDefaultControllerClearAlgorithms(controller) { + $putByIdDirectPrivate(controller, "writeAlgorithm", undefined); + $putByIdDirectPrivate(controller, "closeAlgorithm", undefined); + $putByIdDirectPrivate(controller, "abortAlgorithm", undefined); + $putByIdDirectPrivate(controller, "strategySizeAlgorithm", undefined); +} + +export function writableStreamDefaultControllerClose(controller) { + $enqueueValueWithSize($getByIdDirectPrivate(controller, "queue"), $isCloseSentinel, 0); + $writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); +} + +export function writableStreamDefaultControllerError(controller, error) { + const stream = $getByIdDirectPrivate(controller, "stream"); + $assert(stream !== undefined); + $assert($getByIdDirectPrivate(stream, "state") === "writable"); + + $writableStreamDefaultControllerClearAlgorithms(controller); + $writableStreamStartErroring(stream, error); +} + +export function writableStreamDefaultControllerErrorIfNeeded(controller, error) { + const stream = $getByIdDirectPrivate(controller, "stream"); + if ($getByIdDirectPrivate(stream, "state") === "writable") $writableStreamDefaultControllerError(controller, error); +} + +export function writableStreamDefaultControllerGetBackpressure(controller) { + const desiredSize = $writableStreamDefaultControllerGetDesiredSize(controller); + return desiredSize <= 0; +} + +export function writableStreamDefaultControllerGetChunkSize(controller, chunk) { + try { + return $getByIdDirectPrivate(controller, "strategySizeAlgorithm").$call(undefined, chunk); + } catch (e) { + $writableStreamDefaultControllerErrorIfNeeded(controller, e); + return 1; + } +} + +export function writableStreamDefaultControllerGetDesiredSize(controller) { + return $getByIdDirectPrivate(controller, "strategyHWM") - $getByIdDirectPrivate(controller, "queue").size; +} + +export function writableStreamDefaultControllerProcessClose(controller) { + const stream = $getByIdDirectPrivate(controller, "stream"); + + $writableStreamMarkCloseRequestInFlight(stream); + $dequeueValue($getByIdDirectPrivate(controller, "queue")); + + $assert($getByIdDirectPrivate(controller, "queue").content?.isEmpty()); + + const sinkClosePromise = $getByIdDirectPrivate(controller, "closeAlgorithm").$call(); + $writableStreamDefaultControllerClearAlgorithms(controller); + + sinkClosePromise.$then( + () => { + $writableStreamFinishInFlightClose(stream); + }, + reason => { + $writableStreamFinishInFlightCloseWithError(stream, reason); + }, + ); +} + +export function writableStreamDefaultControllerProcessWrite(controller, chunk) { + const stream = $getByIdDirectPrivate(controller, "stream"); + + $writableStreamMarkFirstWriteRequestInFlight(stream); + + const sinkWritePromise = $getByIdDirectPrivate(controller, "writeAlgorithm").$call(undefined, chunk); + + sinkWritePromise.$then( + () => { + $writableStreamFinishInFlightWrite(stream); + const state = $getByIdDirectPrivate(stream, "state"); + $assert(state === "writable" || state === "erroring"); + + $dequeueValue($getByIdDirectPrivate(controller, "queue")); + if (!$writableStreamCloseQueuedOrInFlight(stream) && state === "writable") { + const backpressure = $writableStreamDefaultControllerGetBackpressure(controller); + $writableStreamUpdateBackpressure(stream, backpressure); + } + $writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); + }, + reason => { + const state = $getByIdDirectPrivate(stream, "state"); + if (state === "writable") $writableStreamDefaultControllerClearAlgorithms(controller); + + $writableStreamFinishInFlightWriteWithError(stream, reason); + }, + ); +} + +export function writableStreamDefaultControllerWrite(controller, chunk, chunkSize) { + try { + $enqueueValueWithSize($getByIdDirectPrivate(controller, "queue"), chunk, chunkSize); + + const stream = $getByIdDirectPrivate(controller, "stream"); + + const state = $getByIdDirectPrivate(stream, "state"); + if (!$writableStreamCloseQueuedOrInFlight(stream) && state === "writable") { + const backpressure = $writableStreamDefaultControllerGetBackpressure(controller); + $writableStreamUpdateBackpressure(stream, backpressure); + } + $writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); + } catch (e) { + $writableStreamDefaultControllerErrorIfNeeded(controller, e); + } +} |