diff options
Diffstat (limited to 'src/js/builtins/ReadableStreamInternals.ts')
-rw-r--r-- | src/js/builtins/ReadableStreamInternals.ts | 1799 |
1 files changed, 1799 insertions, 0 deletions
diff --git a/src/js/builtins/ReadableStreamInternals.ts b/src/js/builtins/ReadableStreamInternals.ts new file mode 100644 index 000000000..0c4e816f4 --- /dev/null +++ b/src/js/builtins/ReadableStreamInternals.ts @@ -0,0 +1,1799 @@ +/* + * Copyright (C) 2015 Canon Inc. All rights reserved. + * Copyright (C) 2015 Igalia. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +// @internal + +export function readableStreamReaderGenericInitialize(reader, stream) { + $putByIdDirectPrivate(reader, "ownerReadableStream", stream); + $putByIdDirectPrivate(stream, "reader", reader); + if ($getByIdDirectPrivate(stream, "state") === $streamReadable) + $putByIdDirectPrivate(reader, "closedPromiseCapability", $newPromiseCapability(Promise)); + else if ($getByIdDirectPrivate(stream, "state") === $streamClosed) + $putByIdDirectPrivate(reader, "closedPromiseCapability", { + $promise: Promise.$resolve(), + }); + else { + $assert($getByIdDirectPrivate(stream, "state") === $streamErrored); + $putByIdDirectPrivate(reader, "closedPromiseCapability", { + $promise: $newHandledRejectedPromise($getByIdDirectPrivate(stream, "storedError")), + }); + } +} + +export function privateInitializeReadableStreamDefaultController(this, stream, underlyingSource, size, highWaterMark) { + if (!$isReadableStream(stream)) throw new TypeError("ReadableStreamDefaultController needs a ReadableStream"); + + // readableStreamController is initialized with null value. + if ($getByIdDirectPrivate(stream, "readableStreamController") !== null) + throw new TypeError("ReadableStream already has a controller"); + + $putByIdDirectPrivate(this, "controlledReadableStream", stream); + $putByIdDirectPrivate(this, "underlyingSource", underlyingSource); + $putByIdDirectPrivate(this, "queue", $newQueue()); + $putByIdDirectPrivate(this, "started", -1); + $putByIdDirectPrivate(this, "closeRequested", false); + $putByIdDirectPrivate(this, "pullAgain", false); + $putByIdDirectPrivate(this, "pulling", false); + $putByIdDirectPrivate(this, "strategy", $validateAndNormalizeQueuingStrategy(size, highWaterMark)); + + return this; +} + +export function readableStreamDefaultControllerError(controller, error) { + const stream = $getByIdDirectPrivate(controller, "controlledReadableStream"); + if ($getByIdDirectPrivate(stream, "state") !== $streamReadable) return; + $putByIdDirectPrivate(controller, "queue", $newQueue()); + + $readableStreamError(stream, error); +} + +export function readableStreamPipeTo(stream, sink) { + $assert($isReadableStream(stream)); + + const reader = new ReadableStreamDefaultReader(stream); + + $getByIdDirectPrivate(reader, "closedPromiseCapability").$promise.$then( + () => {}, + e => { + sink.error(e); + }, + ); + + function doPipe() { + $readableStreamDefaultReaderRead(reader).$then( + function (result) { + if (result.done) { + sink.close(); + return; + } + try { + sink.enqueue(result.value); + } catch (e) { + sink.error("ReadableStream chunk enqueueing in the sink failed"); + return; + } + doPipe(); + }, + function (e) { + sink.error(e); + }, + ); + } + doPipe(); +} + +export function acquireReadableStreamDefaultReader(stream) { + var start = $getByIdDirectPrivate(stream, "start"); + if (start) { + start.$call(stream); + } + + return new ReadableStreamDefaultReader(stream); +} + +// https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller, starting from step 6. +// The other part is implemented in privateInitializeReadableStreamDefaultController. +export function setupReadableStreamDefaultController( + stream, + underlyingSource, + size, + highWaterMark, + startMethod, + pullMethod, + cancelMethod, +) { + const controller = new ReadableStreamDefaultController( + stream, + underlyingSource, + size, + highWaterMark, + $isReadableStream, + ); + + const pullAlgorithm = () => $promiseInvokeOrNoopMethod(underlyingSource, pullMethod, [controller]); + const cancelAlgorithm = reason => $promiseInvokeOrNoopMethod(underlyingSource, cancelMethod, [reason]); + + $putByIdDirectPrivate(controller, "pullAlgorithm", pullAlgorithm); + $putByIdDirectPrivate(controller, "cancelAlgorithm", cancelAlgorithm); + $putByIdDirectPrivate(controller, "pull", $readableStreamDefaultControllerPull); + $putByIdDirectPrivate(controller, "cancel", $readableStreamDefaultControllerCancel); + $putByIdDirectPrivate(stream, "readableStreamController", controller); + + $readableStreamDefaultControllerStart(controller); +} + +export function createReadableStreamController(stream, underlyingSource, strategy) { + const type = underlyingSource.type; + const typeString = $toString(type); + + if (typeString === "bytes") { + // if (!$readableByteStreamAPIEnabled()) + // $throwTypeError("ReadableByteStreamController is not implemented"); + + if (strategy.highWaterMark === undefined) strategy.highWaterMark = 0; + if (strategy.size !== undefined) $throwRangeError("Strategy for a ReadableByteStreamController cannot have a size"); + + $putByIdDirectPrivate( + stream, + "readableStreamController", + new ReadableByteStreamController(stream, underlyingSource, strategy.highWaterMark, $isReadableStream), + ); + } else if (typeString === "direct") { + var highWaterMark = strategy?.highWaterMark; + $initializeArrayBufferStream.$call(stream, underlyingSource, highWaterMark); + } else if (type === undefined) { + if (strategy.highWaterMark === undefined) strategy.highWaterMark = 1; + + $setupReadableStreamDefaultController( + stream, + underlyingSource, + strategy.size, + strategy.highWaterMark, + underlyingSource.start, + underlyingSource.pull, + underlyingSource.cancel, + ); + } else throw new RangeError("Invalid type for underlying source"); +} + +export function readableStreamDefaultControllerStart(controller) { + if ($getByIdDirectPrivate(controller, "started") !== -1) return; + + const underlyingSource = $getByIdDirectPrivate(controller, "underlyingSource"); + const startMethod = underlyingSource.start; + $putByIdDirectPrivate(controller, "started", 0); + + $promiseInvokeOrNoopMethodNoCatch(underlyingSource, startMethod, [controller]).$then( + () => { + $putByIdDirectPrivate(controller, "started", 1); + $assert(!$getByIdDirectPrivate(controller, "pulling")); + $assert(!$getByIdDirectPrivate(controller, "pullAgain")); + $readableStreamDefaultControllerCallPullIfNeeded(controller); + }, + error => { + $readableStreamDefaultControllerError(controller, error); + }, + ); +} + +// FIXME: Replace readableStreamPipeTo by below function. +// This method implements the latest https://streams.spec.whatwg.org/#readable-stream-pipe-to. +export function readableStreamPipeToWritableStream( + source, + destination, + preventClose, + preventAbort, + preventCancel, + signal, +) { + // const isDirectStream = !!$getByIdDirectPrivate(source, "start"); + + $assert($isReadableStream(source)); + $assert($isWritableStream(destination)); + $assert(!$isReadableStreamLocked(source)); + $assert(!$isWritableStreamLocked(destination)); + $assert(signal === undefined || $isAbortSignal(signal)); + + if ($getByIdDirectPrivate(source, "underlyingByteSource") !== undefined) + return Promise.$reject("Piping to a readable bytestream is not supported"); + + let pipeState: any = { + source: source, + destination: destination, + preventAbort: preventAbort, + preventCancel: preventCancel, + preventClose: preventClose, + signal: signal, + }; + + pipeState.reader = $acquireReadableStreamDefaultReader(source); + pipeState.writer = $acquireWritableStreamDefaultWriter(destination); + + $putByIdDirectPrivate(source, "disturbed", true); + + pipeState.finalized = false; + pipeState.shuttingDown = false; + pipeState.promiseCapability = $newPromiseCapability(Promise); + pipeState.pendingReadPromiseCapability = $newPromiseCapability(Promise); + pipeState.pendingReadPromiseCapability.$resolve.$call(); + pipeState.pendingWritePromise = Promise.$resolve(); + + if (signal !== undefined) { + const algorithm = reason => { + if (pipeState.finalized) return; + + $pipeToShutdownWithAction( + pipeState, + () => { + const shouldAbortDestination = + !pipeState.preventAbort && $getByIdDirectPrivate(pipeState.destination, "state") === "writable"; + const promiseDestination = shouldAbortDestination + ? $writableStreamAbort(pipeState.destination, reason) + : Promise.$resolve(); + + const shouldAbortSource = + !pipeState.preventCancel && $getByIdDirectPrivate(pipeState.source, "state") === $streamReadable; + const promiseSource = shouldAbortSource + ? $readableStreamCancel(pipeState.source, reason) + : Promise.$resolve(); + + let promiseCapability = $newPromiseCapability(Promise); + let shouldWait = true; + let handleResolvedPromise = () => { + if (shouldWait) { + shouldWait = false; + return; + } + promiseCapability.$resolve.$call(); + }; + let handleRejectedPromise = e => { + promiseCapability.$reject.$call(undefined, e); + }; + promiseDestination.$then(handleResolvedPromise, handleRejectedPromise); + promiseSource.$then(handleResolvedPromise, handleRejectedPromise); + return promiseCapability.$promise; + }, + reason, + ); + }; + if ($whenSignalAborted(signal, algorithm)) return pipeState.promiseCapability.$promise; + } + + $pipeToErrorsMustBePropagatedForward(pipeState); + $pipeToErrorsMustBePropagatedBackward(pipeState); + $pipeToClosingMustBePropagatedForward(pipeState); + $pipeToClosingMustBePropagatedBackward(pipeState); + + $pipeToLoop(pipeState); + + return pipeState.promiseCapability.$promise; +} + +export function pipeToLoop(pipeState) { + if (pipeState.shuttingDown) return; + + $pipeToDoReadWrite(pipeState).$then(result => { + if (result) $pipeToLoop(pipeState); + }); +} + +export function pipeToDoReadWrite(pipeState) { + $assert(!pipeState.shuttingDown); + + pipeState.pendingReadPromiseCapability = $newPromiseCapability(Promise); + $getByIdDirectPrivate(pipeState.writer, "readyPromise").$promise.$then( + () => { + if (pipeState.shuttingDown) { + pipeState.pendingReadPromiseCapability.$resolve.$call(undefined, false); + return; + } + + $readableStreamDefaultReaderRead(pipeState.reader).$then( + result => { + const canWrite = !result.done && $getByIdDirectPrivate(pipeState.writer, "stream") !== undefined; + pipeState.pendingReadPromiseCapability.$resolve.$call(undefined, canWrite); + if (!canWrite) return; + + pipeState.pendingWritePromise = $writableStreamDefaultWriterWrite(pipeState.writer, result.value); + }, + e => { + pipeState.pendingReadPromiseCapability.$resolve.$call(undefined, false); + }, + ); + }, + e => { + pipeState.pendingReadPromiseCapability.$resolve.$call(undefined, false); + }, + ); + return pipeState.pendingReadPromiseCapability.$promise; +} + +export function pipeToErrorsMustBePropagatedForward(pipeState) { + const action = () => { + pipeState.pendingReadPromiseCapability.$resolve.$call(undefined, false); + const error = $getByIdDirectPrivate(pipeState.source, "storedError"); + if (!pipeState.preventAbort) { + $pipeToShutdownWithAction(pipeState, () => $writableStreamAbort(pipeState.destination, error), error); + return; + } + $pipeToShutdown(pipeState, error); + }; + + if ($getByIdDirectPrivate(pipeState.source, "state") === $streamErrored) { + action(); + return; + } + + $getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").$promise.$then(undefined, action); +} + +export function pipeToErrorsMustBePropagatedBackward(pipeState) { + const action = () => { + const error = $getByIdDirectPrivate(pipeState.destination, "storedError"); + if (!pipeState.preventCancel) { + $pipeToShutdownWithAction(pipeState, () => $readableStreamCancel(pipeState.source, error), error); + return; + } + $pipeToShutdown(pipeState, error); + }; + if ($getByIdDirectPrivate(pipeState.destination, "state") === "errored") { + action(); + return; + } + $getByIdDirectPrivate(pipeState.writer, "closedPromise").$promise.$then(undefined, action); +} + +export function pipeToClosingMustBePropagatedForward(pipeState) { + const action = () => { + pipeState.pendingReadPromiseCapability.$resolve.$call(undefined, false); + // const error = $getByIdDirectPrivate(pipeState.source, "storedError"); + if (!pipeState.preventClose) { + $pipeToShutdownWithAction(pipeState, () => + $writableStreamDefaultWriterCloseWithErrorPropagation(pipeState.writer), + ); + return; + } + $pipeToShutdown(pipeState); + }; + if ($getByIdDirectPrivate(pipeState.source, "state") === $streamClosed) { + action(); + return; + } + $getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").$promise.$then(action, undefined); +} + +export function pipeToClosingMustBePropagatedBackward(pipeState) { + if ( + !$writableStreamCloseQueuedOrInFlight(pipeState.destination) && + $getByIdDirectPrivate(pipeState.destination, "state") !== "closed" + ) + return; + + // $assert no chunks have been read/written + + const error = new TypeError("closing is propagated backward"); + if (!pipeState.preventCancel) { + $pipeToShutdownWithAction(pipeState, () => $readableStreamCancel(pipeState.source, error), error); + return; + } + $pipeToShutdown(pipeState, error); +} + +export function pipeToShutdownWithAction(pipeState, action) { + if (pipeState.shuttingDown) return; + + pipeState.shuttingDown = true; + + const hasError = arguments.length > 2; + const error = arguments[2]; + const finalize = () => { + const promise = action(); + promise.$then( + () => { + if (hasError) $pipeToFinalize(pipeState, error); + else $pipeToFinalize(pipeState); + }, + e => { + $pipeToFinalize(pipeState, e); + }, + ); + }; + + if ( + $getByIdDirectPrivate(pipeState.destination, "state") === "writable" && + !$writableStreamCloseQueuedOrInFlight(pipeState.destination) + ) { + pipeState.pendingReadPromiseCapability.$promise.$then( + () => { + pipeState.pendingWritePromise.$then(finalize, finalize); + }, + e => $pipeToFinalize(pipeState, e), + ); + return; + } + + finalize(); +} + +export function pipeToShutdown(pipeState) { + if (pipeState.shuttingDown) return; + + pipeState.shuttingDown = true; + + const hasError = arguments.length > 1; + const error = arguments[1]; + const finalize = () => { + if (hasError) $pipeToFinalize(pipeState, error); + else $pipeToFinalize(pipeState); + }; + + if ( + $getByIdDirectPrivate(pipeState.destination, "state") === "writable" && + !$writableStreamCloseQueuedOrInFlight(pipeState.destination) + ) { + pipeState.pendingReadPromiseCapability.$promise.$then( + () => { + pipeState.pendingWritePromise.$then(finalize, finalize); + }, + e => $pipeToFinalize(pipeState, e), + ); + return; + } + finalize(); +} + +export function pipeToFinalize(pipeState) { + $writableStreamDefaultWriterRelease(pipeState.writer); + $readableStreamReaderGenericRelease(pipeState.reader); + + // Instead of removing the abort algorithm as per spec, we make it a no-op which is equivalent. + pipeState.finalized = true; + + if (arguments.length > 1) pipeState.promiseCapability.$reject.$call(undefined, arguments[1]); + else pipeState.promiseCapability.$resolve.$call(); +} + +export function readableStreamTee(stream, shouldClone) { + $assert($isReadableStream(stream)); + $assert(typeof shouldClone === "boolean"); + + var start_ = $getByIdDirectPrivate(stream, "start"); + if (start_) { + $putByIdDirectPrivate(stream, "start", undefined); + start_(); + } + + const reader = new $ReadableStreamDefaultReader(stream); + + const teeState = { + closedOrErrored: false, + canceled1: false, + canceled2: false, + reason1: undefined, + reason2: undefined, + }; + + teeState.cancelPromiseCapability = $newPromiseCapability(Promise); + + const pullFunction = $readableStreamTeePullFunction(teeState, reader, shouldClone); + + const branch1Source = {}; + $putByIdDirectPrivate(branch1Source, "pull", pullFunction); + $putByIdDirectPrivate(branch1Source, "cancel", $readableStreamTeeBranch1CancelFunction(teeState, stream)); + + const branch2Source = {}; + $putByIdDirectPrivate(branch2Source, "pull", pullFunction); + $putByIdDirectPrivate(branch2Source, "cancel", $readableStreamTeeBranch2CancelFunction(teeState, stream)); + + const branch1 = new $ReadableStream(branch1Source); + const branch2 = new $ReadableStream(branch2Source); + + $getByIdDirectPrivate(reader, "closedPromiseCapability").$promise.$then(undefined, function (e) { + if (teeState.closedOrErrored) return; + $readableStreamDefaultControllerError(branch1.$readableStreamController, e); + $readableStreamDefaultControllerError(branch2.$readableStreamController, e); + teeState.closedOrErrored = true; + if (!teeState.canceled1 || !teeState.canceled2) teeState.cancelPromiseCapability.$resolve.$call(); + }); + + // Additional fields compared to the spec, as they are needed within pull/cancel functions. + teeState.branch1 = branch1; + teeState.branch2 = branch2; + + return [branch1, branch2]; +} + +export function readableStreamTeePullFunction(teeState, reader, shouldClone) { + return function () { + Promise.prototype.$then.$call($readableStreamDefaultReaderRead(reader), function (result) { + $assert($isObject(result)); + $assert(typeof result.done === "boolean"); + if (result.done && !teeState.closedOrErrored) { + if (!teeState.canceled1) $readableStreamDefaultControllerClose(teeState.branch1.$readableStreamController); + if (!teeState.canceled2) $readableStreamDefaultControllerClose(teeState.branch2.$readableStreamController); + teeState.closedOrErrored = true; + if (!teeState.canceled1 || !teeState.canceled2) teeState.cancelPromiseCapability.$resolve.$call(); + } + if (teeState.closedOrErrored) return; + if (!teeState.canceled1) + $readableStreamDefaultControllerEnqueue(teeState.branch1.$readableStreamController, result.value); + if (!teeState.canceled2) + $readableStreamDefaultControllerEnqueue( + teeState.branch2.$readableStreamController, + shouldClone ? $structuredCloneForStream(result.value) : result.value, + ); + }); + }; +} + +export function readableStreamTeeBranch1CancelFunction(teeState, stream) { + return function (r) { + teeState.canceled1 = true; + teeState.reason1 = r; + if (teeState.canceled2) { + $readableStreamCancel(stream, [teeState.reason1, teeState.reason2]).$then( + teeState.cancelPromiseCapability.$resolve, + teeState.cancelPromiseCapability.$reject, + ); + } + return teeState.cancelPromiseCapability.$promise; + }; +} + +export function readableStreamTeeBranch2CancelFunction(teeState, stream) { + return function (r) { + teeState.canceled2 = true; + teeState.reason2 = r; + if (teeState.canceled1) { + $readableStreamCancel(stream, [teeState.reason1, teeState.reason2]).$then( + teeState.cancelPromiseCapability.$resolve, + teeState.cancelPromiseCapability.$reject, + ); + } + return teeState.cancelPromiseCapability.$promise; + }; +} + +export function isReadableStream(stream) { + // Spec tells to return true only if stream has a readableStreamController internal slot. + // However, since it is a private slot, it cannot be checked using hasOwnProperty(). + // Therefore, readableStreamController is initialized with null value. + return $isObject(stream) && $getByIdDirectPrivate(stream, "readableStreamController") !== undefined; +} + +export function isReadableStreamDefaultReader(reader) { + // Spec tells to return true only if reader has a readRequests internal slot. + // However, since it is a private slot, it cannot be checked using hasOwnProperty(). + // Since readRequests is initialized with an empty array, the following test is ok. + return $isObject(reader) && !!$getByIdDirectPrivate(reader, "readRequests"); +} + +export function isReadableStreamDefaultController(controller) { + // Spec tells to return true only if controller has an underlyingSource internal slot. + // However, since it is a private slot, it cannot be checked using hasOwnProperty(). + // underlyingSource is obtained in ReadableStream constructor: if undefined, it is set + // to an empty object. Therefore, following test is ok. + return $isObject(controller) && !!$getByIdDirectPrivate(controller, "underlyingSource"); +} + +export function readDirectStream(stream, sink, underlyingSource) { + $putByIdDirectPrivate(stream, "underlyingSource", undefined); + $putByIdDirectPrivate(stream, "start", undefined); + + function close(stream, reason) { + if (reason && underlyingSource?.cancel) { + try { + var prom = underlyingSource.cancel(reason); + $markPromiseAsHandled(prom); + } catch (e) {} + + underlyingSource = undefined; + } + + if (stream) { + $putByIdDirectPrivate(stream, "readableStreamController", undefined); + $putByIdDirectPrivate(stream, "reader", undefined); + if (reason) { + $putByIdDirectPrivate(stream, "state", $streamErrored); + $putByIdDirectPrivate(stream, "storedError", reason); + } else { + $putByIdDirectPrivate(stream, "state", $streamClosed); + } + stream = undefined; + } + } + + if (!underlyingSource.pull) { + close(); + return; + } + + if (!$isCallable(underlyingSource.pull)) { + close(); + $throwTypeError("pull is not a function"); + return; + } + + $putByIdDirectPrivate(stream, "readableStreamController", sink); + const highWaterMark = $getByIdDirectPrivate(stream, "highWaterMark"); + + sink.start({ + highWaterMark: !highWaterMark || highWaterMark < 64 ? 64 : highWaterMark, + }); + + $startDirectStream.$call(sink, stream, underlyingSource.pull, close); + $putByIdDirectPrivate(stream, "reader", {}); + + var maybePromise = underlyingSource.pull(sink); + sink = undefined; + if (maybePromise && $isPromise(maybePromise)) { + return maybePromise.$then(() => {}); + } +} + +$linkTimeConstant; +export function assignToStream(stream, sink) { + // The stream is either a direct stream or a "default" JS stream + var underlyingSource = $getByIdDirectPrivate(stream, "underlyingSource"); + + // we know it's a direct stream when $underlyingSource is set + if (underlyingSource) { + try { + return $readDirectStream(stream, sink, underlyingSource); + } catch (e) { + throw e; + } finally { + underlyingSource = undefined; + stream = undefined; + sink = undefined; + } + } + + return $readStreamIntoSink(stream, sink, true); +} + +export async function readStreamIntoSink(stream, sink, isNative) { + var didClose = false; + var didThrow = false; + try { + var reader = stream.getReader(); + var many = reader.readMany(); + if (many && $isPromise(many)) { + many = await many; + } + if (many.done) { + didClose = true; + return sink.end(); + } + var wroteCount = many.value.length; + const highWaterMark = $getByIdDirectPrivate(stream, "highWaterMark"); + if (isNative) + $startDirectStream.$call(sink, stream, undefined, () => !didThrow && $markPromiseAsHandled(stream.cancel())); + + sink.start({ highWaterMark: highWaterMark || 0 }); + + for (var i = 0, values = many.value, length = many.value.length; i < length; i++) { + sink.write(values[i]); + } + + var streamState = $getByIdDirectPrivate(stream, "state"); + if (streamState === $streamClosed) { + didClose = true; + return sink.end(); + } + + while (true) { + var { value, done } = await reader.read(); + if (done) { + didClose = true; + return sink.end(); + } + + sink.write(value); + } + } catch (e) { + didThrow = true; + + try { + reader = undefined; + const prom = stream.cancel(e); + $markPromiseAsHandled(prom); + } catch (j) {} + + if (sink && !didClose) { + didClose = true; + try { + sink.close(e); + } catch (j) { + throw new globalThis.AggregateError([e, j]); + } + } + + throw e; + } finally { + if (reader) { + try { + reader.releaseLock(); + } catch (e) {} + reader = undefined; + } + sink = undefined; + var streamState = $getByIdDirectPrivate(stream, "state"); + if (stream) { + // make it easy for this to be GC'd + // but don't do property transitions + var readableStreamController = $getByIdDirectPrivate(stream, "readableStreamController"); + if (readableStreamController) { + if ($getByIdDirectPrivate(readableStreamController, "underlyingSource")) + $putByIdDirectPrivate(readableStreamController, "underlyingSource", undefined); + if ($getByIdDirectPrivate(readableStreamController, "controlledReadableStream")) + $putByIdDirectPrivate(readableStreamController, "controlledReadableStream", undefined); + + $putByIdDirectPrivate(stream, "readableStreamController", null); + if ($getByIdDirectPrivate(stream, "underlyingSource")) + $putByIdDirectPrivate(stream, "underlyingSource", undefined); + readableStreamController = undefined; + } + + if (!didThrow && streamState !== $streamClosed && streamState !== $streamErrored) { + $readableStreamClose(stream); + } + stream = undefined; + } + } +} + +export function handleDirectStreamError(e) { + var controller = this; + var sink = controller.$sink; + if (sink) { + $putByIdDirectPrivate(controller, "sink", undefined); + try { + sink.close(e); + } catch (f) {} + } + + this.error = this.flush = this.write = this.close = this.end = $onReadableStreamDirectControllerClosed; + + if (typeof this.$underlyingSource.close === "function") { + try { + this.$underlyingSource.close.$call(this.$underlyingSource, e); + } catch (e) {} + } + + try { + var pend = controller._pendingRead; + if (pend) { + controller._pendingRead = undefined; + $rejectPromise(pend, e); + } + } catch (f) {} + var stream = controller.$controlledReadableStream; + if (stream) $readableStreamError(stream, e); +} + +export function handleDirectStreamErrorReject(e) { + $handleDirectStreamError.$call(this, e); + return Promise.$reject(e); +} + +export function onPullDirectStream(controller) { + var stream = controller.$controlledReadableStream; + if (!stream || $getByIdDirectPrivate(stream, "state") !== $streamReadable) return; + + // pull is in progress + // this is a recursive call + // ignore it + if (controller._deferClose === -1) { + return; + } + + controller._deferClose = -1; + controller._deferFlush = -1; + var deferClose; + var deferFlush; + + // Direct streams allow $pull to be called multiple times, unlike the spec. + // Backpressure is handled by the destination, not by the underlying source. + // In this case, we rely on the heuristic that repeatedly draining in the same tick + // is bad for performance + // this code is only run when consuming a direct stream from JS + // without the HTTP server or anything else + try { + var result = controller.$underlyingSource.pull(controller); + + if (result && $isPromise(result)) { + if (controller._handleError === undefined) { + controller._handleError = $handleDirectStreamErrorReject.bind(controller); + } + + Promise.prototype.catch.$call(result, controller._handleError); + } + } catch (e) { + return $handleDirectStreamErrorReject.$call(controller, e); + } finally { + deferClose = controller._deferClose; + deferFlush = controller._deferFlush; + controller._deferFlush = controller._deferClose = 0; + } + + var promiseToReturn; + + if (controller._pendingRead === undefined) { + controller._pendingRead = promiseToReturn = $newPromise(); + } else { + promiseToReturn = $readableStreamAddReadRequest(stream); + } + + // they called close during $pull() + // we delay that + if (deferClose === 1) { + var reason = controller._deferCloseReason; + controller._deferCloseReason = undefined; + $onCloseDirectStream.$call(controller, reason); + return promiseToReturn; + } + + // not done, but they called flush() + if (deferFlush === 1) { + $onFlushDirectStream.$call(controller); + } + + return promiseToReturn; +} + +export function noopDoneFunction() { + return Promise.$resolve({ value: undefined, done: true }); +} + +export function onReadableStreamDirectControllerClosed(reason) { + $throwTypeError("ReadableStreamDirectController is now closed"); +} + +export function onCloseDirectStream(reason) { + var stream = this.$controlledReadableStream; + if (!stream || $getByIdDirectPrivate(stream, "state") !== $streamReadable) return; + + if (this._deferClose !== 0) { + this._deferClose = 1; + this._deferCloseReason = reason; + return; + } + + $putByIdDirectPrivate(stream, "state", $streamClosing); + if (typeof this.$underlyingSource.close === "function") { + try { + this.$underlyingSource.close.$call(this.$underlyingSource, reason); + } catch (e) {} + } + + var flushed; + try { + flushed = this.$sink.end(); + $putByIdDirectPrivate(this, "sink", undefined); + } catch (e) { + if (this._pendingRead) { + var read = this._pendingRead; + this._pendingRead = undefined; + $rejectPromise(read, e); + } + $readableStreamError(stream, e); + return; + } + + this.error = this.flush = this.write = this.close = this.end = $onReadableStreamDirectControllerClosed; + + var reader = $getByIdDirectPrivate(stream, "reader"); + + if (reader && $isReadableStreamDefaultReader(reader)) { + var _pendingRead = this._pendingRead; + if (_pendingRead && $isPromise(_pendingRead) && flushed?.byteLength) { + this._pendingRead = undefined; + $fulfillPromise(_pendingRead, { value: flushed, done: false }); + $readableStreamClose(stream); + return; + } + } + + if (flushed?.byteLength) { + var requests = $getByIdDirectPrivate(reader, "readRequests"); + if (requests?.isNotEmpty()) { + $readableStreamFulfillReadRequest(stream, flushed, false); + $readableStreamClose(stream); + return; + } + + $putByIdDirectPrivate(stream, "state", $streamReadable); + this.$pull = () => { + var thisResult = $createFulfilledPromise({ + value: flushed, + done: false, + }); + flushed = undefined; + $readableStreamClose(stream); + stream = undefined; + return thisResult; + }; + } else if (this._pendingRead) { + var read = this._pendingRead; + this._pendingRead = undefined; + $putByIdDirectPrivate(this, "pull", $noopDoneFunction); + $fulfillPromise(read, { value: undefined, done: true }); + } + + $readableStreamClose(stream); +} + +export function onFlushDirectStream() { + var stream = this.$controlledReadableStream; + var reader = $getByIdDirectPrivate(stream, "reader"); + if (!reader || !$isReadableStreamDefaultReader(reader)) { + return; + } + + var _pendingRead = this._pendingRead; + this._pendingRead = undefined; + if (_pendingRead && $isPromise(_pendingRead)) { + var flushed = this.$sink.flush(); + if (flushed?.byteLength) { + this._pendingRead = $getByIdDirectPrivate(stream, "readRequests")?.shift(); + $fulfillPromise(_pendingRead, { value: flushed, done: false }); + } else { + this._pendingRead = _pendingRead; + } + } else if ($getByIdDirectPrivate(stream, "readRequests")?.isNotEmpty()) { + var flushed = this.$sink.flush(); + if (flushed?.byteLength) { + $readableStreamFulfillReadRequest(stream, flushed, false); + } + } else if (this._deferFlush === -1) { + this._deferFlush = 1; + } +} + +export function createTextStream(highWaterMark) { + var sink; + var array = []; + var hasString = false; + var hasBuffer = false; + var rope = ""; + var estimatedLength = $toLength(0); + var capability = $newPromiseCapability(Promise); + var calledDone = false; + + sink = { + start() {}, + write(chunk) { + if (typeof chunk === "string") { + var chunkLength = $toLength(chunk.length); + if (chunkLength > 0) { + rope += chunk; + hasString = true; + // TODO: utf16 byte length + estimatedLength += chunkLength; + } + + return chunkLength; + } + + if (!chunk || !($ArrayBuffer.$isView(chunk) || chunk instanceof $ArrayBuffer)) { + $throwTypeError("Expected text, ArrayBuffer or ArrayBufferView"); + } + + const byteLength = $toLength(chunk.byteLength); + if (byteLength > 0) { + hasBuffer = true; + if (rope.length > 0) { + $arrayPush(array, rope, chunk); + rope = ""; + } else { + $arrayPush(array, chunk); + } + } + estimatedLength += byteLength; + return byteLength; + }, + + flush() { + return 0; + }, + + end() { + if (calledDone) { + return ""; + } + return sink.fulfill(); + }, + + fulfill() { + calledDone = true; + const result = sink.finishInternal(); + + $fulfillPromise(capability.$promise, result); + return result; + }, + + finishInternal() { + if (!hasString && !hasBuffer) { + return ""; + } + + if (hasString && !hasBuffer) { + return rope; + } + + if (hasBuffer && !hasString) { + return new globalThis.TextDecoder().decode($Bun.concatArrayBuffers(array)); + } + + // worst case: mixed content + + var arrayBufferSink = new $Bun.ArrayBufferSink(); + arrayBufferSink.start({ + highWaterMark: estimatedLength, + asUint8Array: true, + }); + for (let item of array) { + arrayBufferSink.write(item); + } + array.length = 0; + if (rope.length > 0) { + arrayBufferSink.write(rope); + rope = ""; + } + + // TODO: use builtin + return new globalThis.TextDecoder().decode(arrayBufferSink.end()); + }, + + close() { + try { + if (!calledDone) { + calledDone = true; + sink.fulfill(); + } + } catch (e) {} + }, + }; + + return [sink, capability]; +} + +export function initializeTextStream(underlyingSource, highWaterMark) { + var [sink, closingPromise] = $createTextStream(highWaterMark); + + var controller = { + $underlyingSource: underlyingSource, + $pull: $onPullDirectStream, + $controlledReadableStream: this, + $sink: sink, + close: $onCloseDirectStream, + write: sink.write, + error: $handleDirectStreamError, + end: $onCloseDirectStream, + $close: $onCloseDirectStream, + flush: $onFlushDirectStream, + _pendingRead: undefined, + _deferClose: 0, + _deferFlush: 0, + _deferCloseReason: undefined, + _handleError: undefined, + }; + + $putByIdDirectPrivate(this, "readableStreamController", controller); + $putByIdDirectPrivate(this, "underlyingSource", undefined); + $putByIdDirectPrivate(this, "start", undefined); + return closingPromise; +} + +export function initializeArrayStream(underlyingSource, highWaterMark) { + var array = []; + var closingPromise = $newPromiseCapability(Promise); + var calledDone = false; + + function fulfill() { + calledDone = true; + closingPromise.$resolve.$call(undefined, array); + return array; + } + + var sink = { + start() {}, + write(chunk) { + $arrayPush(array, chunk); + return chunk.byteLength || chunk.length; + }, + + flush() { + return 0; + }, + + end() { + if (calledDone) { + return []; + } + return fulfill(); + }, + + close() { + if (!calledDone) { + fulfill(); + } + }, + }; + + var controller = { + $underlyingSource: underlyingSource, + $pull: $onPullDirectStream, + $controlledReadableStream: this, + $sink: sink, + close: $onCloseDirectStream, + write: sink.write, + error: $handleDirectStreamError, + end: $onCloseDirectStream, + $close: $onCloseDirectStream, + flush: $onFlushDirectStream, + _pendingRead: undefined, + _deferClose: 0, + _deferFlush: 0, + _deferCloseReason: undefined, + _handleError: undefined, + }; + + $putByIdDirectPrivate(this, "readableStreamController", controller); + $putByIdDirectPrivate(this, "underlyingSource", undefined); + $putByIdDirectPrivate(this, "start", undefined); + return closingPromise; +} + +export function initializeArrayBufferStream(underlyingSource, highWaterMark) { + // This is the fallback implementation for direct streams + // When we don't know what the destination type is + // We assume it is a Uint8Array. + + var opts = + highWaterMark && typeof highWaterMark === "number" + ? { highWaterMark, stream: true, asUint8Array: true } + : { stream: true, asUint8Array: true }; + var sink = new $Bun.ArrayBufferSink(); + sink.start(opts); + + var controller = { + $underlyingSource: underlyingSource, + $pull: $onPullDirectStream, + $controlledReadableStream: this, + $sink: sink, + close: $onCloseDirectStream, + write: sink.write.bind(sink), + error: $handleDirectStreamError, + end: $onCloseDirectStream, + $close: $onCloseDirectStream, + flush: $onFlushDirectStream, + _pendingRead: undefined, + _deferClose: 0, + _deferFlush: 0, + _deferCloseReason: undefined, + _handleError: undefined, + }; + + $putByIdDirectPrivate(this, "readableStreamController", controller); + $putByIdDirectPrivate(this, "underlyingSource", undefined); + $putByIdDirectPrivate(this, "start", undefined); +} + +export function readableStreamError(stream, error) { + $assert($isReadableStream(stream)); + $assert($getByIdDirectPrivate(stream, "state") === $streamReadable); + $putByIdDirectPrivate(stream, "state", $streamErrored); + $putByIdDirectPrivate(stream, "storedError", error); + + const reader = $getByIdDirectPrivate(stream, "reader"); + + if (!reader) return; + + if ($isReadableStreamDefaultReader(reader)) { + const requests = $getByIdDirectPrivate(reader, "readRequests"); + $putByIdDirectPrivate(reader, "readRequests", $createFIFO()); + for (var request = requests.shift(); request; request = requests.shift()) $rejectPromise(request, error); + } else { + $assert($isReadableStreamBYOBReader(reader)); + const requests = $getByIdDirectPrivate(reader, "readIntoRequests"); + $putByIdDirectPrivate(reader, "readIntoRequests", $createFIFO()); + for (var request = requests.shift(); request; request = requests.shift()) $rejectPromise(request, error); + } + + $getByIdDirectPrivate(reader, "closedPromiseCapability").$reject.$call(undefined, error); + const promise = $getByIdDirectPrivate(reader, "closedPromiseCapability").$promise; + $markPromiseAsHandled(promise); +} + +export function readableStreamDefaultControllerShouldCallPull(controller) { + const stream = $getByIdDirectPrivate(controller, "controlledReadableStream"); + + if (!$readableStreamDefaultControllerCanCloseOrEnqueue(controller)) return false; + if (!($getByIdDirectPrivate(controller, "started") === 1)) return false; + if ( + (!$isReadableStreamLocked(stream) || + !$getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) && + $readableStreamDefaultControllerGetDesiredSize(controller) <= 0 + ) + return false; + const desiredSize = $readableStreamDefaultControllerGetDesiredSize(controller); + $assert(desiredSize !== null); + return desiredSize > 0; +} + +export function readableStreamDefaultControllerCallPullIfNeeded(controller) { + // FIXME: use $readableStreamDefaultControllerShouldCallPull + const stream = $getByIdDirectPrivate(controller, "controlledReadableStream"); + + if (!$readableStreamDefaultControllerCanCloseOrEnqueue(controller)) return; + if (!($getByIdDirectPrivate(controller, "started") === 1)) return; + if ( + (!$isReadableStreamLocked(stream) || + !$getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) && + $readableStreamDefaultControllerGetDesiredSize(controller) <= 0 + ) + return; + + if ($getByIdDirectPrivate(controller, "pulling")) { + $putByIdDirectPrivate(controller, "pullAgain", true); + return; + } + + $assert(!$getByIdDirectPrivate(controller, "pullAgain")); + $putByIdDirectPrivate(controller, "pulling", true); + + $getByIdDirectPrivate(controller, "pullAlgorithm") + .$call(undefined) + .$then( + function () { + $putByIdDirectPrivate(controller, "pulling", false); + if ($getByIdDirectPrivate(controller, "pullAgain")) { + $putByIdDirectPrivate(controller, "pullAgain", false); + + $readableStreamDefaultControllerCallPullIfNeeded(controller); + } + }, + function (error) { + $readableStreamDefaultControllerError(controller, error); + }, + ); +} + +export function isReadableStreamLocked(stream) { + $assert($isReadableStream(stream)); + return !!$getByIdDirectPrivate(stream, "reader"); +} + +export function readableStreamDefaultControllerGetDesiredSize(controller) { + const stream = $getByIdDirectPrivate(controller, "controlledReadableStream"); + const state = $getByIdDirectPrivate(stream, "state"); + + if (state === $streamErrored) return null; + if (state === $streamClosed) return 0; + + return $getByIdDirectPrivate(controller, "strategy").highWaterMark - $getByIdDirectPrivate(controller, "queue").size; +} + +export function readableStreamReaderGenericCancel(reader, reason) { + const stream = $getByIdDirectPrivate(reader, "ownerReadableStream"); + $assert(!!stream); + return $readableStreamCancel(stream, reason); +} + +export function readableStreamCancel(stream, reason) { + $putByIdDirectPrivate(stream, "disturbed", true); + const state = $getByIdDirectPrivate(stream, "state"); + if (state === $streamClosed) return Promise.$resolve(); + if (state === $streamErrored) return Promise.$reject($getByIdDirectPrivate(stream, "storedError")); + $readableStreamClose(stream); + + var controller = $getByIdDirectPrivate(stream, "readableStreamController"); + var cancel = controller.$cancel; + if (cancel) { + return cancel(controller, reason).$then(function () {}); + } + + var close = controller.close; + if (close) { + return Promise.$resolve(controller.close(reason)); + } + + $throwTypeError("ReadableStreamController has no cancel or close method"); +} + +export function readableStreamDefaultControllerCancel(controller, reason) { + $putByIdDirectPrivate(controller, "queue", $newQueue()); + return $getByIdDirectPrivate(controller, "cancelAlgorithm").$call(undefined, reason); +} + +export function readableStreamDefaultControllerPull(controller) { + var queue = $getByIdDirectPrivate(controller, "queue"); + if (queue.content.isNotEmpty()) { + const chunk = $dequeueValue(queue); + if ($getByIdDirectPrivate(controller, "closeRequested") && queue.content.isEmpty()) + $readableStreamClose($getByIdDirectPrivate(controller, "controlledReadableStream")); + else $readableStreamDefaultControllerCallPullIfNeeded(controller); + + return $createFulfilledPromise({ value: chunk, done: false }); + } + const pendingPromise = $readableStreamAddReadRequest($getByIdDirectPrivate(controller, "controlledReadableStream")); + $readableStreamDefaultControllerCallPullIfNeeded(controller); + return pendingPromise; +} + +export function readableStreamDefaultControllerClose(controller) { + $assert($readableStreamDefaultControllerCanCloseOrEnqueue(controller)); + $putByIdDirectPrivate(controller, "closeRequested", true); + if ($getByIdDirectPrivate(controller, "queue")?.content?.isEmpty()) + $readableStreamClose($getByIdDirectPrivate(controller, "controlledReadableStream")); +} + +export function readableStreamClose(stream) { + $assert($getByIdDirectPrivate(stream, "state") === $streamReadable); + $putByIdDirectPrivate(stream, "state", $streamClosed); + if (!$getByIdDirectPrivate(stream, "reader")) return; + + if ($isReadableStreamDefaultReader($getByIdDirectPrivate(stream, "reader"))) { + const requests = $getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests"); + if (requests.isNotEmpty()) { + $putByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests", $createFIFO()); + + for (var request = requests.shift(); request; request = requests.shift()) + $fulfillPromise(request, { value: undefined, done: true }); + } + } + + $getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "closedPromiseCapability").$resolve.$call(); +} + +export function readableStreamFulfillReadRequest(stream, chunk, done) { + const readRequest = $getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests").shift(); + $fulfillPromise(readRequest, { value: chunk, done: done }); +} + +export function readableStreamDefaultControllerEnqueue(controller, chunk) { + const stream = $getByIdDirectPrivate(controller, "controlledReadableStream"); + // this is checked by callers + $assert($readableStreamDefaultControllerCanCloseOrEnqueue(controller)); + + if ( + $isReadableStreamLocked(stream) && + $getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty() + ) { + $readableStreamFulfillReadRequest(stream, chunk, false); + $readableStreamDefaultControllerCallPullIfNeeded(controller); + return; + } + + try { + let chunkSize = 1; + if ($getByIdDirectPrivate(controller, "strategy").size !== undefined) + chunkSize = $getByIdDirectPrivate(controller, "strategy").size(chunk); + $enqueueValueWithSize($getByIdDirectPrivate(controller, "queue"), chunk, chunkSize); + } catch (error) { + $readableStreamDefaultControllerError(controller, error); + throw error; + } + $readableStreamDefaultControllerCallPullIfNeeded(controller); +} + +export function readableStreamDefaultReaderRead(reader) { + const stream = $getByIdDirectPrivate(reader, "ownerReadableStream"); + $assert(!!stream); + const state = $getByIdDirectPrivate(stream, "state"); + + $putByIdDirectPrivate(stream, "disturbed", true); + if (state === $streamClosed) return $createFulfilledPromise({ value: undefined, done: true }); + if (state === $streamErrored) return Promise.$reject($getByIdDirectPrivate(stream, "storedError")); + $assert(state === $streamReadable); + + return $getByIdDirectPrivate(stream, "readableStreamController").$pull( + $getByIdDirectPrivate(stream, "readableStreamController"), + ); +} + +export function readableStreamAddReadRequest(stream) { + $assert($isReadableStreamDefaultReader($getByIdDirectPrivate(stream, "reader"))); + $assert($getByIdDirectPrivate(stream, "state") == $streamReadable); + + const readRequest = $newPromise(); + + $getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests").push(readRequest); + + return readRequest; +} + +export function isReadableStreamDisturbed(stream) { + $assert($isReadableStream(stream)); + return $getByIdDirectPrivate(stream, "disturbed"); +} + +export function readableStreamReaderGenericRelease(reader) { + $assert(!!$getByIdDirectPrivate(reader, "ownerReadableStream")); + $assert($getByIdDirectPrivate($getByIdDirectPrivate(reader, "ownerReadableStream"), "reader") === reader); + + if ($getByIdDirectPrivate($getByIdDirectPrivate(reader, "ownerReadableStream"), "state") === $streamReadable) + $getByIdDirectPrivate(reader, "closedPromiseCapability").$reject.$call( + undefined, + $makeTypeError("releasing lock of reader whose stream is still in readable state"), + ); + else + $putByIdDirectPrivate(reader, "closedPromiseCapability", { + $promise: $newHandledRejectedPromise($makeTypeError("reader released lock")), + }); + + const promise = $getByIdDirectPrivate(reader, "closedPromiseCapability").$promise; + $markPromiseAsHandled(promise); + $putByIdDirectPrivate($getByIdDirectPrivate(reader, "ownerReadableStream"), "reader", undefined); + $putByIdDirectPrivate(reader, "ownerReadableStream", undefined); +} + +export function readableStreamDefaultControllerCanCloseOrEnqueue(controller) { + return ( + !$getByIdDirectPrivate(controller, "closeRequested") && + $getByIdDirectPrivate($getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === $streamReadable + ); +} + +export function lazyLoadStream(stream, autoAllocateChunkSize) { + var nativeType = $getByIdDirectPrivate(stream, "bunNativeType"); + var nativePtr = $getByIdDirectPrivate(stream, "bunNativePtr"); + var Prototype = $lazyStreamPrototypeMap.$get(nativeType); + if (Prototype === undefined) { + var [pull, start, cancel, setClose, deinit, setRefOrUnref, drain] = $lazyLoad(nativeType); + var closer = [false]; + var handleResult; + function handleNativeReadableStreamPromiseResult(val) { + var { c, v } = this; + this.c = undefined; + this.v = undefined; + handleResult(val, c, v); + } + + function callClose(controller) { + try { + controller.close(); + } catch (e) { + globalThis.reportError(e); + } + } + + handleResult = function handleResult(result, controller, view) { + if (result && $isPromise(result)) { + return result.then( + handleNativeReadableStreamPromiseResult.bind({ + c: controller, + v: view, + }), + err => controller.error(err), + ); + } else if (typeof result === "number") { + if (view && view.byteLength === result && view.buffer === controller.byobRequest?.view?.buffer) { + controller.byobRequest.respondWithNewView(view); + } else { + controller.byobRequest.respond(result); + } + } else if (result.constructor === $Uint8Array) { + controller.enqueue(result); + } + + if (closer[0] || result === false) { + $enqueueJob(callClose, controller); + closer[0] = false; + } + }; + + function createResult(tag, controller, view, closer) { + closer[0] = false; + + var result; + try { + result = pull(tag, view, closer); + } catch (err) { + return controller.error(err); + } + + return handleResult(result, controller, view); + } + + const registry = deinit ? new FinalizationRegistry(deinit) : null; + Prototype = class NativeReadableStreamSource { + constructor(tag, autoAllocateChunkSize, drainValue) { + this.#tag = tag; + this.#cancellationToken = {}; + this.pull = this.#pull.bind(this); + this.cancel = this.#cancel.bind(this); + this.autoAllocateChunkSize = autoAllocateChunkSize; + + if (drainValue !== undefined) { + this.start = controller => { + controller.enqueue(drainValue); + }; + } + + if (registry) { + registry.register(this, tag, this.#cancellationToken); + } + } + + #cancellationToken; + pull; + cancel; + start; + + #tag; + type = "bytes"; + autoAllocateChunkSize = 0; + + static startSync = start; + + #pull(controller) { + var tag = this.#tag; + + if (!tag) { + controller.close(); + return; + } + + createResult(tag, controller, controller.byobRequest.view, closer); + } + + #cancel(reason) { + var tag = this.#tag; + + registry && registry.unregister(this.#cancellationToken); + setRefOrUnref && setRefOrUnref(tag, false); + cancel(tag, reason); + } + static deinit = deinit; + static drain = drain; + }; + $lazyStreamPrototypeMap.$set(nativeType, Prototype); + } + + const chunkSize = Prototype.startSync(nativePtr, autoAllocateChunkSize); + var drainValue; + const { drain: drainFn, deinit: deinitFn } = Prototype; + if (drainFn) { + drainValue = drainFn(nativePtr); + } + + // empty file, no need for native back-and-forth on this + if (chunkSize === 0) { + deinit && nativePtr && $enqueueJob(deinit, nativePtr); + + if ((drainValue?.byteLength ?? 0) > 0) { + return { + start(controller) { + controller.enqueue(drainValue); + controller.close(); + }, + type: "bytes", + }; + } + + return { + start(controller) { + controller.close(); + }, + type: "bytes", + }; + } + + return new Prototype(nativePtr, chunkSize, drainValue); +} + +export function readableStreamIntoArray(stream) { + var reader = stream.getReader(); + var manyResult = reader.readMany(); + + async function processManyResult(result) { + if (result.done) { + return []; + } + + var chunks = result.value || []; + + while (true) { + var thisResult = await reader.read(); + if (thisResult.done) { + break; + } + chunks = chunks.concat(thisResult.value); + } + + return chunks; + } + + if (manyResult && $isPromise(manyResult)) { + return manyResult.$then(processManyResult); + } + + return processManyResult(manyResult); +} + +export function readableStreamIntoText(stream) { + const [textStream, closer] = $createTextStream($getByIdDirectPrivate(stream, "highWaterMark")); + const prom = $readStreamIntoSink(stream, textStream, false); + if (prom && $isPromise(prom)) { + return Promise.$resolve(prom).$then(closer.$promise); + } + return closer.$promise; +} + +export function readableStreamToArrayBufferDirect(stream, underlyingSource) { + var sink = new $Bun.ArrayBufferSink(); + $putByIdDirectPrivate(stream, "underlyingSource", undefined); + var highWaterMark = $getByIdDirectPrivate(stream, "highWaterMark"); + sink.start(highWaterMark ? { highWaterMark } : {}); + var capability = $newPromiseCapability(Promise); + var ended = false; + var pull = underlyingSource.pull; + var close = underlyingSource.close; + + var controller = { + start() {}, + close(reason) { + if (!ended) { + ended = true; + if (close) { + close(); + } + + $fulfillPromise(capability.$promise, sink.end()); + } + }, + end() { + if (!ended) { + ended = true; + if (close) { + close(); + } + $fulfillPromise(capability.$promise, sink.end()); + } + }, + flush() { + return 0; + }, + write: sink.write.bind(sink), + }; + + var didError = false; + try { + const firstPull = pull(controller); + if (firstPull && $isObject(firstPull) && $isPromise(firstPull)) { + return (async function (controller, promise, pull) { + while (!ended) { + await pull(controller); + } + return await promise; + })(controller, promise, pull); + } + + return capability.$promise; + } catch (e) { + didError = true; + $readableStreamError(stream, e); + return Promise.$reject(e); + } finally { + if (!didError && stream) $readableStreamClose(stream); + controller = close = sink = pull = stream = undefined; + } +} + +export async function readableStreamToTextDirect(stream, underlyingSource) { + const capability = $initializeTextStream.$call(stream, underlyingSource, undefined); + var reader = stream.getReader(); + + while ($getByIdDirectPrivate(stream, "state") === $streamReadable) { + var thisResult = await reader.read(); + if (thisResult.done) { + break; + } + } + + try { + reader.releaseLock(); + } catch (e) {} + reader = undefined; + stream = undefined; + + return capability.$promise; +} + +export async function readableStreamToArrayDirect(stream, underlyingSource) { + const capability = $initializeArrayStream.$call(stream, underlyingSource, undefined); + underlyingSource = undefined; + var reader = stream.getReader(); + try { + while ($getByIdDirectPrivate(stream, "state") === $streamReadable) { + var thisResult = await reader.read(); + if (thisResult.done) { + break; + } + } + + try { + reader.releaseLock(); + } catch (e) {} + reader = undefined; + + return Promise.$resolve(capability.$promise); + } catch (e) { + throw e; + } finally { + stream = undefined; + reader = undefined; + } +} + +export function readableStreamDefineLazyIterators(prototype) { + var asyncIterator = globalThis.Symbol.asyncIterator; + + var ReadableStreamAsyncIterator = async function* ReadableStreamAsyncIterator(stream, preventCancel) { + var reader = stream.getReader(); + var deferredError; + try { + while (true) { + var done, value; + const firstResult = reader.readMany(); + if ($isPromise(firstResult)) { + ({ done, value } = await firstResult); + } else { + ({ done, value } = firstResult); + } + + if (done) { + return; + } + yield* value; + } + } catch (e) { + deferredError = e; + } finally { + reader.releaseLock(); + + if (!preventCancel) { + stream.cancel(deferredError); + } + + if (deferredError) { + throw deferredError; + } + } + }; + var createAsyncIterator = function asyncIterator() { + return ReadableStreamAsyncIterator(this, false); + }; + var createValues = function values({ preventCancel = false } = { preventCancel: false }) { + return ReadableStreamAsyncIterator(this, preventCancel); + }; + $Object.$defineProperty(prototype, asyncIterator, { value: createAsyncIterator }); + $Object.$defineProperty(prototype, "values", { value: createValues }); + return prototype; +} |