// @ts-nocheck /* * Copyright (C) 2015 Canon Inc. All rights reserved. * Copyright (C) 2015 Igalia. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ // @internal export function readableStreamReaderGenericInitialize(reader, stream) { $putByIdDirectPrivate(reader, "ownerReadableStream", stream); $putByIdDirectPrivate(stream, "reader", reader); if ($getByIdDirectPrivate(stream, "state") === $streamReadable) $putByIdDirectPrivate(reader, "closedPromiseCapability", $newPromiseCapability(Promise)); else if ($getByIdDirectPrivate(stream, "state") === $streamClosed) $putByIdDirectPrivate(reader, "closedPromiseCapability", { $promise: Promise.$resolve(), }); else { $assert($getByIdDirectPrivate(stream, "state") === $streamErrored); $putByIdDirectPrivate(reader, "closedPromiseCapability", { $promise: $newHandledRejectedPromise($getByIdDirectPrivate(stream, "storedError")), }); } } export function privateInitializeReadableStreamDefaultController(this, stream, underlyingSource, size, highWaterMark) { if (!$isReadableStream(stream)) throw new TypeError("ReadableStreamDefaultController needs a ReadableStream"); // readableStreamController is initialized with null value. if ($getByIdDirectPrivate(stream, "readableStreamController") !== null) throw new TypeError("ReadableStream already has a controller"); $putByIdDirectPrivate(this, "controlledReadableStream", stream); $putByIdDirectPrivate(this, "underlyingSource", underlyingSource); $putByIdDirectPrivate(this, "queue", $newQueue()); $putByIdDirectPrivate(this, "started", -1); $putByIdDirectPrivate(this, "closeRequested", false); $putByIdDirectPrivate(this, "pullAgain", false); $putByIdDirectPrivate(this, "pulling", false); $putByIdDirectPrivate(this, "strategy", $validateAndNormalizeQueuingStrategy(size, highWaterMark)); return this; } export function readableStreamDefaultControllerError(controller, error) { const stream = $getByIdDirectPrivate(controller, "controlledReadableStream"); if ($getByIdDirectPrivate(stream, "state") !== $streamReadable) return; $putByIdDirectPrivate(controller, "queue", $newQueue()); $readableStreamError(stream, error); } export function readableStreamPipeTo(stream, sink) { $assert($isReadableStream(stream)); const reader = new ReadableStreamDefaultReader(stream); $getByIdDirectPrivate(reader, "closedPromiseCapability").$promise.$then( () => {}, e => { sink.error(e); }, ); function doPipe() { $readableStreamDefaultReaderRead(reader).$then( function (result) { if (result.done) { sink.close(); return; } try { sink.enqueue(result.value); } catch (e) { sink.error("ReadableStream chunk enqueueing in the sink failed"); return; } doPipe(); }, function (e) { sink.error(e); }, ); } doPipe(); } export function acquireReadableStreamDefaultReader(stream) { var start = $getByIdDirectPrivate(stream, "start"); if (start) { start.$call(stream); } return new ReadableStreamDefaultReader(stream); } // https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller, starting from step 6. // The other part is implemented in privateInitializeReadableStreamDefaultController. export function setupReadableStreamDefaultController( stream, underlyingSource, size, highWaterMark, startMethod, pullMethod, cancelMethod, ) { const controller = new ReadableStreamDefaultController( stream, underlyingSource, size, highWaterMark, $isReadableStream, ); const pullAlgorithm = () => $promiseInvokeOrNoopMethod(underlyingSource, pullMethod, [controller]); const cancelAlgorithm = reason => $promiseInvokeOrNoopMethod(underlyingSource, cancelMethod, [reason]); $putByIdDirectPrivate(controller, "pullAlgorithm", pullAlgorithm); $putByIdDirectPrivate(controller, "cancelAlgorithm", cancelAlgorithm); $putByIdDirectPrivate(controller, "pull", $readableStreamDefaultControllerPull); $putByIdDirectPrivate(controller, "cancel", $readableStreamDefaultControllerCancel); $putByIdDirectPrivate(stream, "readableStreamController", controller); $readableStreamDefaultControllerStart(controller); } export function createReadableStreamController(stream, underlyingSource, strategy) { const type = underlyingSource.type; const typeString = $toString(type); if (typeString === "bytes") { // if (!$readableByteStreamAPIEnabled()) // $throwTypeError("ReadableByteStreamController is not implemented"); if (strategy.highWaterMark === undefined) strategy.highWaterMark = 0; if (strategy.size !== undefined) $throwRangeError("Strategy for a ReadableByteStreamController cannot have a size"); $putByIdDirectPrivate( stream, "readableStreamController", new ReadableByteStreamController(stream, underlyingSource, strategy.highWaterMark, $isReadableStream), ); } else if (typeString === "direct") { var highWaterMark = strategy?.highWaterMark; $initializeArrayBufferStream.$call(stream, underlyingSource, highWaterMark); } else if (type === undefined) { if (strategy.highWaterMark === undefined) strategy.highWaterMark = 1; $setupReadableStreamDefaultController( stream, underlyingSource, strategy.size, strategy.highWaterMark, underlyingSource.start, underlyingSource.pull, underlyingSource.cancel, ); } else throw new RangeError("Invalid type for underlying source"); } export function readableStreamDefaultControllerStart(controller) { if ($getByIdDirectPrivate(controller, "started") !== -1) return; const underlyingSource = $getByIdDirectPrivate(controller, "underlyingSource"); const startMethod = underlyingSource.start; $putByIdDirectPrivate(controller, "started", 0); $promiseInvokeOrNoopMethodNoCatch(underlyingSource, startMethod, [controller]).$then( () => { $putByIdDirectPrivate(controller, "started", 1); $assert(!$getByIdDirectPrivate(controller, "pulling")); $assert(!$getByIdDirectPrivate(controller, "pullAgain")); $readableStreamDefaultControllerCallPullIfNeeded(controller); }, error => { $readableStreamDefaultControllerError(controller, error); }, ); } // FIXME: Replace readableStreamPipeTo by below function. // This method implements the latest https://streams.spec.whatwg.org/#readable-stream-pipe-to. export function readableStreamPipeToWritableStream( source, destination, preventClose, preventAbort, preventCancel, signal, ) { // const isDirectStream = !!$getByIdDirectPrivate(source, "start"); $assert($isReadableStream(source)); $assert($isWritableStream(destination)); $assert(!$isReadableStreamLocked(source)); $assert(!$isWritableStreamLocked(destination)); $assert(signal === undefined || $isAbortSignal(signal)); if ($getByIdDirectPrivate(source, "underlyingByteSource") !== undefined) return Promise.$reject("Piping to a readable bytestream is not supported"); let pipeState: any = { source: source, destination: destination, preventAbort: preventAbort, preventCancel: preventCancel, preventClose: preventClose, signal: signal, }; pipeState.reader = $acquireReadableStreamDefaultReader(source); pipeState.writer = $acquireWritableStreamDefaultWriter(destination); $putByIdDirectPrivate(source, "disturbed", true); pipeState.finalized = false; pipeState.shuttingDown = false; pipeState.promiseCapability = $newPromiseCapability(Promise); pipeState.pendingReadPromiseCapability = $newPromiseCapability(Promise); pipeState.pendingReadPromiseCapability.$resolve.$call(); pipeState.pendingWritePromise = Promise.$resolve(); if (signal !== undefined) { const algorithm = reason => { if (pipeState.finalized) return; $pipeToShutdownWithAction( pipeState, () => { const shouldAbortDestination = !pipeState.preventAbort && $getByIdDirectPrivate(pipeState.destination, "state") === "writable"; const promiseDestination = shouldAbortDestination ? $writableStreamAbort(pipeState.destination, reason) : Promise.$resolve(); const shouldAbortSource = !pipeState.preventCancel && $getByIdDirectPrivate(pipeState.source, "state") === $streamReadable; const promiseSource = shouldAbortSource ? $readableStreamCancel(pipeState.source, reason) : Promise.$resolve(); let promiseCapability = $newPromiseCapability(Promise); let shouldWait = true; let handleResolvedPromise = () => { if (shouldWait) { shouldWait = false; return; } promiseCapability.$resolve.$call(); }; let handleRejectedPromise = e => { promiseCapability.$reject.$call(undefined, e); }; promiseDestination.$then(handleResolvedPromise, handleRejectedPromise); promiseSource.$then(handleResolvedPromise, handleRejectedPromise); return promiseCapability.$promise; }, reason, ); }; if ($whenSignalAborted(signal, algorithm)) return pipeState.promiseCapability.$promise; } $pipeToErrorsMustBePropagatedForward(pipeState); $pipeToErrorsMustBePropagatedBackward(pipeState); $pipeToClosingMustBePropagatedForward(pipeState); $pipeToClosingMustBePropagatedBackward(pipeState); $pipeToLoop(pipeState); return pipeState.promiseCapability.$promise; } export function pipeToLoop(pipeState) { if (pipeState.shuttingDown) return; $pipeToDoReadWrite(pipeState).$then(result => { if (result) $pipeToLoop(pipeState); }); } export function pipeToDoReadWrite(pipeState) { $assert(!pipeState.shuttingDown); pipeState.pendingReadPromiseCapability = $newPromiseCapability(Promise); $getByIdDirectPrivate(pipeState.writer, "readyPromise").$promise.$then( () => { if (pipeState.shuttingDown) { pipeState.pendingReadPromiseCapability.$resolve.$call(undefined, false); return; } $readableStreamDefaultReaderRead(pipeState.reader).$then( result => { const canWrite = !result.done && $getByIdDirectPrivate(pipeState.writer, "stream") !== undefined; pipeState.pendingReadPromiseCapability.$resolve.$call(undefined, canWrite); if (!canWrite) return; pipeState.pendingWritePromise = $writableStreamDefaultWriterWrite(pipeState.writer, result.value); }, e => { pipeState.pendingReadPromiseCapability.$resolve.$call(undefined, false); }, ); }, e => { pipeState.pendingReadPromiseCapability.$resolve.$call(undefined, false); }, ); return pipeState.pendingReadPromiseCapability.$promise; } export function pipeToErrorsMustBePropagatedForward(pipeState) { const action = () => { pipeState.pendingReadPromiseCapability.$resolve.$call(undefined, false); const error = $getByIdDirectPrivate(pipeState.source, "storedError"); if (!pipeState.preventAbort) { $pipeToShutdownWithAction(pipeState, () => $writableStreamAbort(pipeState.destination, error), error); return; } $pipeToShutdown(pipeState, error); }; if ($getByIdDirectPrivate(pipeState.source, "state") === $streamErrored) { action(); return; } $getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").$promise.$then(undefined, action); } export function pipeToErrorsMustBePropagatedBackward(pipeState) { const action = () => { const error = $getByIdDirectPrivate(pipeState.destination, "storedError"); if (!pipeState.preventCancel) { $pipeToShutdownWithAction(pipeState, () => $readableStreamCancel(pipeState.source, error), error); return; } $pipeToShutdown(pipeState, error); }; if ($getByIdDirectPrivate(pipeState.destination, "state") === "errored") { action(); return; } $getByIdDirectPrivate(pipeState.writer, "closedPromise").$promise.$then(undefined, action); } export function pipeToClosingMustBePropagatedForward(pipeState) { const action = () => { pipeState.pendingReadPromiseCapability.$resolve.$call(undefined, false); // const error = $getByIdDirectPrivate(pipeState.source, "storedError"); if (!pipeState.preventClose) { $pipeToShutdownWithAction(pipeState, () => $writableStreamDefaultWriterCloseWithErrorPropagation(pipeState.writer), ); return; } $pipeToShutdown(pipeState); }; if ($getByIdDirectPrivate(pipeState.source, "state") === $streamClosed) { action(); return; } $getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").$promise.$then(action, undefined); } export function pipeToClosingMustBePropagatedBackward(pipeState) { if ( !$writableStreamCloseQueuedOrInFlight(pipeState.destination) && $getByIdDirectPrivate(pipeState.destination, "state") !== "closed" ) return; // $assert no chunks have been read/written const error = new TypeError("closing is propagated backward"); if (!pipeState.preventCancel) { $pipeToShutdownWithAction(pipeState, () => $readableStreamCancel(pipeState.source, error), error); return; } $pipeToShutdown(pipeState, error); } export function pipeToShutdownWithAction(pipeState, action) { if (pipeState.shuttingDown) return; pipeState.shuttingDown = true; const hasError = arguments.length > 2; const error = arguments[2]; const finalize = () => { const promise = action(); promise.$then( () => { if (hasError) $pipeToFinalize(pipeState, error); else $pipeToFinalize(pipeState); }, e => { $pipeToFinalize(pipeState, e); }, ); }; if ( $getByIdDirectPrivate(pipeState.destination, "state") === "writable" && !$writableStreamCloseQueuedOrInFlight(pipeState.destination) ) { pipeState.pendingReadPromiseCapability.$promise.$then( () => { pipeState.pendingWritePromise.$then(finalize, finalize); }, e => $pipeToFinalize(pipeState, e), ); return; } finalize(); } export function pipeToShutdown(pipeState) { if (pipeState.shuttingDown) return; pipeState.shuttingDown = true; const hasError = arguments.length > 1; const error = arguments[1]; const finalize = () => { if (hasError) $pipeToFinalize(pipeState, error); else $pipeToFinalize(pipeState); }; if ( $getByIdDirectPrivate(pipeState.destination, "state") === "writable" && !$writableStreamCloseQueuedOrInFlight(pipeState.destination) ) { pipeState.pendingReadPromiseCapability.$promise.$then( () => { pipeState.pendingWritePromise.$then(finalize, finalize); }, e => $pipeToFinalize(pipeState, e), ); return; } finalize(); } export function pipeToFinalize(pipeState) { $writableStreamDefaultWriterRelease(pipeState.writer); $readableStreamReaderGenericRelease(pipeState.reader); // Instead of removing the abort algorithm as per spec, we make it a no-op which is equivalent. pipeState.finalized = true; if (arguments.length > 1) pipeState.promiseCapability.$reject.$call(undefined, arguments[1]); else pipeState.promiseCapability.$resolve.$call(); } export function readableStreamTee(stream, shouldClone) { $assert($isReadableStream(stream)); $assert(typeof shouldClone === "boolean"); var start_ = $getByIdDirectPrivate(stream, "start"); if (start_) { $putByIdDirectPrivate(stream, "start", undefined); start_(); } const reader = new $ReadableStreamDefaultReader(stream); const teeState = { closedOrErrored: false, canceled1: false, canceled2: false, reason1: undefined, reason2: undefined, }; teeState.cancelPromiseCapability = $newPromiseCapability(Promise); const pullFunction = $readableStreamTeePullFunction(teeState, reader, shouldClone); const branch1Source = {}; $putByIdDirectPrivate(branch1Source, "pull", pullFunction); $putByIdDirectPrivate(branch1Source, "cancel", $readableStreamTeeBranch1CancelFunction(teeState, stream)); const branch2Source = {}; $putByIdDirectPrivate(branch2Source, "pull", pullFunction); $putByIdDirectPrivate(branch2Source, "cancel", $readableStreamTeeBranch2CancelFunction(teeState, stream)); const branch1 = new $ReadableStream(branch1Source); const branch2 = new $ReadableStream(branch2Source); $getByIdDirectPrivate(reader, "closedPromiseCapability").$promise.$then(undefined, function (e) { if (teeState.closedOrErrored) return; $readableStreamDefaultControllerError(branch1.$readableStreamController, e); $readableStreamDefaultControllerError(branch2.$readableStreamController, e); teeState.closedOrErrored = true; if (!teeState.canceled1 || !teeState.canceled2) teeState.cancelPromiseCapability.$resolve.$call(); }); // Additional fields compared to the spec, as they are needed within pull/cancel functions. teeState.branch1 = branch1; teeState.branch2 = branch2; return [branch1, branch2]; } export function readableStreamTeePullFunction(teeState, reader, shouldClone) { return function () { Promise.prototype.$then.$call($readableStreamDefaultReaderRead(reader), function (result) { $assert($isObject(result)); $assert(typeof result.done === "boolean"); if (result.done && !teeState.closedOrErrored) { if (!teeState.canceled1) $readableStreamDefaultControllerClose(teeState.branch1.$readableStreamController); if (!teeState.canceled2) $readableStreamDefaultControllerClose(teeState.branch2.$readableStreamController); teeState.closedOrErrored = true; if (!teeState.canceled1 || !teeState.canceled2) teeState.cancelPromiseCapability.$resolve.$call(); } if (teeState.closedOrErrored) return; if (!teeState.canceled1) $readableStreamDefaultControllerEnqueue(teeState.branch1.$readableStreamController, result.value); if (!teeState.canceled2) $readableStreamDefaultControllerEnqueue( teeState.branch2.$readableStreamController, shouldClone ? $structuredCloneForStream(result.value) : result.value, ); }); }; } export function readableStreamTeeBranch1CancelFunction(teeState, stream) { return function (r) { teeState.canceled1 = true; teeState.reason1 = r; if (teeState.canceled2) { $readableStreamCancel(stream, [teeState.reason1, teeState.reason2]).$then( teeState.cancelPromiseCapability.$resolve, teeState.cancelPromiseCapability.$reject, ); } return teeState.cancelPromiseCapability.$promise; }; } export function readableStreamTeeBranch2CancelFunction(teeState, stream) { return function (r) { teeState.canceled2 = true; teeState.reason2 = r; if (teeState.canceled1) { $readableStreamCancel(stream, [teeState.reason1, teeState.reason2]).$then( teeState.cancelPromiseCapability.$resolve, teeState.cancelPromiseCapability.$reject, ); } return teeState.cancelPromiseCapability.$promise; }; } export function isReadableStream(stream) { // Spec tells to return true only if stream has a readableStreamController internal slot. // However, since it is a private slot, it cannot be checked using hasOwnProperty(). // Therefore, readableStreamController is initialized with null value. return $isObject(stream) && $getByIdDirectPrivate(stream, "readableStreamController") !== undefined; } export function isReadableStreamDefaultReader(reader) { // Spec tells to return true only if reader has a readRequests internal slot. // However, since it is a private slot, it cannot be checked using hasOwnProperty(). // Since readRequests is initialized with an empty array, the following test is ok. return $isObject(reader) && !!$getByIdDirectPrivate(reader, "readRequests"); } export function isReadableStreamDefaultController(controller) { // Spec tells to return true only if controller has an underlyingSource internal slot. // However, since it is a private slot, it cannot be checked using hasOwnProperty(). // underlyingSource is obtained in ReadableStream constructor: if undefined, it is set // to an empty object. Therefore, following test is ok. return $isObject(controller) && !!$getByIdDirectPrivate(controller, "underlyingSource"); } export function readDirectStream(stream, sink, underlyingSource) { $putByIdDirectPrivate(stream, "underlyingSource", undefined); $putByIdDirectPrivate(stream, "start", undefined); function close(stream, reason) { if (reason && underlyingSource?.cancel) { try { var prom = underlyingSource.cancel(reason); $markPromiseAsHandled(prom); } catch (e) {} underlyingSource = undefined; } if (stream) { $putByIdDirectPrivate(stream, "readableStreamController", undefined); $putByIdDirectPrivate(stream, "reader", undefined); if (reason) { $putByIdDirectPrivate(stream, "state", $streamErrored); $putByIdDirectPrivate(stream, "storedError", reason); } else { $putByIdDirectPrivate(stream, "state", $streamClosed); } stream = undefined; } } if (!underlyingSource.pull) { close(); return; } if (!$isCallable(underlyingSource.pull)) { close(); $throwTypeError("pull is not a function"); return; } $putByIdDirectPrivate(stream, "readableStreamController", sink); const highWaterMark = $getByIdDirectPrivate(stream, "highWaterMark"); sink.start({ highWaterMark: !highWaterMark || highWaterMark < 64 ? 64 : highWaterMark, }); $startDirectStream.$call(sink, stream, underlyingSource.pull, close); $putByIdDirectPrivate(stream, "reader", {}); var maybePromise = underlyingSource.pull(sink); sink = undefined; if (maybePromise && $isPromise(maybePromise)) { return maybePromise.$then(() => {}); } } $linkTimeConstant; export function assignToStream(stream, sink) { // The stream is either a direct stream or a "default" JS stream var underlyingSource = $getByIdDirectPrivate(stream, "underlyingSource"); // we know it's a direct stream when $underlyingSource is set if (underlyingSource) { try { return $readDirectStream(stream, sink, underlyingSource); } catch (e) { throw e; } finally { underlyingSource = undefined; stream = undefined; sink = undefined; } } return $readStreamIntoSink(stream, sink, true); } export async function readStreamIntoSink(stream, sink, isNative) { var didClose = false; var didThrow = false; try { var reader = stream.getReader(); var many = reader.readMany(); if (many && $isPromise(many)) { many = await many; } if (many.done) { didClose = true; return sink.end(); } var wroteCount = many.value.length; const highWaterMark = $getByIdDirectPrivate(stream, "highWaterMark"); if (isNative) $startDirectStream.$call(sink, stream, undefined, () => !didThrow && $markPromiseAsHandled(stream.cancel())); sink.start({ highWaterMark: highWaterMark || 0 }); for (var i = 0, values = many.value, length = many.value.length; i < length; i++) { sink.write(values[i]); } var streamState = $getByIdDirectPrivate(stream, "state"); if (streamState === $streamClosed) { didClose = true; return sink.end(); } while (true) { var { value, done } = await reader.read(); if (done) { didClose = true; return sink.end(); } sink.write(value); } } catch (e) { didThrow = true; try { reader = undefined; const prom = stream.cancel(e); $markPromiseAsHandled(prom); } catch (j) {} if (sink && !didClose) { didClose = true; try { sink.close(e); } catch (j) { throw new globalThis.AggregateError([e, j]); } } throw e; } finally { if (reader) { try { reader.releaseLock(); } catch (e) {} reader = undefined; } sink = undefined; var streamState = $getByIdDirectPrivate(stream, "state"); if (stream) { // make it easy for this to be GC'd // but don't do property transitions var readableStreamController = $getByIdDirectPrivate(stream, "readableStreamController"); if (readableStreamController) { if ($getByIdDirectPrivate(readableStreamController, "underlyingSource")) $putByIdDirectPrivate(readableStreamController, "underlyingSource", undefined); if ($getByIdDirectPrivate(readableStreamController, "controlledReadableStream")) $putByIdDirectPrivate(readableStreamController, "controlledReadableStream", undefined); $putByIdDirectPrivate(stream, "readableStreamController", null); if ($getByIdDirectPrivate(stream, "underlyingSource")) $putByIdDirectPrivate(stream, "underlyingSource", undefined); readableStreamController = undefined; } if (!didThrow && streamState !== $streamClosed && streamState !== $streamErrored) { $readableStreamClose(stream); } stream = undefined; } } } export function handleDirectStreamError(e) { var controller = this; var sink = controller.$sink; if (sink) { $putByIdDirectPrivate(controller, "sink", undefined); try { sink.close(e); } catch (f) {} } this.error = this.flush = this.write = this.close = this.end = $onReadableStreamDirectControllerClosed; if (typeof this.$underlyingSource.close === "function") { try { this.$underlyingSource.close.$call(this.$underlyingSource, e); } catch (e) {} } try { var pend = controller._pendingRead; if (pend) { controller._pendingRead = undefined; $rejectPromise(pend, e); } } catch (f) {} var stream = controller.$controlledReadableStream; if (stream) $readableStreamError(stream, e); } export function handleDirectStreamErrorReject(e) { $handleDirectStreamError.$call(this, e); return Promise.$reject(e); } export function onPullDirectStream(controller) { var stream = controller.$controlledReadableStream; if (!stream || $getByIdDirectPrivate(stream, "state") !== $streamReadable) return; // pull is in progress // this is a recursive call // ignore it if (controller._deferClose === -1) { return; } controller._deferClose = -1; controller._deferFlush = -1; var deferClose; var deferFlush; // Direct streams allow $pull to be called multiple times, unlike the spec. // Backpressure is handled by the destination, not by the underlying source. // In this case, we rely on the heuristic that repeatedly draining in the same tick // is bad for performance // this code is only run when consuming a direct stream from JS // without the HTTP server or anything else try { var result = controller.$underlyingSource.pull(controller); if (result && $isPromise(result)) { if (controller._handleError === undefined) { controller._handleError = $handleDirectStreamErrorReject.bind(controller); } Promise.prototype.catch.$call(result, controller._handleError); } } catch (e) { return $handleDirectStreamErrorReject.$call(controller, e); } finally { deferClose = controller._deferClose; deferFlush = controller._deferFlush; controller._deferFlush = controller._deferClose = 0; } var promiseToReturn; if (controller._pendingRead === undefined) { controller._pendingRead = promiseToReturn = $newPromise(); } else { promiseToReturn = $readableStreamAddReadRequest(stream); } // they called close during $pull() // we delay that if (deferClose === 1) { var reason = controller._deferCloseReason; controller._deferCloseReason = undefined; $onCloseDirectStream.$call(controller, reason); return promiseToReturn; } // not done, but they called flush() if (deferFlush === 1) { $onFlushDirectStream.$call(controller); } return promiseToReturn; } export function noopDoneFunction() { return Promise.$resolve({ value: undefined, done: true }); } export function onReadableStreamDirectControllerClosed(reason) { $throwTypeError("ReadableStreamDirectController is now closed"); } export function onCloseDirectStream(reason) { var stream = this.$controlledReadableStream; if (!stream || $getByIdDirectPrivate(stream, "state") !== $streamReadable) return; if (this._deferClose !== 0) { this._deferClose = 1; this._deferCloseReason = reason; return; } $putByIdDirectPrivate(stream, "state", $streamClosing); if (typeof this.$underlyingSource.close === "function") { try { this.$underlyingSource.close.$call(this.$underlyingSource, reason); } catch (e) {} } var flushed; try { flushed = this.$sink.end(); $putByIdDirectPrivate(this, "sink", undefined); } catch (e) { if (this._pendingRead) { var read = this._pendingRead; this._pendingRead = undefined; $rejectPromise(read, e); } $readableStreamError(stream, e); return; } this.error = this.flush = this.write = this.close = this.end = $onReadableStreamDirectControllerClosed; var reader = $getByIdDirectPrivate(stream, "reader"); if (reader && $isReadableStreamDefaultReader(reader)) { var _pendingRead = this._pendingRead; if (_pendingRead && $isPromise(_pendingRead) && flushed?.byteLength) { this._pendingRead = undefined; $fulfillPromise(_pendingRead, { value: flushed, done: false }); $readableStreamClose(stream); return; } } if (flushed?.byteLength) { var requests = $getByIdDirectPrivate(reader, "readRequests"); if (requests?.isNotEmpty()) { $readableStreamFulfillReadRequest(stream, flushed, false); $readableStreamClose(stream); return; } $putByIdDirectPrivate(stream, "state", $streamReadable); this.$pull = () => { var thisResult = $createFulfilledPromise({ value: flushed, done: false, }); flushed = undefined; $readableStreamClose(stream); stream = undefined; return thisResult; }; } else if (this._pendingRead) { var read = this._pendingRead; this._pendingRead = undefined; $putByIdDirectPrivate(this, "pull", $noopDoneFunction); $fulfillPromise(read, { value: undefined, done: true }); } $readableStreamClose(stream); } export function onFlushDirectStream() { var stream = this.$controlledReadableStream; var reader = $getByIdDirectPrivate(stream, "reader"); if (!reader || !$isReadableStreamDefaultReader(reader)) { return; } var _pendingRead = this._pendingRead; this._pendingRead = undefined; if (_pendingRead && $isPromise(_pendingRead)) { var flushed = this.$sink.flush(); if (flushed?.byteLength) { this._pendingRead = $getByIdDirectPrivate(stream, "readRequests")?.shift(); $fulfillPromise(_pendingRead, { value: flushed, done: false }); } else { this._pendingRead = _pendingRead; } } else if ($getByIdDirectPrivate(stream, "readRequests")?.isNotEmpty()) { var flushed = this.$sink.flush(); if (flushed?.byteLength) { $readableStreamFulfillReadRequest(stream, flushed, false); } } else if (this._deferFlush === -1) { this._deferFlush = 1; } } export function createTextStream(highWaterMark) { var sink; var array = []; var hasString = false; var hasBuffer = false; var rope = ""; var estimatedLength = $toLength(0); var capability = $newPromiseCapability(Promise); var calledDone = false; sink = { start() {}, write(chunk) { if (typeof chunk === "string") { var chunkLength = $toLength(chunk.length); if (chunkLength > 0) { rope += chunk; hasString = true; // TODO: utf16 byte length estimatedLength += chunkLength; } return chunkLength; } if (!chunk || !($ArrayBuffer.$isView(chunk) || chunk instanceof $ArrayBuffer)) { $throwTypeError("Expected text, ArrayBuffer or ArrayBufferView"); } const byteLength = $toLength(chunk.byteLength); if (byteLength > 0) { hasBuffer = true; if (rope.length > 0) { $arrayPush(array, rope, chunk); rope = ""; } else { $arrayPush(array, chunk); } } estimatedLength += byteLength; return byteLength; }, flush() { return 0; }, end() { if (calledDone) { return ""; } return sink.fulfill(); }, fulfill() { calledDone = true; const result = sink.finishInternal(); $fulfillPromise(capability.$promise, result); return result; }, finishInternal() { if (!hasString && !hasBuffer) { return ""; } if (hasString && !hasBuffer) { return rope; } if (hasBuffer && !hasString) { return new globalThis.TextDecoder().decode($Bun.concatArrayBuffers(array)); } // worst case: mixed content var arrayBufferSink = new $Bun.ArrayBufferSink(); arrayBufferSink.start({ highWaterMark: estimatedLength, asUint8Array: true, }); for (let item of array) { arrayBufferSink.write(item); } array.length = 0; if (rope.length > 0) { arrayBufferSink.write(rope); rope = ""; } // TODO: use builtin return new globalThis.TextDecoder().decode(arrayBufferSink.end()); }, close() { try { if (!calledDone) { calledDone = true; sink.fulfill(); } } catch (e) {} }, }; return [sink, capability]; } export function initializeTextStream(underlyingSource, highWaterMark) { var [sink, closingPromise] = $createTextStream(highWaterMark); var controller = { $underlyingSource: underlyingSource, $pull: $onPullDirectStream, $controlledReadableStream: this, $sink: sink, close: $onCloseDirectStream, write: sink.write, error: $handleDirectStreamError, end: $onCloseDirectStream, $close: $onCloseDirectStream, flush: $onFlushDirectStream, _pendingRead: undefined, _deferClose: 0, _deferFlush: 0, _deferCloseReason: undefined, _handleError: undefined, }; $putByIdDirectPrivate(this, "readableStreamController", controller); $putByIdDirectPrivate(this, "underlyingSource", undefined); $putByIdDirectPrivate(this, "start", undefined); return closingPromise; } export function initializeArrayStream(underlyingSource, highWaterMark) { var array = []; var closingPromise = $newPromiseCapability(Promise); var calledDone = false; function fulfill() { calledDone = true; closingPromise.$resolve.$call(undefined, array); return array; } var sink = { start() {}, write(chunk) { $arrayPush(array, chunk); return chunk.byteLength || chunk.length; }, flush() { return 0; }, end() { if (calledDone) { return []; } return fulfill(); }, close() { if (!calledDone) { fulfill(); } }, }; var controller = { $underlyingSource: underlyingSource, $pull: $onPullDirectStream, $controlledReadableStream: this, $sink: sink, close: $onCloseDirectStream, write: sink.write, error: $handleDirectStreamError, end: $onCloseDirectStream, $close: $onCloseDirectStream, flush: $onFlushDirectStream, _pendingRead: undefined, _deferClose: 0, _deferFlush: 0, _deferCloseReason: undefined, _handleError: undefined, }; $putByIdDirectPrivate(this, "readableStreamController", controller); $putByIdDirectPrivate(this, "underlyingSource", undefined); $putByIdDirectPrivate(this, "start", undefined); return closingPromise; } export function initializeArrayBufferStream(underlyingSource, highWaterMark) { // This is the fallback implementation for direct streams // When we don't know what the destination type is // We assume it is a Uint8Array. var opts = highWaterMark && typeof highWaterMark === "number" ? { highWaterMark, stream: true, asUint8Array: true } : { stream: true, asUint8Array: true }; var sink = new $Bun.ArrayBufferSink(); sink.start(opts); var controller = { $underlyingSource: underlyingSource, $pull: $onPullDirectStream, $controlledReadableStream: this, $sink: sink, close: $onCloseDirectStream, write: sink.write.bind(sink), error: $handleDirectStreamError, end: $onCloseDirectStream, $close: $onCloseDirectStream, flush: $onFlushDirectStream, _pendingRead: undefined, _deferClose: 0, _deferFlush: 0, _deferCloseReason: undefined, _handleError: undefined, }; $putByIdDirectPrivate(this, "readableStreamController", controller); $putByIdDirectPrivate(this, "underlyingSource", undefined); $putByIdDirectPrivate(this, "start", undefined); } export function readableStreamError(stream, error) { $assert($isReadableStream(stream)); $assert($getByIdDirectPrivate(stream, "state") === $streamReadable); $putByIdDirectPrivate(stream, "state", $streamErrored); $putByIdDirectPrivate(stream, "storedError", error); const reader = $getByIdDirectPrivate(stream, "reader"); if (!reader) return; if ($isReadableStreamDefaultReader(reader)) { const requests = $getByIdDirectPrivate(reader, "readRequests"); $putByIdDirectPrivate(reader, "readRequests", $createFIFO()); for (var request = requests.shift(); request; request = requests.shift()) $rejectPromise(request, error); } else { $assert($isReadableStreamBYOBReader(reader)); const requests = $getByIdDirectPrivate(reader, "readIntoRequests"); $putByIdDirectPrivate(reader, "readIntoRequests", $createFIFO()); for (var request = requests.shift(); request; request = requests.shift()) $rejectPromise(request, error); } $getByIdDirectPrivate(reader, "closedPromiseCapability").$reject.$call(undefined, error); const promise = $getByIdDirectPrivate(reader, "closedPromiseCapability").$promise; $markPromiseAsHandled(promise); } export function readableStreamDefaultControllerShouldCallPull(controller) { const stream = $getByIdDirectPrivate(controller, "controlledReadableStream"); if (!$readableStreamDefaultControllerCanCloseOrEnqueue(controller)) return false; if (!($getByIdDirectPrivate(controller, "started") === 1)) return false; if ( (!$isReadableStreamLocked(stream) || !$getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) && $readableStreamDefaultControllerGetDesiredSize(controller) <= 0 ) return false; const desiredSize = $readableStreamDefaultControllerGetDesiredSize(controller); $assert(desiredSize !== null); return desiredSize > 0; } export function readableStreamDefaultControllerCallPullIfNeeded(controller) { // FIXME: use $readableStreamDefaultControllerShouldCallPull const stream = $getByIdDirectPrivate(controller, "controlledReadableStream"); if (!$readableStreamDefaultControllerCanCloseOrEnqueue(controller)) return; if (!($getByIdDirectPrivate(controller, "started") === 1)) return; if ( (!$isReadableStreamLocked(stream) || !$getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) && $readableStreamDefaultControllerGetDesiredSize(controller) <= 0 ) return; if ($getByIdDirectPrivate(controller, "pulling")) { $putByIdDirectPrivate(controller, "pullAgain", true); return; } $assert(!$getByIdDirectPrivate(controller, "pullAgain")); $putByIdDirectPrivate(controller, "pulling", true); $getByIdDirectPrivate(controller, "pullAlgorithm") .$call(undefined) .$then( function () { $putByIdDirectPrivate(controller, "pulling", false); if ($getByIdDirectPrivate(controller, "pullAgain")) { $putByIdDirectPrivate(controller, "pullAgain", false); $readableStreamDefaultControllerCallPullIfNeeded(controller); } }, function (error) { $readableStreamDefaultControllerError(controller, error); }, ); } export function isReadableStreamLocked(stream) { $assert($isReadableStream(stream)); return !!$getByIdDirectPrivate(stream, "reader"); } export function readableStreamDefaultControllerGetDesiredSize(controller) { const stream = $getByIdDirectPrivate(controller, "controlledReadableStream"); const state = $getByIdDirectPrivate(stream, "state"); if (state === $streamErrored) return null; if (state === $streamClosed) return 0; return $getByIdDirectPrivate(controller, "strategy").highWaterMark - $getByIdDirectPrivate(controller, "queue").size; } export function readableStreamReaderGenericCancel(reader, reason) { const stream = $getByIdDirectPrivate(reader, "ownerReadableStream"); $assert(!!stream); return $readableStreamCancel(stream, reason); } export function readableStreamCancel(stream, reason) { $putByIdDirectPrivate(stream, "disturbed", true); const state = $getByIdDirectPrivate(stream, "state"); if (state === $streamClosed) return Promise.$resolve(); if (state === $streamErrored) return Promise.$reject($getByIdDirectPrivate(stream, "storedError")); $readableStreamClose(stream); var controller = $getByIdDirectPrivate(stream, "readableStreamController"); var cancel = controller.$cancel; if (cancel) { return cancel(controller, reason).$then(function () {}); } var close = controller.close; if (close) { return Promise.$resolve(controller.close(reason)); } $throwTypeError("ReadableStreamController has no cancel or close method"); } export function readableStreamDefaultControllerCancel(controller, reason) { $putByIdDirectPrivate(controller, "queue", $newQueue()); return $getByIdDirectPrivate(controller, "cancelAlgorithm").$call(undefined, reason); } export function readableStreamDefaultControllerPull(controller) { var queue = $getByIdDirectPrivate(controller, "queue"); if (queue.content.isNotEmpty()) { const chunk = $dequeueValue(queue); if ($getByIdDirectPrivate(controller, "closeRequested") && queue.content.isEmpty()) $readableStreamClose($getByIdDirectPrivate(controller, "controlledReadableStream")); else $readableStreamDefaultControllerCallPullIfNeeded(controller); return $createFulfilledPromise({ value: chunk, done: false }); } const pendingPromise = $readableStreamAddReadRequest($getByIdDirectPrivate(controller, "controlledReadableStream")); $readableStreamDefaultControllerCallPullIfNeeded(controller); return pendingPromise; } export function readableStreamDefaultControllerClose(controller) { $assert($readableStreamDefaultControllerCanCloseOrEnqueue(controller)); $putByIdDirectPrivate(controller, "closeRequested", true); if ($getByIdDirectPrivate(controller, "queue")?.content?.isEmpty()) $readableStreamClose($getByIdDirectPrivate(controller, "controlledReadableStream")); } export function readableStreamClose(stream) { $assert($getByIdDirectPrivate(stream, "state") === $streamReadable); $putByIdDirectPrivate(stream, "state", $streamClosed); if (!$getByIdDirectPrivate(stream, "reader")) return; if ($isReadableStreamDefaultReader($getByIdDirectPrivate(stream, "reader"))) { const requests = $getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests"); if (requests.isNotEmpty()) { $putByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests", $createFIFO()); for (var request = requests.shift(); request; request = requests.shift()) $fulfillPromise(request, { value: undefined, done: true }); } } $getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "closedPromiseCapability").$resolve.$call(); } export function readableStreamFulfillReadRequest(stream, chunk, done) { const readRequest = $getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests").shift(); $fulfillPromise(readRequest, { value: chunk, done: done }); } export function readableStreamDefaultControllerEnqueue(controller, chunk) { const stream = $getByIdDirectPrivate(controller, "controlledReadableStream"); // this is checked by callers $assert($readableStreamDefaultControllerCanCloseOrEnqueue(controller)); if ( $isReadableStreamLocked(stream) && $getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty() ) { $readableStreamFulfillReadRequest(stream, chunk, false); $readableStreamDefaultControllerCallPullIfNeeded(controller); return; } try { let chunkSize = 1; if ($getByIdDirectPrivate(controller, "strategy").size !== undefined) chunkSize = $getByIdDirectPrivate(controller, "strategy").size(chunk); $enqueueValueWithSize($getByIdDirectPrivate(controller, "queue"), chunk, chunkSize); } catch (error) { $readableStreamDefaultControllerError(controller, error); throw error; } $readableStreamDefaultControllerCallPullIfNeeded(controller); } export function readableStreamDefaultReaderRead(reader) { const stream = $getByIdDirectPrivate(reader, "ownerReadableStream"); $assert(!!stream); const state = $getByIdDirectPrivate(stream, "state"); $putByIdDirectPrivate(stream, "disturbed", true); if (state === $streamClosed) return $createFulfilledPromise({ value: undefined, done: true }); if (state === $streamErrored) return Promise.$reject($getByIdDirectPrivate(stream, "storedError")); $assert(state === $streamReadable); return $getByIdDirectPrivate(stream, "readableStreamController").$pull( $getByIdDirectPrivate(stream, "readableStreamController"), ); } export function readableStreamAddReadRequest(stream) { $assert($isReadableStreamDefaultReader($getByIdDirectPrivate(stream, "reader"))); $assert($getByIdDirectPrivate(stream, "state") == $streamReadable); const readRequest = $newPromise(); $getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests").push(readRequest); return readRequest; } export function isReadableStreamDisturbed(stream) { $assert($isReadableStream(stream)); return $getByIdDirectPrivate(stream, "disturbed"); } export function readableStreamReaderGenericRelease(reader) { $assert(!!$getByIdDirectPrivate(reader, "ownerReadableStream")); $assert($getByIdDirectPrivate($getByIdDirectPrivate(reader, "ownerReadableStream"), "reader") === reader); if ($getByIdDirectPrivate($getByIdDirectPrivate(reader, "ownerReadableStream"), "state") === $streamReadable) $getByIdDirectPrivate(reader, "closedPromiseCapability").$reject.$call( undefined, $makeTypeError("releasing lock of reader whose stream is still in readable state"), ); else $putByIdDirectPrivate(reader, "closedPromiseCapability", { $promise: $newHandledRejectedPromise($makeTypeError("reader released lock")), }); const promise = $getByIdDirectPrivate(reader, "closedPromiseCapability").$promise; $markPromiseAsHandled(promise); $putByIdDirectPrivate($getByIdDirectPrivate(reader, "ownerReadableStream"), "reader", undefined); $putByIdDirectPrivate(reader, "ownerReadableStream", undefined); } export function readableStreamDefaultControllerCanCloseOrEnqueue(controller) { return ( !$getByIdDirectPrivate(controller, "closeRequested") && $getByIdDirectPrivate($getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === $streamReadable ); } export function lazyLoadStream(stream, autoAllocateChunkSize) { var nativeType = $getByIdDirectPrivate(stream, "bunNativeType"); var nativePtr = $getByIdDirectPrivate(stream, "bunNativePtr"); var Prototype = $lazyStreamPrototypeMap.$get(nativeType); if (Prototype === undefined) { var [pull, start, cancel, setClose, deinit, setRefOrUnref, drain] = $lazyLoad(nativeType); var closer = [false]; var handleResult; function handleNativeReadableStreamPromiseResult(val) { var { c, v } = this; this.c = undefined; this.v = undefined; handleResult(val, c, v); } function callClose(controller) { try { controller.close(); } catch (e) { globalThis.reportError(e); } } handleResult = function handleResult(result, controller, view) { if (result && $isPromise(result)) { return result.then( handleNativeReadableStreamPromiseResult.bind({ c: controller, v: view, }), err => controller.error(err), ); } else if (typeof result === "number") { if (view && view.byteLength === result && view.buffer === controller.byobRequest?.view?.buffer) { controller.byobRequest.respondWithNewView(view); } else { controller.byobRequest.respond(result); } } else if (result.constructor === $Uint8Array) { controller.enqueue(result); } if (closer[0] || result === false) { $enqueueJob(callClose, controller); closer[0] = false; } }; function createResult(tag, controller, view, closer) { closer[0] = false; var result; try { result = pull(tag, view, closer); } catch (err) { return controller.error(err); } return handleResult(result, controller, view); } const registry = deinit ? new FinalizationRegistry(deinit) : null; Prototype = class NativeReadableStreamSource { constructor(tag, autoAllocateChunkSize, drainValue) { this.#tag = tag; this.#cancellationToken = {}; this.pull = this.#pull.bind(this); this.cancel = this.#cancel.bind(this); this.autoAllocateChunkSize = autoAllocateChunkSize; if (drainValue !== undefined) { this.start = controller => { controller.enqueue(drainValue); }; } if (registry) { registry.register(this, tag, this.#cancellationToken); } } #cancellationToken; pull; cancel; start; #tag; type = "bytes"; autoAllocateChunkSize = 0; static startSync = start; #pull(controller) { var tag = this.#tag; if (!tag) { controller.close(); return; } createResult(tag, controller, controller.byobRequest.view, closer); } #cancel(reason) { var tag = this.#tag; registry && registry.unregister(this.#cancellationToken); setRefOrUnref && setRefOrUnref(tag, false); cancel(tag, reason); } static deinit = deinit; static drain = drain; }; $lazyStreamPrototypeMap.$set(nativeType, Prototype); } const chunkSize = Prototype.startSync(nativePtr, autoAllocateChunkSize); var drainValue; const { drain: drainFn, deinit: deinitFn } = Prototype; if (drainFn) { drainValue = drainFn(nativePtr); } // empty file, no need for native back-and-forth on this if (chunkSize === 0) { deinit && nativePtr && $enqueueJob(deinit, nativePtr); if ((drainValue?.byteLength ?? 0) > 0) { return { start(controller) { controller.enqueue(drainValue); controller.close(); }, type: "bytes", }; } return { start(controller) { controller.close(); }, type: "bytes", }; } return new Prototype(nativePtr, chunkSize, drainValue); } export function readableStreamIntoArray(stream) { var reader = stream.getReader(); var manyResult = reader.readMany(); async function processManyResult(result) { if (result.done) { return []; } var chunks = result.value || []; while (true) { var thisResult = await reader.read(); if (thisResult.done) { break; } chunks = chunks.concat(thisResult.value); } return chunks; } if (manyResult && $isPromise(manyResult)) { return manyResult.$then(processManyResult); } return processManyResult(manyResult); } export function readableStreamIntoText(stream) { const [textStream, closer] = $createTextStream($getByIdDirectPrivate(stream, "highWaterMark")); const prom = $readStreamIntoSink(stream, textStream, false); if (prom && $isPromise(prom)) { return Promise.$resolve(prom).$then(closer.$promise); } return closer.$promise; } export function readableStreamToArrayBufferDirect(stream, underlyingSource) { var sink = new $Bun.ArrayBufferSink(); $putByIdDirectPrivate(stream, "underlyingSource", undefined); var highWaterMark = $getByIdDirectPrivate(stream, "highWaterMark"); sink.start(highWaterMark ? { highWaterMark } : {}); var capability = $newPromiseCapability(Promise); var ended = false; var pull = underlyingSource.pull; var close = underlyingSource.close; var controller = { start() {}, close(reason) { if (!ended) { ended = true; if (close) { close(); } $fulfillPromise(capability.$promise, sink.end()); } }, end() { if (!ended) { ended = true; if (close) { close(); } $fulfillPromise(capability.$promise, sink.end()); } }, flush() { return 0; }, write: sink.write.bind(sink), }; var didError = false; try { const firstPull = pull(controller); if (firstPull && $isObject(firstPull) && $isPromise(firstPull)) { return (async function (controller, promise, pull) { while (!ended) { await pull(controller); } return await promise; })(controller, promise, pull); } return capability.$promise; } catch (e) { didError = true; $readableStreamError(stream, e); return Promise.$reject(e); } finally { if (!didError && stream) $readableStreamClose(stream); controller = close = sink = pull = stream = undefined; } } export async function readableStreamToTextDirect(stream, underlyingSource) { const capability = $initializeTextStream.$call(stream, underlyingSource, undefined); var reader = stream.getReader(); while ($getByIdDirectPrivate(stream, "state") === $streamReadable) { var thisResult = await reader.read(); if (thisResult.done) { break; } } try { reader.releaseLock(); } catch (e) {} reader = undefined; stream = undefined; return capability.$promise; } export async function readableStreamToArrayDirect(stream, underlyingSource) { const capability = $initializeArrayStream.$call(stream, underlyingSource, undefined); underlyingSource = undefined; var reader = stream.getReader(); try { while ($getByIdDirectPrivate(stream, "state") === $streamReadable) { var thisResult = await reader.read(); if (thisResult.done) { break; } } try { reader.releaseLock(); } catch (e) {} reader = undefined; return Promise.$resolve(capability.$promise); } catch (e) { throw e; } finally { stream = undefined; reader = undefined; } } export function readableStreamDefineLazyIterators(prototype) { var asyncIterator = globalThis.Symbol.asyncIterator; var ReadableStreamAsyncIterator = async function* ReadableStreamAsyncIterator(stream, preventCancel) { var reader = stream.getReader(); var deferredError; try { while (true) { var done, value; const firstResult = reader.readMany(); if ($isPromise(firstResult)) { ({ done, value } = await firstResult); } else { ({ done, value } = firstResult); } if (done) { return; } yield* value; } } catch (e) { deferredError = e; } finally { reader.releaseLock(); if (!preventCancel) { stream.cancel(deferredError); } if (deferredError) { throw deferredError; } } }; var createAsyncIterator = function asyncIterator() { return ReadableStreamAsyncIterator(this, false); }; var createValues = function values({ preventCancel = false } = { preventCancel: false }) { return ReadableStreamAsyncIterator(this, preventCancel); }; $Object.$defineProperty(prototype, asyncIterator, { value: createAsyncIterator }); $Object.$defineProperty(prototype, "values", { value: createValues }); return prototype; }