aboutsummaryrefslogtreecommitdiff
path: root/src/js/builtins/ReadableStreamInternals.ts
diff options
context:
space:
mode:
Diffstat (limited to 'src/js/builtins/ReadableStreamInternals.ts')
-rw-r--r--src/js/builtins/ReadableStreamInternals.ts1799
1 files changed, 1799 insertions, 0 deletions
diff --git a/src/js/builtins/ReadableStreamInternals.ts b/src/js/builtins/ReadableStreamInternals.ts
new file mode 100644
index 000000000..0c4e816f4
--- /dev/null
+++ b/src/js/builtins/ReadableStreamInternals.ts
@@ -0,0 +1,1799 @@
+/*
+ * Copyright (C) 2015 Canon Inc. All rights reserved.
+ * Copyright (C) 2015 Igalia.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+ * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+// @internal
+
+export function readableStreamReaderGenericInitialize(reader, stream) {
+ $putByIdDirectPrivate(reader, "ownerReadableStream", stream);
+ $putByIdDirectPrivate(stream, "reader", reader);
+ if ($getByIdDirectPrivate(stream, "state") === $streamReadable)
+ $putByIdDirectPrivate(reader, "closedPromiseCapability", $newPromiseCapability(Promise));
+ else if ($getByIdDirectPrivate(stream, "state") === $streamClosed)
+ $putByIdDirectPrivate(reader, "closedPromiseCapability", {
+ $promise: Promise.$resolve(),
+ });
+ else {
+ $assert($getByIdDirectPrivate(stream, "state") === $streamErrored);
+ $putByIdDirectPrivate(reader, "closedPromiseCapability", {
+ $promise: $newHandledRejectedPromise($getByIdDirectPrivate(stream, "storedError")),
+ });
+ }
+}
+
+export function privateInitializeReadableStreamDefaultController(this, stream, underlyingSource, size, highWaterMark) {
+ if (!$isReadableStream(stream)) throw new TypeError("ReadableStreamDefaultController needs a ReadableStream");
+
+ // readableStreamController is initialized with null value.
+ if ($getByIdDirectPrivate(stream, "readableStreamController") !== null)
+ throw new TypeError("ReadableStream already has a controller");
+
+ $putByIdDirectPrivate(this, "controlledReadableStream", stream);
+ $putByIdDirectPrivate(this, "underlyingSource", underlyingSource);
+ $putByIdDirectPrivate(this, "queue", $newQueue());
+ $putByIdDirectPrivate(this, "started", -1);
+ $putByIdDirectPrivate(this, "closeRequested", false);
+ $putByIdDirectPrivate(this, "pullAgain", false);
+ $putByIdDirectPrivate(this, "pulling", false);
+ $putByIdDirectPrivate(this, "strategy", $validateAndNormalizeQueuingStrategy(size, highWaterMark));
+
+ return this;
+}
+
+export function readableStreamDefaultControllerError(controller, error) {
+ const stream = $getByIdDirectPrivate(controller, "controlledReadableStream");
+ if ($getByIdDirectPrivate(stream, "state") !== $streamReadable) return;
+ $putByIdDirectPrivate(controller, "queue", $newQueue());
+
+ $readableStreamError(stream, error);
+}
+
+export function readableStreamPipeTo(stream, sink) {
+ $assert($isReadableStream(stream));
+
+ const reader = new ReadableStreamDefaultReader(stream);
+
+ $getByIdDirectPrivate(reader, "closedPromiseCapability").$promise.$then(
+ () => {},
+ e => {
+ sink.error(e);
+ },
+ );
+
+ function doPipe() {
+ $readableStreamDefaultReaderRead(reader).$then(
+ function (result) {
+ if (result.done) {
+ sink.close();
+ return;
+ }
+ try {
+ sink.enqueue(result.value);
+ } catch (e) {
+ sink.error("ReadableStream chunk enqueueing in the sink failed");
+ return;
+ }
+ doPipe();
+ },
+ function (e) {
+ sink.error(e);
+ },
+ );
+ }
+ doPipe();
+}
+
+export function acquireReadableStreamDefaultReader(stream) {
+ var start = $getByIdDirectPrivate(stream, "start");
+ if (start) {
+ start.$call(stream);
+ }
+
+ return new ReadableStreamDefaultReader(stream);
+}
+
+// https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller, starting from step 6.
+// The other part is implemented in privateInitializeReadableStreamDefaultController.
+export function setupReadableStreamDefaultController(
+ stream,
+ underlyingSource,
+ size,
+ highWaterMark,
+ startMethod,
+ pullMethod,
+ cancelMethod,
+) {
+ const controller = new ReadableStreamDefaultController(
+ stream,
+ underlyingSource,
+ size,
+ highWaterMark,
+ $isReadableStream,
+ );
+
+ const pullAlgorithm = () => $promiseInvokeOrNoopMethod(underlyingSource, pullMethod, [controller]);
+ const cancelAlgorithm = reason => $promiseInvokeOrNoopMethod(underlyingSource, cancelMethod, [reason]);
+
+ $putByIdDirectPrivate(controller, "pullAlgorithm", pullAlgorithm);
+ $putByIdDirectPrivate(controller, "cancelAlgorithm", cancelAlgorithm);
+ $putByIdDirectPrivate(controller, "pull", $readableStreamDefaultControllerPull);
+ $putByIdDirectPrivate(controller, "cancel", $readableStreamDefaultControllerCancel);
+ $putByIdDirectPrivate(stream, "readableStreamController", controller);
+
+ $readableStreamDefaultControllerStart(controller);
+}
+
+export function createReadableStreamController(stream, underlyingSource, strategy) {
+ const type = underlyingSource.type;
+ const typeString = $toString(type);
+
+ if (typeString === "bytes") {
+ // if (!$readableByteStreamAPIEnabled())
+ // $throwTypeError("ReadableByteStreamController is not implemented");
+
+ if (strategy.highWaterMark === undefined) strategy.highWaterMark = 0;
+ if (strategy.size !== undefined) $throwRangeError("Strategy for a ReadableByteStreamController cannot have a size");
+
+ $putByIdDirectPrivate(
+ stream,
+ "readableStreamController",
+ new ReadableByteStreamController(stream, underlyingSource, strategy.highWaterMark, $isReadableStream),
+ );
+ } else if (typeString === "direct") {
+ var highWaterMark = strategy?.highWaterMark;
+ $initializeArrayBufferStream.$call(stream, underlyingSource, highWaterMark);
+ } else if (type === undefined) {
+ if (strategy.highWaterMark === undefined) strategy.highWaterMark = 1;
+
+ $setupReadableStreamDefaultController(
+ stream,
+ underlyingSource,
+ strategy.size,
+ strategy.highWaterMark,
+ underlyingSource.start,
+ underlyingSource.pull,
+ underlyingSource.cancel,
+ );
+ } else throw new RangeError("Invalid type for underlying source");
+}
+
+export function readableStreamDefaultControllerStart(controller) {
+ if ($getByIdDirectPrivate(controller, "started") !== -1) return;
+
+ const underlyingSource = $getByIdDirectPrivate(controller, "underlyingSource");
+ const startMethod = underlyingSource.start;
+ $putByIdDirectPrivate(controller, "started", 0);
+
+ $promiseInvokeOrNoopMethodNoCatch(underlyingSource, startMethod, [controller]).$then(
+ () => {
+ $putByIdDirectPrivate(controller, "started", 1);
+ $assert(!$getByIdDirectPrivate(controller, "pulling"));
+ $assert(!$getByIdDirectPrivate(controller, "pullAgain"));
+ $readableStreamDefaultControllerCallPullIfNeeded(controller);
+ },
+ error => {
+ $readableStreamDefaultControllerError(controller, error);
+ },
+ );
+}
+
+// FIXME: Replace readableStreamPipeTo by below function.
+// This method implements the latest https://streams.spec.whatwg.org/#readable-stream-pipe-to.
+export function readableStreamPipeToWritableStream(
+ source,
+ destination,
+ preventClose,
+ preventAbort,
+ preventCancel,
+ signal,
+) {
+ // const isDirectStream = !!$getByIdDirectPrivate(source, "start");
+
+ $assert($isReadableStream(source));
+ $assert($isWritableStream(destination));
+ $assert(!$isReadableStreamLocked(source));
+ $assert(!$isWritableStreamLocked(destination));
+ $assert(signal === undefined || $isAbortSignal(signal));
+
+ if ($getByIdDirectPrivate(source, "underlyingByteSource") !== undefined)
+ return Promise.$reject("Piping to a readable bytestream is not supported");
+
+ let pipeState: any = {
+ source: source,
+ destination: destination,
+ preventAbort: preventAbort,
+ preventCancel: preventCancel,
+ preventClose: preventClose,
+ signal: signal,
+ };
+
+ pipeState.reader = $acquireReadableStreamDefaultReader(source);
+ pipeState.writer = $acquireWritableStreamDefaultWriter(destination);
+
+ $putByIdDirectPrivate(source, "disturbed", true);
+
+ pipeState.finalized = false;
+ pipeState.shuttingDown = false;
+ pipeState.promiseCapability = $newPromiseCapability(Promise);
+ pipeState.pendingReadPromiseCapability = $newPromiseCapability(Promise);
+ pipeState.pendingReadPromiseCapability.$resolve.$call();
+ pipeState.pendingWritePromise = Promise.$resolve();
+
+ if (signal !== undefined) {
+ const algorithm = reason => {
+ if (pipeState.finalized) return;
+
+ $pipeToShutdownWithAction(
+ pipeState,
+ () => {
+ const shouldAbortDestination =
+ !pipeState.preventAbort && $getByIdDirectPrivate(pipeState.destination, "state") === "writable";
+ const promiseDestination = shouldAbortDestination
+ ? $writableStreamAbort(pipeState.destination, reason)
+ : Promise.$resolve();
+
+ const shouldAbortSource =
+ !pipeState.preventCancel && $getByIdDirectPrivate(pipeState.source, "state") === $streamReadable;
+ const promiseSource = shouldAbortSource
+ ? $readableStreamCancel(pipeState.source, reason)
+ : Promise.$resolve();
+
+ let promiseCapability = $newPromiseCapability(Promise);
+ let shouldWait = true;
+ let handleResolvedPromise = () => {
+ if (shouldWait) {
+ shouldWait = false;
+ return;
+ }
+ promiseCapability.$resolve.$call();
+ };
+ let handleRejectedPromise = e => {
+ promiseCapability.$reject.$call(undefined, e);
+ };
+ promiseDestination.$then(handleResolvedPromise, handleRejectedPromise);
+ promiseSource.$then(handleResolvedPromise, handleRejectedPromise);
+ return promiseCapability.$promise;
+ },
+ reason,
+ );
+ };
+ if ($whenSignalAborted(signal, algorithm)) return pipeState.promiseCapability.$promise;
+ }
+
+ $pipeToErrorsMustBePropagatedForward(pipeState);
+ $pipeToErrorsMustBePropagatedBackward(pipeState);
+ $pipeToClosingMustBePropagatedForward(pipeState);
+ $pipeToClosingMustBePropagatedBackward(pipeState);
+
+ $pipeToLoop(pipeState);
+
+ return pipeState.promiseCapability.$promise;
+}
+
+export function pipeToLoop(pipeState) {
+ if (pipeState.shuttingDown) return;
+
+ $pipeToDoReadWrite(pipeState).$then(result => {
+ if (result) $pipeToLoop(pipeState);
+ });
+}
+
+export function pipeToDoReadWrite(pipeState) {
+ $assert(!pipeState.shuttingDown);
+
+ pipeState.pendingReadPromiseCapability = $newPromiseCapability(Promise);
+ $getByIdDirectPrivate(pipeState.writer, "readyPromise").$promise.$then(
+ () => {
+ if (pipeState.shuttingDown) {
+ pipeState.pendingReadPromiseCapability.$resolve.$call(undefined, false);
+ return;
+ }
+
+ $readableStreamDefaultReaderRead(pipeState.reader).$then(
+ result => {
+ const canWrite = !result.done && $getByIdDirectPrivate(pipeState.writer, "stream") !== undefined;
+ pipeState.pendingReadPromiseCapability.$resolve.$call(undefined, canWrite);
+ if (!canWrite) return;
+
+ pipeState.pendingWritePromise = $writableStreamDefaultWriterWrite(pipeState.writer, result.value);
+ },
+ e => {
+ pipeState.pendingReadPromiseCapability.$resolve.$call(undefined, false);
+ },
+ );
+ },
+ e => {
+ pipeState.pendingReadPromiseCapability.$resolve.$call(undefined, false);
+ },
+ );
+ return pipeState.pendingReadPromiseCapability.$promise;
+}
+
+export function pipeToErrorsMustBePropagatedForward(pipeState) {
+ const action = () => {
+ pipeState.pendingReadPromiseCapability.$resolve.$call(undefined, false);
+ const error = $getByIdDirectPrivate(pipeState.source, "storedError");
+ if (!pipeState.preventAbort) {
+ $pipeToShutdownWithAction(pipeState, () => $writableStreamAbort(pipeState.destination, error), error);
+ return;
+ }
+ $pipeToShutdown(pipeState, error);
+ };
+
+ if ($getByIdDirectPrivate(pipeState.source, "state") === $streamErrored) {
+ action();
+ return;
+ }
+
+ $getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").$promise.$then(undefined, action);
+}
+
+export function pipeToErrorsMustBePropagatedBackward(pipeState) {
+ const action = () => {
+ const error = $getByIdDirectPrivate(pipeState.destination, "storedError");
+ if (!pipeState.preventCancel) {
+ $pipeToShutdownWithAction(pipeState, () => $readableStreamCancel(pipeState.source, error), error);
+ return;
+ }
+ $pipeToShutdown(pipeState, error);
+ };
+ if ($getByIdDirectPrivate(pipeState.destination, "state") === "errored") {
+ action();
+ return;
+ }
+ $getByIdDirectPrivate(pipeState.writer, "closedPromise").$promise.$then(undefined, action);
+}
+
+export function pipeToClosingMustBePropagatedForward(pipeState) {
+ const action = () => {
+ pipeState.pendingReadPromiseCapability.$resolve.$call(undefined, false);
+ // const error = $getByIdDirectPrivate(pipeState.source, "storedError");
+ if (!pipeState.preventClose) {
+ $pipeToShutdownWithAction(pipeState, () =>
+ $writableStreamDefaultWriterCloseWithErrorPropagation(pipeState.writer),
+ );
+ return;
+ }
+ $pipeToShutdown(pipeState);
+ };
+ if ($getByIdDirectPrivate(pipeState.source, "state") === $streamClosed) {
+ action();
+ return;
+ }
+ $getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").$promise.$then(action, undefined);
+}
+
+export function pipeToClosingMustBePropagatedBackward(pipeState) {
+ if (
+ !$writableStreamCloseQueuedOrInFlight(pipeState.destination) &&
+ $getByIdDirectPrivate(pipeState.destination, "state") !== "closed"
+ )
+ return;
+
+ // $assert no chunks have been read/written
+
+ const error = new TypeError("closing is propagated backward");
+ if (!pipeState.preventCancel) {
+ $pipeToShutdownWithAction(pipeState, () => $readableStreamCancel(pipeState.source, error), error);
+ return;
+ }
+ $pipeToShutdown(pipeState, error);
+}
+
+export function pipeToShutdownWithAction(pipeState, action) {
+ if (pipeState.shuttingDown) return;
+
+ pipeState.shuttingDown = true;
+
+ const hasError = arguments.length > 2;
+ const error = arguments[2];
+ const finalize = () => {
+ const promise = action();
+ promise.$then(
+ () => {
+ if (hasError) $pipeToFinalize(pipeState, error);
+ else $pipeToFinalize(pipeState);
+ },
+ e => {
+ $pipeToFinalize(pipeState, e);
+ },
+ );
+ };
+
+ if (
+ $getByIdDirectPrivate(pipeState.destination, "state") === "writable" &&
+ !$writableStreamCloseQueuedOrInFlight(pipeState.destination)
+ ) {
+ pipeState.pendingReadPromiseCapability.$promise.$then(
+ () => {
+ pipeState.pendingWritePromise.$then(finalize, finalize);
+ },
+ e => $pipeToFinalize(pipeState, e),
+ );
+ return;
+ }
+
+ finalize();
+}
+
+export function pipeToShutdown(pipeState) {
+ if (pipeState.shuttingDown) return;
+
+ pipeState.shuttingDown = true;
+
+ const hasError = arguments.length > 1;
+ const error = arguments[1];
+ const finalize = () => {
+ if (hasError) $pipeToFinalize(pipeState, error);
+ else $pipeToFinalize(pipeState);
+ };
+
+ if (
+ $getByIdDirectPrivate(pipeState.destination, "state") === "writable" &&
+ !$writableStreamCloseQueuedOrInFlight(pipeState.destination)
+ ) {
+ pipeState.pendingReadPromiseCapability.$promise.$then(
+ () => {
+ pipeState.pendingWritePromise.$then(finalize, finalize);
+ },
+ e => $pipeToFinalize(pipeState, e),
+ );
+ return;
+ }
+ finalize();
+}
+
+export function pipeToFinalize(pipeState) {
+ $writableStreamDefaultWriterRelease(pipeState.writer);
+ $readableStreamReaderGenericRelease(pipeState.reader);
+
+ // Instead of removing the abort algorithm as per spec, we make it a no-op which is equivalent.
+ pipeState.finalized = true;
+
+ if (arguments.length > 1) pipeState.promiseCapability.$reject.$call(undefined, arguments[1]);
+ else pipeState.promiseCapability.$resolve.$call();
+}
+
+export function readableStreamTee(stream, shouldClone) {
+ $assert($isReadableStream(stream));
+ $assert(typeof shouldClone === "boolean");
+
+ var start_ = $getByIdDirectPrivate(stream, "start");
+ if (start_) {
+ $putByIdDirectPrivate(stream, "start", undefined);
+ start_();
+ }
+
+ const reader = new $ReadableStreamDefaultReader(stream);
+
+ const teeState = {
+ closedOrErrored: false,
+ canceled1: false,
+ canceled2: false,
+ reason1: undefined,
+ reason2: undefined,
+ };
+
+ teeState.cancelPromiseCapability = $newPromiseCapability(Promise);
+
+ const pullFunction = $readableStreamTeePullFunction(teeState, reader, shouldClone);
+
+ const branch1Source = {};
+ $putByIdDirectPrivate(branch1Source, "pull", pullFunction);
+ $putByIdDirectPrivate(branch1Source, "cancel", $readableStreamTeeBranch1CancelFunction(teeState, stream));
+
+ const branch2Source = {};
+ $putByIdDirectPrivate(branch2Source, "pull", pullFunction);
+ $putByIdDirectPrivate(branch2Source, "cancel", $readableStreamTeeBranch2CancelFunction(teeState, stream));
+
+ const branch1 = new $ReadableStream(branch1Source);
+ const branch2 = new $ReadableStream(branch2Source);
+
+ $getByIdDirectPrivate(reader, "closedPromiseCapability").$promise.$then(undefined, function (e) {
+ if (teeState.closedOrErrored) return;
+ $readableStreamDefaultControllerError(branch1.$readableStreamController, e);
+ $readableStreamDefaultControllerError(branch2.$readableStreamController, e);
+ teeState.closedOrErrored = true;
+ if (!teeState.canceled1 || !teeState.canceled2) teeState.cancelPromiseCapability.$resolve.$call();
+ });
+
+ // Additional fields compared to the spec, as they are needed within pull/cancel functions.
+ teeState.branch1 = branch1;
+ teeState.branch2 = branch2;
+
+ return [branch1, branch2];
+}
+
+export function readableStreamTeePullFunction(teeState, reader, shouldClone) {
+ return function () {
+ Promise.prototype.$then.$call($readableStreamDefaultReaderRead(reader), function (result) {
+ $assert($isObject(result));
+ $assert(typeof result.done === "boolean");
+ if (result.done && !teeState.closedOrErrored) {
+ if (!teeState.canceled1) $readableStreamDefaultControllerClose(teeState.branch1.$readableStreamController);
+ if (!teeState.canceled2) $readableStreamDefaultControllerClose(teeState.branch2.$readableStreamController);
+ teeState.closedOrErrored = true;
+ if (!teeState.canceled1 || !teeState.canceled2) teeState.cancelPromiseCapability.$resolve.$call();
+ }
+ if (teeState.closedOrErrored) return;
+ if (!teeState.canceled1)
+ $readableStreamDefaultControllerEnqueue(teeState.branch1.$readableStreamController, result.value);
+ if (!teeState.canceled2)
+ $readableStreamDefaultControllerEnqueue(
+ teeState.branch2.$readableStreamController,
+ shouldClone ? $structuredCloneForStream(result.value) : result.value,
+ );
+ });
+ };
+}
+
+export function readableStreamTeeBranch1CancelFunction(teeState, stream) {
+ return function (r) {
+ teeState.canceled1 = true;
+ teeState.reason1 = r;
+ if (teeState.canceled2) {
+ $readableStreamCancel(stream, [teeState.reason1, teeState.reason2]).$then(
+ teeState.cancelPromiseCapability.$resolve,
+ teeState.cancelPromiseCapability.$reject,
+ );
+ }
+ return teeState.cancelPromiseCapability.$promise;
+ };
+}
+
+export function readableStreamTeeBranch2CancelFunction(teeState, stream) {
+ return function (r) {
+ teeState.canceled2 = true;
+ teeState.reason2 = r;
+ if (teeState.canceled1) {
+ $readableStreamCancel(stream, [teeState.reason1, teeState.reason2]).$then(
+ teeState.cancelPromiseCapability.$resolve,
+ teeState.cancelPromiseCapability.$reject,
+ );
+ }
+ return teeState.cancelPromiseCapability.$promise;
+ };
+}
+
+export function isReadableStream(stream) {
+ // Spec tells to return true only if stream has a readableStreamController internal slot.
+ // However, since it is a private slot, it cannot be checked using hasOwnProperty().
+ // Therefore, readableStreamController is initialized with null value.
+ return $isObject(stream) && $getByIdDirectPrivate(stream, "readableStreamController") !== undefined;
+}
+
+export function isReadableStreamDefaultReader(reader) {
+ // Spec tells to return true only if reader has a readRequests internal slot.
+ // However, since it is a private slot, it cannot be checked using hasOwnProperty().
+ // Since readRequests is initialized with an empty array, the following test is ok.
+ return $isObject(reader) && !!$getByIdDirectPrivate(reader, "readRequests");
+}
+
+export function isReadableStreamDefaultController(controller) {
+ // Spec tells to return true only if controller has an underlyingSource internal slot.
+ // However, since it is a private slot, it cannot be checked using hasOwnProperty().
+ // underlyingSource is obtained in ReadableStream constructor: if undefined, it is set
+ // to an empty object. Therefore, following test is ok.
+ return $isObject(controller) && !!$getByIdDirectPrivate(controller, "underlyingSource");
+}
+
+export function readDirectStream(stream, sink, underlyingSource) {
+ $putByIdDirectPrivate(stream, "underlyingSource", undefined);
+ $putByIdDirectPrivate(stream, "start", undefined);
+
+ function close(stream, reason) {
+ if (reason && underlyingSource?.cancel) {
+ try {
+ var prom = underlyingSource.cancel(reason);
+ $markPromiseAsHandled(prom);
+ } catch (e) {}
+
+ underlyingSource = undefined;
+ }
+
+ if (stream) {
+ $putByIdDirectPrivate(stream, "readableStreamController", undefined);
+ $putByIdDirectPrivate(stream, "reader", undefined);
+ if (reason) {
+ $putByIdDirectPrivate(stream, "state", $streamErrored);
+ $putByIdDirectPrivate(stream, "storedError", reason);
+ } else {
+ $putByIdDirectPrivate(stream, "state", $streamClosed);
+ }
+ stream = undefined;
+ }
+ }
+
+ if (!underlyingSource.pull) {
+ close();
+ return;
+ }
+
+ if (!$isCallable(underlyingSource.pull)) {
+ close();
+ $throwTypeError("pull is not a function");
+ return;
+ }
+
+ $putByIdDirectPrivate(stream, "readableStreamController", sink);
+ const highWaterMark = $getByIdDirectPrivate(stream, "highWaterMark");
+
+ sink.start({
+ highWaterMark: !highWaterMark || highWaterMark < 64 ? 64 : highWaterMark,
+ });
+
+ $startDirectStream.$call(sink, stream, underlyingSource.pull, close);
+ $putByIdDirectPrivate(stream, "reader", {});
+
+ var maybePromise = underlyingSource.pull(sink);
+ sink = undefined;
+ if (maybePromise && $isPromise(maybePromise)) {
+ return maybePromise.$then(() => {});
+ }
+}
+
+$linkTimeConstant;
+export function assignToStream(stream, sink) {
+ // The stream is either a direct stream or a "default" JS stream
+ var underlyingSource = $getByIdDirectPrivate(stream, "underlyingSource");
+
+ // we know it's a direct stream when $underlyingSource is set
+ if (underlyingSource) {
+ try {
+ return $readDirectStream(stream, sink, underlyingSource);
+ } catch (e) {
+ throw e;
+ } finally {
+ underlyingSource = undefined;
+ stream = undefined;
+ sink = undefined;
+ }
+ }
+
+ return $readStreamIntoSink(stream, sink, true);
+}
+
+export async function readStreamIntoSink(stream, sink, isNative) {
+ var didClose = false;
+ var didThrow = false;
+ try {
+ var reader = stream.getReader();
+ var many = reader.readMany();
+ if (many && $isPromise(many)) {
+ many = await many;
+ }
+ if (many.done) {
+ didClose = true;
+ return sink.end();
+ }
+ var wroteCount = many.value.length;
+ const highWaterMark = $getByIdDirectPrivate(stream, "highWaterMark");
+ if (isNative)
+ $startDirectStream.$call(sink, stream, undefined, () => !didThrow && $markPromiseAsHandled(stream.cancel()));
+
+ sink.start({ highWaterMark: highWaterMark || 0 });
+
+ for (var i = 0, values = many.value, length = many.value.length; i < length; i++) {
+ sink.write(values[i]);
+ }
+
+ var streamState = $getByIdDirectPrivate(stream, "state");
+ if (streamState === $streamClosed) {
+ didClose = true;
+ return sink.end();
+ }
+
+ while (true) {
+ var { value, done } = await reader.read();
+ if (done) {
+ didClose = true;
+ return sink.end();
+ }
+
+ sink.write(value);
+ }
+ } catch (e) {
+ didThrow = true;
+
+ try {
+ reader = undefined;
+ const prom = stream.cancel(e);
+ $markPromiseAsHandled(prom);
+ } catch (j) {}
+
+ if (sink && !didClose) {
+ didClose = true;
+ try {
+ sink.close(e);
+ } catch (j) {
+ throw new globalThis.AggregateError([e, j]);
+ }
+ }
+
+ throw e;
+ } finally {
+ if (reader) {
+ try {
+ reader.releaseLock();
+ } catch (e) {}
+ reader = undefined;
+ }
+ sink = undefined;
+ var streamState = $getByIdDirectPrivate(stream, "state");
+ if (stream) {
+ // make it easy for this to be GC'd
+ // but don't do property transitions
+ var readableStreamController = $getByIdDirectPrivate(stream, "readableStreamController");
+ if (readableStreamController) {
+ if ($getByIdDirectPrivate(readableStreamController, "underlyingSource"))
+ $putByIdDirectPrivate(readableStreamController, "underlyingSource", undefined);
+ if ($getByIdDirectPrivate(readableStreamController, "controlledReadableStream"))
+ $putByIdDirectPrivate(readableStreamController, "controlledReadableStream", undefined);
+
+ $putByIdDirectPrivate(stream, "readableStreamController", null);
+ if ($getByIdDirectPrivate(stream, "underlyingSource"))
+ $putByIdDirectPrivate(stream, "underlyingSource", undefined);
+ readableStreamController = undefined;
+ }
+
+ if (!didThrow && streamState !== $streamClosed && streamState !== $streamErrored) {
+ $readableStreamClose(stream);
+ }
+ stream = undefined;
+ }
+ }
+}
+
+export function handleDirectStreamError(e) {
+ var controller = this;
+ var sink = controller.$sink;
+ if (sink) {
+ $putByIdDirectPrivate(controller, "sink", undefined);
+ try {
+ sink.close(e);
+ } catch (f) {}
+ }
+
+ this.error = this.flush = this.write = this.close = this.end = $onReadableStreamDirectControllerClosed;
+
+ if (typeof this.$underlyingSource.close === "function") {
+ try {
+ this.$underlyingSource.close.$call(this.$underlyingSource, e);
+ } catch (e) {}
+ }
+
+ try {
+ var pend = controller._pendingRead;
+ if (pend) {
+ controller._pendingRead = undefined;
+ $rejectPromise(pend, e);
+ }
+ } catch (f) {}
+ var stream = controller.$controlledReadableStream;
+ if (stream) $readableStreamError(stream, e);
+}
+
+export function handleDirectStreamErrorReject(e) {
+ $handleDirectStreamError.$call(this, e);
+ return Promise.$reject(e);
+}
+
+export function onPullDirectStream(controller) {
+ var stream = controller.$controlledReadableStream;
+ if (!stream || $getByIdDirectPrivate(stream, "state") !== $streamReadable) return;
+
+ // pull is in progress
+ // this is a recursive call
+ // ignore it
+ if (controller._deferClose === -1) {
+ return;
+ }
+
+ controller._deferClose = -1;
+ controller._deferFlush = -1;
+ var deferClose;
+ var deferFlush;
+
+ // Direct streams allow $pull to be called multiple times, unlike the spec.
+ // Backpressure is handled by the destination, not by the underlying source.
+ // In this case, we rely on the heuristic that repeatedly draining in the same tick
+ // is bad for performance
+ // this code is only run when consuming a direct stream from JS
+ // without the HTTP server or anything else
+ try {
+ var result = controller.$underlyingSource.pull(controller);
+
+ if (result && $isPromise(result)) {
+ if (controller._handleError === undefined) {
+ controller._handleError = $handleDirectStreamErrorReject.bind(controller);
+ }
+
+ Promise.prototype.catch.$call(result, controller._handleError);
+ }
+ } catch (e) {
+ return $handleDirectStreamErrorReject.$call(controller, e);
+ } finally {
+ deferClose = controller._deferClose;
+ deferFlush = controller._deferFlush;
+ controller._deferFlush = controller._deferClose = 0;
+ }
+
+ var promiseToReturn;
+
+ if (controller._pendingRead === undefined) {
+ controller._pendingRead = promiseToReturn = $newPromise();
+ } else {
+ promiseToReturn = $readableStreamAddReadRequest(stream);
+ }
+
+ // they called close during $pull()
+ // we delay that
+ if (deferClose === 1) {
+ var reason = controller._deferCloseReason;
+ controller._deferCloseReason = undefined;
+ $onCloseDirectStream.$call(controller, reason);
+ return promiseToReturn;
+ }
+
+ // not done, but they called flush()
+ if (deferFlush === 1) {
+ $onFlushDirectStream.$call(controller);
+ }
+
+ return promiseToReturn;
+}
+
+export function noopDoneFunction() {
+ return Promise.$resolve({ value: undefined, done: true });
+}
+
+export function onReadableStreamDirectControllerClosed(reason) {
+ $throwTypeError("ReadableStreamDirectController is now closed");
+}
+
+export function onCloseDirectStream(reason) {
+ var stream = this.$controlledReadableStream;
+ if (!stream || $getByIdDirectPrivate(stream, "state") !== $streamReadable) return;
+
+ if (this._deferClose !== 0) {
+ this._deferClose = 1;
+ this._deferCloseReason = reason;
+ return;
+ }
+
+ $putByIdDirectPrivate(stream, "state", $streamClosing);
+ if (typeof this.$underlyingSource.close === "function") {
+ try {
+ this.$underlyingSource.close.$call(this.$underlyingSource, reason);
+ } catch (e) {}
+ }
+
+ var flushed;
+ try {
+ flushed = this.$sink.end();
+ $putByIdDirectPrivate(this, "sink", undefined);
+ } catch (e) {
+ if (this._pendingRead) {
+ var read = this._pendingRead;
+ this._pendingRead = undefined;
+ $rejectPromise(read, e);
+ }
+ $readableStreamError(stream, e);
+ return;
+ }
+
+ this.error = this.flush = this.write = this.close = this.end = $onReadableStreamDirectControllerClosed;
+
+ var reader = $getByIdDirectPrivate(stream, "reader");
+
+ if (reader && $isReadableStreamDefaultReader(reader)) {
+ var _pendingRead = this._pendingRead;
+ if (_pendingRead && $isPromise(_pendingRead) && flushed?.byteLength) {
+ this._pendingRead = undefined;
+ $fulfillPromise(_pendingRead, { value: flushed, done: false });
+ $readableStreamClose(stream);
+ return;
+ }
+ }
+
+ if (flushed?.byteLength) {
+ var requests = $getByIdDirectPrivate(reader, "readRequests");
+ if (requests?.isNotEmpty()) {
+ $readableStreamFulfillReadRequest(stream, flushed, false);
+ $readableStreamClose(stream);
+ return;
+ }
+
+ $putByIdDirectPrivate(stream, "state", $streamReadable);
+ this.$pull = () => {
+ var thisResult = $createFulfilledPromise({
+ value: flushed,
+ done: false,
+ });
+ flushed = undefined;
+ $readableStreamClose(stream);
+ stream = undefined;
+ return thisResult;
+ };
+ } else if (this._pendingRead) {
+ var read = this._pendingRead;
+ this._pendingRead = undefined;
+ $putByIdDirectPrivate(this, "pull", $noopDoneFunction);
+ $fulfillPromise(read, { value: undefined, done: true });
+ }
+
+ $readableStreamClose(stream);
+}
+
+export function onFlushDirectStream() {
+ var stream = this.$controlledReadableStream;
+ var reader = $getByIdDirectPrivate(stream, "reader");
+ if (!reader || !$isReadableStreamDefaultReader(reader)) {
+ return;
+ }
+
+ var _pendingRead = this._pendingRead;
+ this._pendingRead = undefined;
+ if (_pendingRead && $isPromise(_pendingRead)) {
+ var flushed = this.$sink.flush();
+ if (flushed?.byteLength) {
+ this._pendingRead = $getByIdDirectPrivate(stream, "readRequests")?.shift();
+ $fulfillPromise(_pendingRead, { value: flushed, done: false });
+ } else {
+ this._pendingRead = _pendingRead;
+ }
+ } else if ($getByIdDirectPrivate(stream, "readRequests")?.isNotEmpty()) {
+ var flushed = this.$sink.flush();
+ if (flushed?.byteLength) {
+ $readableStreamFulfillReadRequest(stream, flushed, false);
+ }
+ } else if (this._deferFlush === -1) {
+ this._deferFlush = 1;
+ }
+}
+
+export function createTextStream(highWaterMark) {
+ var sink;
+ var array = [];
+ var hasString = false;
+ var hasBuffer = false;
+ var rope = "";
+ var estimatedLength = $toLength(0);
+ var capability = $newPromiseCapability(Promise);
+ var calledDone = false;
+
+ sink = {
+ start() {},
+ write(chunk) {
+ if (typeof chunk === "string") {
+ var chunkLength = $toLength(chunk.length);
+ if (chunkLength > 0) {
+ rope += chunk;
+ hasString = true;
+ // TODO: utf16 byte length
+ estimatedLength += chunkLength;
+ }
+
+ return chunkLength;
+ }
+
+ if (!chunk || !($ArrayBuffer.$isView(chunk) || chunk instanceof $ArrayBuffer)) {
+ $throwTypeError("Expected text, ArrayBuffer or ArrayBufferView");
+ }
+
+ const byteLength = $toLength(chunk.byteLength);
+ if (byteLength > 0) {
+ hasBuffer = true;
+ if (rope.length > 0) {
+ $arrayPush(array, rope, chunk);
+ rope = "";
+ } else {
+ $arrayPush(array, chunk);
+ }
+ }
+ estimatedLength += byteLength;
+ return byteLength;
+ },
+
+ flush() {
+ return 0;
+ },
+
+ end() {
+ if (calledDone) {
+ return "";
+ }
+ return sink.fulfill();
+ },
+
+ fulfill() {
+ calledDone = true;
+ const result = sink.finishInternal();
+
+ $fulfillPromise(capability.$promise, result);
+ return result;
+ },
+
+ finishInternal() {
+ if (!hasString && !hasBuffer) {
+ return "";
+ }
+
+ if (hasString && !hasBuffer) {
+ return rope;
+ }
+
+ if (hasBuffer && !hasString) {
+ return new globalThis.TextDecoder().decode($Bun.concatArrayBuffers(array));
+ }
+
+ // worst case: mixed content
+
+ var arrayBufferSink = new $Bun.ArrayBufferSink();
+ arrayBufferSink.start({
+ highWaterMark: estimatedLength,
+ asUint8Array: true,
+ });
+ for (let item of array) {
+ arrayBufferSink.write(item);
+ }
+ array.length = 0;
+ if (rope.length > 0) {
+ arrayBufferSink.write(rope);
+ rope = "";
+ }
+
+ // TODO: use builtin
+ return new globalThis.TextDecoder().decode(arrayBufferSink.end());
+ },
+
+ close() {
+ try {
+ if (!calledDone) {
+ calledDone = true;
+ sink.fulfill();
+ }
+ } catch (e) {}
+ },
+ };
+
+ return [sink, capability];
+}
+
+export function initializeTextStream(underlyingSource, highWaterMark) {
+ var [sink, closingPromise] = $createTextStream(highWaterMark);
+
+ var controller = {
+ $underlyingSource: underlyingSource,
+ $pull: $onPullDirectStream,
+ $controlledReadableStream: this,
+ $sink: sink,
+ close: $onCloseDirectStream,
+ write: sink.write,
+ error: $handleDirectStreamError,
+ end: $onCloseDirectStream,
+ $close: $onCloseDirectStream,
+ flush: $onFlushDirectStream,
+ _pendingRead: undefined,
+ _deferClose: 0,
+ _deferFlush: 0,
+ _deferCloseReason: undefined,
+ _handleError: undefined,
+ };
+
+ $putByIdDirectPrivate(this, "readableStreamController", controller);
+ $putByIdDirectPrivate(this, "underlyingSource", undefined);
+ $putByIdDirectPrivate(this, "start", undefined);
+ return closingPromise;
+}
+
+export function initializeArrayStream(underlyingSource, highWaterMark) {
+ var array = [];
+ var closingPromise = $newPromiseCapability(Promise);
+ var calledDone = false;
+
+ function fulfill() {
+ calledDone = true;
+ closingPromise.$resolve.$call(undefined, array);
+ return array;
+ }
+
+ var sink = {
+ start() {},
+ write(chunk) {
+ $arrayPush(array, chunk);
+ return chunk.byteLength || chunk.length;
+ },
+
+ flush() {
+ return 0;
+ },
+
+ end() {
+ if (calledDone) {
+ return [];
+ }
+ return fulfill();
+ },
+
+ close() {
+ if (!calledDone) {
+ fulfill();
+ }
+ },
+ };
+
+ var controller = {
+ $underlyingSource: underlyingSource,
+ $pull: $onPullDirectStream,
+ $controlledReadableStream: this,
+ $sink: sink,
+ close: $onCloseDirectStream,
+ write: sink.write,
+ error: $handleDirectStreamError,
+ end: $onCloseDirectStream,
+ $close: $onCloseDirectStream,
+ flush: $onFlushDirectStream,
+ _pendingRead: undefined,
+ _deferClose: 0,
+ _deferFlush: 0,
+ _deferCloseReason: undefined,
+ _handleError: undefined,
+ };
+
+ $putByIdDirectPrivate(this, "readableStreamController", controller);
+ $putByIdDirectPrivate(this, "underlyingSource", undefined);
+ $putByIdDirectPrivate(this, "start", undefined);
+ return closingPromise;
+}
+
+export function initializeArrayBufferStream(underlyingSource, highWaterMark) {
+ // This is the fallback implementation for direct streams
+ // When we don't know what the destination type is
+ // We assume it is a Uint8Array.
+
+ var opts =
+ highWaterMark && typeof highWaterMark === "number"
+ ? { highWaterMark, stream: true, asUint8Array: true }
+ : { stream: true, asUint8Array: true };
+ var sink = new $Bun.ArrayBufferSink();
+ sink.start(opts);
+
+ var controller = {
+ $underlyingSource: underlyingSource,
+ $pull: $onPullDirectStream,
+ $controlledReadableStream: this,
+ $sink: sink,
+ close: $onCloseDirectStream,
+ write: sink.write.bind(sink),
+ error: $handleDirectStreamError,
+ end: $onCloseDirectStream,
+ $close: $onCloseDirectStream,
+ flush: $onFlushDirectStream,
+ _pendingRead: undefined,
+ _deferClose: 0,
+ _deferFlush: 0,
+ _deferCloseReason: undefined,
+ _handleError: undefined,
+ };
+
+ $putByIdDirectPrivate(this, "readableStreamController", controller);
+ $putByIdDirectPrivate(this, "underlyingSource", undefined);
+ $putByIdDirectPrivate(this, "start", undefined);
+}
+
+export function readableStreamError(stream, error) {
+ $assert($isReadableStream(stream));
+ $assert($getByIdDirectPrivate(stream, "state") === $streamReadable);
+ $putByIdDirectPrivate(stream, "state", $streamErrored);
+ $putByIdDirectPrivate(stream, "storedError", error);
+
+ const reader = $getByIdDirectPrivate(stream, "reader");
+
+ if (!reader) return;
+
+ if ($isReadableStreamDefaultReader(reader)) {
+ const requests = $getByIdDirectPrivate(reader, "readRequests");
+ $putByIdDirectPrivate(reader, "readRequests", $createFIFO());
+ for (var request = requests.shift(); request; request = requests.shift()) $rejectPromise(request, error);
+ } else {
+ $assert($isReadableStreamBYOBReader(reader));
+ const requests = $getByIdDirectPrivate(reader, "readIntoRequests");
+ $putByIdDirectPrivate(reader, "readIntoRequests", $createFIFO());
+ for (var request = requests.shift(); request; request = requests.shift()) $rejectPromise(request, error);
+ }
+
+ $getByIdDirectPrivate(reader, "closedPromiseCapability").$reject.$call(undefined, error);
+ const promise = $getByIdDirectPrivate(reader, "closedPromiseCapability").$promise;
+ $markPromiseAsHandled(promise);
+}
+
+export function readableStreamDefaultControllerShouldCallPull(controller) {
+ const stream = $getByIdDirectPrivate(controller, "controlledReadableStream");
+
+ if (!$readableStreamDefaultControllerCanCloseOrEnqueue(controller)) return false;
+ if (!($getByIdDirectPrivate(controller, "started") === 1)) return false;
+ if (
+ (!$isReadableStreamLocked(stream) ||
+ !$getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) &&
+ $readableStreamDefaultControllerGetDesiredSize(controller) <= 0
+ )
+ return false;
+ const desiredSize = $readableStreamDefaultControllerGetDesiredSize(controller);
+ $assert(desiredSize !== null);
+ return desiredSize > 0;
+}
+
+export function readableStreamDefaultControllerCallPullIfNeeded(controller) {
+ // FIXME: use $readableStreamDefaultControllerShouldCallPull
+ const stream = $getByIdDirectPrivate(controller, "controlledReadableStream");
+
+ if (!$readableStreamDefaultControllerCanCloseOrEnqueue(controller)) return;
+ if (!($getByIdDirectPrivate(controller, "started") === 1)) return;
+ if (
+ (!$isReadableStreamLocked(stream) ||
+ !$getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) &&
+ $readableStreamDefaultControllerGetDesiredSize(controller) <= 0
+ )
+ return;
+
+ if ($getByIdDirectPrivate(controller, "pulling")) {
+ $putByIdDirectPrivate(controller, "pullAgain", true);
+ return;
+ }
+
+ $assert(!$getByIdDirectPrivate(controller, "pullAgain"));
+ $putByIdDirectPrivate(controller, "pulling", true);
+
+ $getByIdDirectPrivate(controller, "pullAlgorithm")
+ .$call(undefined)
+ .$then(
+ function () {
+ $putByIdDirectPrivate(controller, "pulling", false);
+ if ($getByIdDirectPrivate(controller, "pullAgain")) {
+ $putByIdDirectPrivate(controller, "pullAgain", false);
+
+ $readableStreamDefaultControllerCallPullIfNeeded(controller);
+ }
+ },
+ function (error) {
+ $readableStreamDefaultControllerError(controller, error);
+ },
+ );
+}
+
+export function isReadableStreamLocked(stream) {
+ $assert($isReadableStream(stream));
+ return !!$getByIdDirectPrivate(stream, "reader");
+}
+
+export function readableStreamDefaultControllerGetDesiredSize(controller) {
+ const stream = $getByIdDirectPrivate(controller, "controlledReadableStream");
+ const state = $getByIdDirectPrivate(stream, "state");
+
+ if (state === $streamErrored) return null;
+ if (state === $streamClosed) return 0;
+
+ return $getByIdDirectPrivate(controller, "strategy").highWaterMark - $getByIdDirectPrivate(controller, "queue").size;
+}
+
+export function readableStreamReaderGenericCancel(reader, reason) {
+ const stream = $getByIdDirectPrivate(reader, "ownerReadableStream");
+ $assert(!!stream);
+ return $readableStreamCancel(stream, reason);
+}
+
+export function readableStreamCancel(stream, reason) {
+ $putByIdDirectPrivate(stream, "disturbed", true);
+ const state = $getByIdDirectPrivate(stream, "state");
+ if (state === $streamClosed) return Promise.$resolve();
+ if (state === $streamErrored) return Promise.$reject($getByIdDirectPrivate(stream, "storedError"));
+ $readableStreamClose(stream);
+
+ var controller = $getByIdDirectPrivate(stream, "readableStreamController");
+ var cancel = controller.$cancel;
+ if (cancel) {
+ return cancel(controller, reason).$then(function () {});
+ }
+
+ var close = controller.close;
+ if (close) {
+ return Promise.$resolve(controller.close(reason));
+ }
+
+ $throwTypeError("ReadableStreamController has no cancel or close method");
+}
+
+export function readableStreamDefaultControllerCancel(controller, reason) {
+ $putByIdDirectPrivate(controller, "queue", $newQueue());
+ return $getByIdDirectPrivate(controller, "cancelAlgorithm").$call(undefined, reason);
+}
+
+export function readableStreamDefaultControllerPull(controller) {
+ var queue = $getByIdDirectPrivate(controller, "queue");
+ if (queue.content.isNotEmpty()) {
+ const chunk = $dequeueValue(queue);
+ if ($getByIdDirectPrivate(controller, "closeRequested") && queue.content.isEmpty())
+ $readableStreamClose($getByIdDirectPrivate(controller, "controlledReadableStream"));
+ else $readableStreamDefaultControllerCallPullIfNeeded(controller);
+
+ return $createFulfilledPromise({ value: chunk, done: false });
+ }
+ const pendingPromise = $readableStreamAddReadRequest($getByIdDirectPrivate(controller, "controlledReadableStream"));
+ $readableStreamDefaultControllerCallPullIfNeeded(controller);
+ return pendingPromise;
+}
+
+export function readableStreamDefaultControllerClose(controller) {
+ $assert($readableStreamDefaultControllerCanCloseOrEnqueue(controller));
+ $putByIdDirectPrivate(controller, "closeRequested", true);
+ if ($getByIdDirectPrivate(controller, "queue")?.content?.isEmpty())
+ $readableStreamClose($getByIdDirectPrivate(controller, "controlledReadableStream"));
+}
+
+export function readableStreamClose(stream) {
+ $assert($getByIdDirectPrivate(stream, "state") === $streamReadable);
+ $putByIdDirectPrivate(stream, "state", $streamClosed);
+ if (!$getByIdDirectPrivate(stream, "reader")) return;
+
+ if ($isReadableStreamDefaultReader($getByIdDirectPrivate(stream, "reader"))) {
+ const requests = $getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests");
+ if (requests.isNotEmpty()) {
+ $putByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests", $createFIFO());
+
+ for (var request = requests.shift(); request; request = requests.shift())
+ $fulfillPromise(request, { value: undefined, done: true });
+ }
+ }
+
+ $getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "closedPromiseCapability").$resolve.$call();
+}
+
+export function readableStreamFulfillReadRequest(stream, chunk, done) {
+ const readRequest = $getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests").shift();
+ $fulfillPromise(readRequest, { value: chunk, done: done });
+}
+
+export function readableStreamDefaultControllerEnqueue(controller, chunk) {
+ const stream = $getByIdDirectPrivate(controller, "controlledReadableStream");
+ // this is checked by callers
+ $assert($readableStreamDefaultControllerCanCloseOrEnqueue(controller));
+
+ if (
+ $isReadableStreamLocked(stream) &&
+ $getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()
+ ) {
+ $readableStreamFulfillReadRequest(stream, chunk, false);
+ $readableStreamDefaultControllerCallPullIfNeeded(controller);
+ return;
+ }
+
+ try {
+ let chunkSize = 1;
+ if ($getByIdDirectPrivate(controller, "strategy").size !== undefined)
+ chunkSize = $getByIdDirectPrivate(controller, "strategy").size(chunk);
+ $enqueueValueWithSize($getByIdDirectPrivate(controller, "queue"), chunk, chunkSize);
+ } catch (error) {
+ $readableStreamDefaultControllerError(controller, error);
+ throw error;
+ }
+ $readableStreamDefaultControllerCallPullIfNeeded(controller);
+}
+
+export function readableStreamDefaultReaderRead(reader) {
+ const stream = $getByIdDirectPrivate(reader, "ownerReadableStream");
+ $assert(!!stream);
+ const state = $getByIdDirectPrivate(stream, "state");
+
+ $putByIdDirectPrivate(stream, "disturbed", true);
+ if (state === $streamClosed) return $createFulfilledPromise({ value: undefined, done: true });
+ if (state === $streamErrored) return Promise.$reject($getByIdDirectPrivate(stream, "storedError"));
+ $assert(state === $streamReadable);
+
+ return $getByIdDirectPrivate(stream, "readableStreamController").$pull(
+ $getByIdDirectPrivate(stream, "readableStreamController"),
+ );
+}
+
+export function readableStreamAddReadRequest(stream) {
+ $assert($isReadableStreamDefaultReader($getByIdDirectPrivate(stream, "reader")));
+ $assert($getByIdDirectPrivate(stream, "state") == $streamReadable);
+
+ const readRequest = $newPromise();
+
+ $getByIdDirectPrivate($getByIdDirectPrivate(stream, "reader"), "readRequests").push(readRequest);
+
+ return readRequest;
+}
+
+export function isReadableStreamDisturbed(stream) {
+ $assert($isReadableStream(stream));
+ return $getByIdDirectPrivate(stream, "disturbed");
+}
+
+export function readableStreamReaderGenericRelease(reader) {
+ $assert(!!$getByIdDirectPrivate(reader, "ownerReadableStream"));
+ $assert($getByIdDirectPrivate($getByIdDirectPrivate(reader, "ownerReadableStream"), "reader") === reader);
+
+ if ($getByIdDirectPrivate($getByIdDirectPrivate(reader, "ownerReadableStream"), "state") === $streamReadable)
+ $getByIdDirectPrivate(reader, "closedPromiseCapability").$reject.$call(
+ undefined,
+ $makeTypeError("releasing lock of reader whose stream is still in readable state"),
+ );
+ else
+ $putByIdDirectPrivate(reader, "closedPromiseCapability", {
+ $promise: $newHandledRejectedPromise($makeTypeError("reader released lock")),
+ });
+
+ const promise = $getByIdDirectPrivate(reader, "closedPromiseCapability").$promise;
+ $markPromiseAsHandled(promise);
+ $putByIdDirectPrivate($getByIdDirectPrivate(reader, "ownerReadableStream"), "reader", undefined);
+ $putByIdDirectPrivate(reader, "ownerReadableStream", undefined);
+}
+
+export function readableStreamDefaultControllerCanCloseOrEnqueue(controller) {
+ return (
+ !$getByIdDirectPrivate(controller, "closeRequested") &&
+ $getByIdDirectPrivate($getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === $streamReadable
+ );
+}
+
+export function lazyLoadStream(stream, autoAllocateChunkSize) {
+ var nativeType = $getByIdDirectPrivate(stream, "bunNativeType");
+ var nativePtr = $getByIdDirectPrivate(stream, "bunNativePtr");
+ var Prototype = $lazyStreamPrototypeMap.$get(nativeType);
+ if (Prototype === undefined) {
+ var [pull, start, cancel, setClose, deinit, setRefOrUnref, drain] = $lazyLoad(nativeType);
+ var closer = [false];
+ var handleResult;
+ function handleNativeReadableStreamPromiseResult(val) {
+ var { c, v } = this;
+ this.c = undefined;
+ this.v = undefined;
+ handleResult(val, c, v);
+ }
+
+ function callClose(controller) {
+ try {
+ controller.close();
+ } catch (e) {
+ globalThis.reportError(e);
+ }
+ }
+
+ handleResult = function handleResult(result, controller, view) {
+ if (result && $isPromise(result)) {
+ return result.then(
+ handleNativeReadableStreamPromiseResult.bind({
+ c: controller,
+ v: view,
+ }),
+ err => controller.error(err),
+ );
+ } else if (typeof result === "number") {
+ if (view && view.byteLength === result && view.buffer === controller.byobRequest?.view?.buffer) {
+ controller.byobRequest.respondWithNewView(view);
+ } else {
+ controller.byobRequest.respond(result);
+ }
+ } else if (result.constructor === $Uint8Array) {
+ controller.enqueue(result);
+ }
+
+ if (closer[0] || result === false) {
+ $enqueueJob(callClose, controller);
+ closer[0] = false;
+ }
+ };
+
+ function createResult(tag, controller, view, closer) {
+ closer[0] = false;
+
+ var result;
+ try {
+ result = pull(tag, view, closer);
+ } catch (err) {
+ return controller.error(err);
+ }
+
+ return handleResult(result, controller, view);
+ }
+
+ const registry = deinit ? new FinalizationRegistry(deinit) : null;
+ Prototype = class NativeReadableStreamSource {
+ constructor(tag, autoAllocateChunkSize, drainValue) {
+ this.#tag = tag;
+ this.#cancellationToken = {};
+ this.pull = this.#pull.bind(this);
+ this.cancel = this.#cancel.bind(this);
+ this.autoAllocateChunkSize = autoAllocateChunkSize;
+
+ if (drainValue !== undefined) {
+ this.start = controller => {
+ controller.enqueue(drainValue);
+ };
+ }
+
+ if (registry) {
+ registry.register(this, tag, this.#cancellationToken);
+ }
+ }
+
+ #cancellationToken;
+ pull;
+ cancel;
+ start;
+
+ #tag;
+ type = "bytes";
+ autoAllocateChunkSize = 0;
+
+ static startSync = start;
+
+ #pull(controller) {
+ var tag = this.#tag;
+
+ if (!tag) {
+ controller.close();
+ return;
+ }
+
+ createResult(tag, controller, controller.byobRequest.view, closer);
+ }
+
+ #cancel(reason) {
+ var tag = this.#tag;
+
+ registry && registry.unregister(this.#cancellationToken);
+ setRefOrUnref && setRefOrUnref(tag, false);
+ cancel(tag, reason);
+ }
+ static deinit = deinit;
+ static drain = drain;
+ };
+ $lazyStreamPrototypeMap.$set(nativeType, Prototype);
+ }
+
+ const chunkSize = Prototype.startSync(nativePtr, autoAllocateChunkSize);
+ var drainValue;
+ const { drain: drainFn, deinit: deinitFn } = Prototype;
+ if (drainFn) {
+ drainValue = drainFn(nativePtr);
+ }
+
+ // empty file, no need for native back-and-forth on this
+ if (chunkSize === 0) {
+ deinit && nativePtr && $enqueueJob(deinit, nativePtr);
+
+ if ((drainValue?.byteLength ?? 0) > 0) {
+ return {
+ start(controller) {
+ controller.enqueue(drainValue);
+ controller.close();
+ },
+ type: "bytes",
+ };
+ }
+
+ return {
+ start(controller) {
+ controller.close();
+ },
+ type: "bytes",
+ };
+ }
+
+ return new Prototype(nativePtr, chunkSize, drainValue);
+}
+
+export function readableStreamIntoArray(stream) {
+ var reader = stream.getReader();
+ var manyResult = reader.readMany();
+
+ async function processManyResult(result) {
+ if (result.done) {
+ return [];
+ }
+
+ var chunks = result.value || [];
+
+ while (true) {
+ var thisResult = await reader.read();
+ if (thisResult.done) {
+ break;
+ }
+ chunks = chunks.concat(thisResult.value);
+ }
+
+ return chunks;
+ }
+
+ if (manyResult && $isPromise(manyResult)) {
+ return manyResult.$then(processManyResult);
+ }
+
+ return processManyResult(manyResult);
+}
+
+export function readableStreamIntoText(stream) {
+ const [textStream, closer] = $createTextStream($getByIdDirectPrivate(stream, "highWaterMark"));
+ const prom = $readStreamIntoSink(stream, textStream, false);
+ if (prom && $isPromise(prom)) {
+ return Promise.$resolve(prom).$then(closer.$promise);
+ }
+ return closer.$promise;
+}
+
+export function readableStreamToArrayBufferDirect(stream, underlyingSource) {
+ var sink = new $Bun.ArrayBufferSink();
+ $putByIdDirectPrivate(stream, "underlyingSource", undefined);
+ var highWaterMark = $getByIdDirectPrivate(stream, "highWaterMark");
+ sink.start(highWaterMark ? { highWaterMark } : {});
+ var capability = $newPromiseCapability(Promise);
+ var ended = false;
+ var pull = underlyingSource.pull;
+ var close = underlyingSource.close;
+
+ var controller = {
+ start() {},
+ close(reason) {
+ if (!ended) {
+ ended = true;
+ if (close) {
+ close();
+ }
+
+ $fulfillPromise(capability.$promise, sink.end());
+ }
+ },
+ end() {
+ if (!ended) {
+ ended = true;
+ if (close) {
+ close();
+ }
+ $fulfillPromise(capability.$promise, sink.end());
+ }
+ },
+ flush() {
+ return 0;
+ },
+ write: sink.write.bind(sink),
+ };
+
+ var didError = false;
+ try {
+ const firstPull = pull(controller);
+ if (firstPull && $isObject(firstPull) && $isPromise(firstPull)) {
+ return (async function (controller, promise, pull) {
+ while (!ended) {
+ await pull(controller);
+ }
+ return await promise;
+ })(controller, promise, pull);
+ }
+
+ return capability.$promise;
+ } catch (e) {
+ didError = true;
+ $readableStreamError(stream, e);
+ return Promise.$reject(e);
+ } finally {
+ if (!didError && stream) $readableStreamClose(stream);
+ controller = close = sink = pull = stream = undefined;
+ }
+}
+
+export async function readableStreamToTextDirect(stream, underlyingSource) {
+ const capability = $initializeTextStream.$call(stream, underlyingSource, undefined);
+ var reader = stream.getReader();
+
+ while ($getByIdDirectPrivate(stream, "state") === $streamReadable) {
+ var thisResult = await reader.read();
+ if (thisResult.done) {
+ break;
+ }
+ }
+
+ try {
+ reader.releaseLock();
+ } catch (e) {}
+ reader = undefined;
+ stream = undefined;
+
+ return capability.$promise;
+}
+
+export async function readableStreamToArrayDirect(stream, underlyingSource) {
+ const capability = $initializeArrayStream.$call(stream, underlyingSource, undefined);
+ underlyingSource = undefined;
+ var reader = stream.getReader();
+ try {
+ while ($getByIdDirectPrivate(stream, "state") === $streamReadable) {
+ var thisResult = await reader.read();
+ if (thisResult.done) {
+ break;
+ }
+ }
+
+ try {
+ reader.releaseLock();
+ } catch (e) {}
+ reader = undefined;
+
+ return Promise.$resolve(capability.$promise);
+ } catch (e) {
+ throw e;
+ } finally {
+ stream = undefined;
+ reader = undefined;
+ }
+}
+
+export function readableStreamDefineLazyIterators(prototype) {
+ var asyncIterator = globalThis.Symbol.asyncIterator;
+
+ var ReadableStreamAsyncIterator = async function* ReadableStreamAsyncIterator(stream, preventCancel) {
+ var reader = stream.getReader();
+ var deferredError;
+ try {
+ while (true) {
+ var done, value;
+ const firstResult = reader.readMany();
+ if ($isPromise(firstResult)) {
+ ({ done, value } = await firstResult);
+ } else {
+ ({ done, value } = firstResult);
+ }
+
+ if (done) {
+ return;
+ }
+ yield* value;
+ }
+ } catch (e) {
+ deferredError = e;
+ } finally {
+ reader.releaseLock();
+
+ if (!preventCancel) {
+ stream.cancel(deferredError);
+ }
+
+ if (deferredError) {
+ throw deferredError;
+ }
+ }
+ };
+ var createAsyncIterator = function asyncIterator() {
+ return ReadableStreamAsyncIterator(this, false);
+ };
+ var createValues = function values({ preventCancel = false } = { preventCancel: false }) {
+ return ReadableStreamAsyncIterator(this, preventCancel);
+ };
+ $Object.$defineProperty(prototype, asyncIterator, { value: createAsyncIterator });
+ $Object.$defineProperty(prototype, "values", { value: createValues });
+ return prototype;
+}