aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/builtins/js/ReadableStreamInternals.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/bun.js/builtins/js/ReadableStreamInternals.js')
-rw-r--r--src/bun.js/builtins/js/ReadableStreamInternals.js1255
1 files changed, 1255 insertions, 0 deletions
diff --git a/src/bun.js/builtins/js/ReadableStreamInternals.js b/src/bun.js/builtins/js/ReadableStreamInternals.js
new file mode 100644
index 000000000..3e6590f31
--- /dev/null
+++ b/src/bun.js/builtins/js/ReadableStreamInternals.js
@@ -0,0 +1,1255 @@
+/*
+ * Copyright (C) 2015 Canon Inc. All rights reserved.
+ * Copyright (C) 2015 Igalia.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+ * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+// @internal
+
+function readableStreamReaderGenericInitialize(reader, stream)
+{
+ "use strict";
+
+ @putByIdDirectPrivate(reader, "ownerReadableStream", stream);
+ @putByIdDirectPrivate(stream, "reader", reader);
+ if (@getByIdDirectPrivate(stream, "state") === @streamReadable)
+ @putByIdDirectPrivate(reader, "closedPromiseCapability", @newPromiseCapability(@Promise));
+ else if (@getByIdDirectPrivate(stream, "state") === @streamClosed)
+ @putByIdDirectPrivate(reader, "closedPromiseCapability", { @promise: @Promise.@resolve() });
+ else {
+ @assert(@getByIdDirectPrivate(stream, "state") === @streamErrored);
+ @putByIdDirectPrivate(reader, "closedPromiseCapability", { @promise: @newHandledRejectedPromise(@getByIdDirectPrivate(stream, "storedError")) });
+ }
+}
+
+function privateInitializeReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark)
+{
+ "use strict";
+
+ if (!@isReadableStream(stream))
+ @throwTypeError("ReadableStreamDefaultController needs a ReadableStream");
+
+ // readableStreamController is initialized with null value.
+ if (@getByIdDirectPrivate(stream, "readableStreamController") !== null)
+ @throwTypeError("ReadableStream already has a controller");
+
+
+
+ @putByIdDirectPrivate(this, "controlledReadableStream", stream);
+ @putByIdDirectPrivate(this, "underlyingSource", underlyingSource);
+ @putByIdDirectPrivate(this, "queue", @newQueue());
+ @putByIdDirectPrivate(this, "started", -1);
+ @putByIdDirectPrivate(this, "closeRequested", false);
+ @putByIdDirectPrivate(this, "pullAgain", false);
+ @putByIdDirectPrivate(this, "pulling", false);
+ @putByIdDirectPrivate(this, "strategy", @validateAndNormalizeQueuingStrategy(size, highWaterMark));
+
+ return this;
+}
+
+function readableStreamDefaultControllerError(controller, error)
+{
+ "use strict";
+
+ const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
+ if (@getByIdDirectPrivate(stream, "state") !== @streamReadable)
+ return;
+ @putByIdDirectPrivate(controller, "queue", @newQueue());
+
+ @readableStreamError(stream, error);
+}
+
+function readableStreamPipeTo(stream, sink)
+{
+ "use strict";
+ @assert(@isReadableStream(stream));
+
+ const reader = new @ReadableStreamDefaultReader(stream);
+
+ @getByIdDirectPrivate(reader, "closedPromiseCapability").@promise.@then(() => { }, (e) => { sink.error(e); });
+
+ function doPipe() {
+ @readableStreamDefaultReaderRead(reader).@then(function(result) {
+ if (result.done) {
+ sink.close();
+ return;
+ }
+ try {
+ sink.enqueue(result.value);
+ } catch (e) {
+ sink.error("ReadableStream chunk enqueueing in the sink failed");
+ return;
+ }
+ doPipe();
+ }, function(e) {
+ sink.error(e);
+ });
+ }
+ doPipe();
+}
+
+
+
+function acquireReadableStreamDefaultReader(stream)
+{
+ "use strict";
+ var start = @getByIdDirectPrivate(stream, "start");
+ if (start) {
+ start.@call(stream);
+ }
+
+ return new @ReadableStreamDefaultReader(stream);
+}
+
+// https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller, starting from step 6.
+// The other part is implemented in privateInitializeReadableStreamDefaultController.
+function setupReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark, startMethod, pullMethod, cancelMethod)
+{
+ "use strict";
+
+ const controller = new @ReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark, @isReadableStream);
+
+ const pullAlgorithm = () => @promiseInvokeOrNoopMethod(underlyingSource, pullMethod, [controller]);
+ const cancelAlgorithm = (reason) => @promiseInvokeOrNoopMethod(underlyingSource, cancelMethod, [reason]);
+
+ @putByIdDirectPrivate(controller, "pullAlgorithm", pullAlgorithm);
+ @putByIdDirectPrivate(controller, "cancelAlgorithm", cancelAlgorithm);
+ @putByIdDirectPrivate(controller, "pull", @readableStreamDefaultControllerPull);
+ @putByIdDirectPrivate(controller, "cancel", @readableStreamDefaultControllerCancel);
+ @putByIdDirectPrivate(stream, "readableStreamController", controller);
+
+ @readableStreamDefaultControllerStart(controller);
+
+}
+
+
+function createReadableStreamController(stream, underlyingSource, strategy) {
+ "use strict";
+
+ const type = underlyingSource.type;
+ const typeString = @toString(type);
+
+ if (typeString === "bytes") {
+ // if (!@readableByteStreamAPIEnabled())
+ // @throwTypeError("ReadableByteStreamController is not implemented");
+
+ if (strategy.highWaterMark === @undefined)
+ strategy.highWaterMark = 0;
+ if (strategy.size !== @undefined)
+ @throwRangeError("Strategy for a ReadableByteStreamController cannot have a size");
+
+ @putByIdDirectPrivate(stream, "readableStreamController", new @ReadableByteStreamController(stream, underlyingSource, strategy.highWaterMark, @isReadableStream));
+ } else if (typeString === "direct") {
+ var highWaterMark = strategy?.highWaterMark;
+ @initializeArrayBufferStream.@call(stream, underlyingSource, highWaterMark);
+ } else if (type === @undefined) {
+ if (strategy.highWaterMark === @undefined)
+ strategy.highWaterMark = 1;
+
+ @setupReadableStreamDefaultController(stream, underlyingSource, strategy.size, strategy.highWaterMark, underlyingSource.start, underlyingSource.pull, underlyingSource.cancel);
+ } else
+ @throwRangeError("Invalid type for underlying source");
+
+}
+
+function readableStreamDefaultControllerStart(controller) {
+ "use strict";
+
+
+
+ if (@getByIdDirectPrivate(controller, "started") !== -1)
+ return;
+
+ const underlyingSource = @getByIdDirectPrivate(controller, "underlyingSource");
+ const startMethod = underlyingSource.start;
+ @putByIdDirectPrivate(controller, "started", 0);
+
+ @promiseInvokeOrNoopMethodNoCatch(underlyingSource, startMethod, [controller]).@then(() => {
+ @putByIdDirectPrivate(controller, "started", 1);
+ @assert(!@getByIdDirectPrivate(controller, "pulling"));
+ @assert(!@getByIdDirectPrivate(controller, "pullAgain"));
+ @readableStreamDefaultControllerCallPullIfNeeded(controller);
+ }, (error) => {
+ @readableStreamDefaultControllerError(controller, error);
+ });
+}
+
+
+// FIXME: Replace readableStreamPipeTo by below function.
+// This method implements the latest https://streams.spec.whatwg.org/#readable-stream-pipe-to.
+function readableStreamPipeToWritableStream(source, destination, preventClose, preventAbort, preventCancel, signal)
+{
+ "use strict";
+
+ @assert(@isReadableStream(source));
+ @assert(@isWritableStream(destination));
+ @assert(!@isReadableStreamLocked(source));
+ @assert(!@isWritableStreamLocked(destination));
+ @assert(signal === @undefined || @isAbortSignal(signal));
+
+ if (@getByIdDirectPrivate(source, "underlyingByteSource") !== @undefined)
+ return @Promise.@reject("Piping to a readable bytestream is not supported");
+
+ let pipeState = { source : source, destination : destination, preventAbort : preventAbort, preventCancel : preventCancel, preventClose : preventClose, signal : signal };
+
+ pipeState.reader = @acquireReadableStreamDefaultReader(source);
+ pipeState.writer = @acquireWritableStreamDefaultWriter(destination);
+
+ @putByIdDirectPrivate(source, "disturbed", true);
+
+ pipeState.finalized = false;
+ pipeState.shuttingDown = false;
+ pipeState.promiseCapability = @newPromiseCapability(@Promise);
+ pipeState.pendingReadPromiseCapability = @newPromiseCapability(@Promise);
+ pipeState.pendingReadPromiseCapability.@resolve.@call();
+ pipeState.pendingWritePromise = @Promise.@resolve();
+
+ if (signal !== @undefined) {
+ const algorithm = () => {
+ if (pipeState.finalized)
+ return;
+
+ const error = @makeDOMException("AbortError", "abort pipeTo from signal");
+
+ @pipeToShutdownWithAction(pipeState, () => {
+ const shouldAbortDestination = !pipeState.preventAbort && @getByIdDirectPrivate(pipeState.destination, "state") === "writable";
+ const promiseDestination = shouldAbortDestination ? @writableStreamAbort(pipeState.destination, error) : @Promise.@resolve();
+
+ const shouldAbortSource = !pipeState.preventCancel && @getByIdDirectPrivate(pipeState.source, "state") === @streamReadable;
+ const promiseSource = shouldAbortSource ? @readableStreamCancel(pipeState.source, error) : @Promise.@resolve();
+
+ let promiseCapability = @newPromiseCapability(@Promise);
+ let shouldWait = true;
+ let handleResolvedPromise = () => {
+ if (shouldWait) {
+ shouldWait = false;
+ return;
+ }
+ promiseCapability.@resolve.@call();
+ }
+ let handleRejectedPromise = (e) => {
+ promiseCapability.@reject.@call(@undefined, e);
+ }
+ promiseDestination.@then(handleResolvedPromise, handleRejectedPromise);
+ promiseSource.@then(handleResolvedPromise, handleRejectedPromise);
+ return promiseCapability.@promise;
+ }, error);
+ };
+ if (@whenSignalAborted(signal, algorithm))
+ return pipeState.promiseCapability.@promise;
+ }
+
+ @pipeToErrorsMustBePropagatedForward(pipeState);
+ @pipeToErrorsMustBePropagatedBackward(pipeState);
+ @pipeToClosingMustBePropagatedForward(pipeState);
+ @pipeToClosingMustBePropagatedBackward(pipeState);
+
+ @pipeToLoop(pipeState);
+
+ return pipeState.promiseCapability.@promise;
+}
+
+function pipeToLoop(pipeState)
+{
+ "use strict";
+ if (pipeState.shuttingDown)
+ return;
+
+ @pipeToDoReadWrite(pipeState).@then((result) => {
+ if (result)
+ @pipeToLoop(pipeState);
+ });
+}
+
+function pipeToDoReadWrite(pipeState)
+{
+ "use strict";
+ @assert(!pipeState.shuttingDown);
+
+ pipeState.pendingReadPromiseCapability = @newPromiseCapability(@Promise);
+ @getByIdDirectPrivate(pipeState.writer, "readyPromise").@promise.@then(() => {
+ if (pipeState.shuttingDown) {
+ pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false);
+ return;
+ }
+
+ @readableStreamDefaultReaderRead(pipeState.reader).@then((result) => {
+ const canWrite = !result.done && @getByIdDirectPrivate(pipeState.writer, "stream") !== @undefined;
+ pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, canWrite);
+ if (!canWrite)
+ return;
+
+ pipeState.pendingWritePromise = @writableStreamDefaultWriterWrite(pipeState.writer, result.value);
+ }, (e) => {
+ pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false);
+ });
+ }, (e) => {
+ pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false);
+ });
+ return pipeState.pendingReadPromiseCapability.@promise;
+}
+
+function pipeToErrorsMustBePropagatedForward(pipeState)
+{
+ "use strict";
+
+ const action = () => {
+ pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false);
+ const error = @getByIdDirectPrivate(pipeState.source, "storedError");
+ if (!pipeState.preventAbort) {
+ @pipeToShutdownWithAction(pipeState, () => @writableStreamAbort(pipeState.destination, error), error);
+ return;
+ }
+ @pipeToShutdown(pipeState, error);
+ };
+
+ if (@getByIdDirectPrivate(pipeState.source, "state") === @streamErrored) {
+ action();
+ return;
+ }
+
+ @getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").@promise.@then(@undefined, action);
+}
+
+function pipeToErrorsMustBePropagatedBackward(pipeState)
+{
+ "use strict";
+ const action = () => {
+ const error = @getByIdDirectPrivate(pipeState.destination, "storedError");
+ if (!pipeState.preventCancel) {
+ @pipeToShutdownWithAction(pipeState, () => @readableStreamCancel(pipeState.source, error), error);
+ return;
+ }
+ @pipeToShutdown(pipeState, error);
+ };
+ if (@getByIdDirectPrivate(pipeState.destination, "state") === "errored") {
+ action();
+ return;
+ }
+ @getByIdDirectPrivate(pipeState.writer, "closedPromise").@promise.@then(@undefined, action);
+}
+
+function pipeToClosingMustBePropagatedForward(pipeState)
+{
+ "use strict";
+ const action = () => {
+ pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false);
+ const error = @getByIdDirectPrivate(pipeState.source, "storedError");
+ if (!pipeState.preventClose) {
+ @pipeToShutdownWithAction(pipeState, () => @writableStreamDefaultWriterCloseWithErrorPropagation(pipeState.writer));
+ return;
+ }
+ @pipeToShutdown(pipeState);
+ };
+ if (@getByIdDirectPrivate(pipeState.source, "state") === @streamClosed) {
+ action();
+ return;
+ }
+ @getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").@promise.@then(action, @undefined);
+}
+
+function pipeToClosingMustBePropagatedBackward(pipeState)
+{
+ "use strict";
+ if (!@writableStreamCloseQueuedOrInFlight(pipeState.destination) && @getByIdDirectPrivate(pipeState.destination, "state") !== "closed")
+ return;
+
+ // @assert no chunks have been read/written
+
+ const error = @makeTypeError("closing is propagated backward");
+ if (!pipeState.preventCancel) {
+ @pipeToShutdownWithAction(pipeState, () => @readableStreamCancel(pipeState.source, error), error);
+ return;
+ }
+ @pipeToShutdown(pipeState, error);
+}
+
+function pipeToShutdownWithAction(pipeState, action)
+{
+ "use strict";
+
+ if (pipeState.shuttingDown)
+ return;
+
+ pipeState.shuttingDown = true;
+
+ const hasError = arguments.length > 2;
+ const error = arguments[2];
+ const finalize = () => {
+ const promise = action();
+ promise.@then(() => {
+ if (hasError)
+ @pipeToFinalize(pipeState, error);
+ else
+ @pipeToFinalize(pipeState);
+ }, (e) => {
+ @pipeToFinalize(pipeState, e);
+ });
+ };
+
+ if (@getByIdDirectPrivate(pipeState.destination, "state") === "writable" && !@writableStreamCloseQueuedOrInFlight(pipeState.destination)) {
+ pipeState.pendingReadPromiseCapability.@promise.@then(() => {
+ pipeState.pendingWritePromise.@then(finalize, finalize);
+ }, (e) => @pipeToFinalize(pipeState, e));
+ return;
+ }
+
+ finalize();
+}
+
+function pipeToShutdown(pipeState)
+{
+ "use strict";
+
+ if (pipeState.shuttingDown)
+ return;
+
+ pipeState.shuttingDown = true;
+
+ const hasError = arguments.length > 1;
+ const error = arguments[1];
+ const finalize = () => {
+ if (hasError)
+ @pipeToFinalize(pipeState, error);
+ else
+ @pipeToFinalize(pipeState);
+ };
+
+ if (@getByIdDirectPrivate(pipeState.destination, "state") === "writable" && !@writableStreamCloseQueuedOrInFlight(pipeState.destination)) {
+ pipeState.pendingReadPromiseCapability.@promise.@then(() => {
+ pipeState.pendingWritePromise.@then(finalize, finalize);
+ }, (e) => @pipeToFinalize(pipeState, e));
+ return;
+ }
+ finalize();
+}
+
+function pipeToFinalize(pipeState)
+{
+ "use strict";
+
+ @writableStreamDefaultWriterRelease(pipeState.writer);
+ @readableStreamReaderGenericRelease(pipeState.reader);
+
+ // Instead of removing the abort algorithm as per spec, we make it a no-op which is equivalent.
+ pipeState.finalized = true;
+
+ if (arguments.length > 1)
+ pipeState.promiseCapability.@reject.@call(@undefined, arguments[1]);
+ else
+ pipeState.promiseCapability.@resolve.@call();
+}
+
+function readableStreamTee(stream, shouldClone)
+{
+ "use strict";
+
+ @assert(@isReadableStream(stream));
+ @assert(typeof(shouldClone) === "boolean");
+
+ const reader = new @ReadableStreamDefaultReader(stream);
+
+ const teeState = {
+ closedOrErrored: false,
+ canceled1: false,
+ canceled2: false,
+ reason1: @undefined,
+ reason2: @undefined,
+ };
+
+ teeState.cancelPromiseCapability = @newPromiseCapability(@Promise);
+
+ const pullFunction = @readableStreamTeePullFunction(teeState, reader, shouldClone);
+
+ const branch1Source = { };
+ @putByIdDirectPrivate(branch1Source, "pull", pullFunction);
+ @putByIdDirectPrivate(branch1Source, "cancel", @readableStreamTeeBranch1CancelFunction(teeState, stream));
+
+ const branch2Source = { };
+ @putByIdDirectPrivate(branch2Source, "pull", pullFunction);
+ @putByIdDirectPrivate(branch2Source, "cancel", @readableStreamTeeBranch2CancelFunction(teeState, stream));
+
+ const branch1 = new @ReadableStream(branch1Source);
+ const branch2 = new @ReadableStream(branch2Source);
+
+ @getByIdDirectPrivate(reader, "closedPromiseCapability").@promise.@then(@undefined, function(e) {
+ if (teeState.closedOrErrored)
+ return;
+ @readableStreamDefaultControllerError(branch1.@readableStreamController, e);
+ @readableStreamDefaultControllerError(branch2.@readableStreamController, e);
+ teeState.closedOrErrored = true;
+ if (!teeState.canceled1 || !teeState.canceled2)
+ teeState.cancelPromiseCapability.@resolve.@call();
+ });
+
+ // Additional fields compared to the spec, as they are needed within pull/cancel functions.
+ teeState.branch1 = branch1;
+ teeState.branch2 = branch2;
+
+ return [branch1, branch2];
+}
+
+function readableStreamTeePullFunction(teeState, reader, shouldClone)
+{
+ "use strict";
+
+ return function() {
+ @Promise.prototype.@then.@call(@readableStreamDefaultReaderRead(reader), function(result) {
+ @assert(@isObject(result));
+ @assert(typeof result.done === "boolean");
+ if (result.done && !teeState.closedOrErrored) {
+ if (!teeState.canceled1)
+ @readableStreamDefaultControllerClose(teeState.branch1.@readableStreamController);
+ if (!teeState.canceled2)
+ @readableStreamDefaultControllerClose(teeState.branch2.@readableStreamController);
+ teeState.closedOrErrored = true;
+ if (!teeState.canceled1 || !teeState.canceled2)
+ teeState.cancelPromiseCapability.@resolve.@call();
+ }
+ if (teeState.closedOrErrored)
+ return;
+ if (!teeState.canceled1)
+ @readableStreamDefaultControllerEnqueue(teeState.branch1.@readableStreamController, result.value);
+ if (!teeState.canceled2)
+ @readableStreamDefaultControllerEnqueue(teeState.branch2.@readableStreamController, shouldClone ? @structuredCloneForStream(result.value) : result.value);
+ });
+ }
+}
+
+function readableStreamTeeBranch1CancelFunction(teeState, stream)
+{
+ "use strict";
+
+ return function(r) {
+ teeState.canceled1 = true;
+ teeState.reason1 = r;
+ if (teeState.canceled2) {
+ @readableStreamCancel(stream, [teeState.reason1, teeState.reason2]).@then(
+ teeState.cancelPromiseCapability.@resolve,
+ teeState.cancelPromiseCapability.@reject);
+ }
+ return teeState.cancelPromiseCapability.@promise;
+ }
+}
+
+function readableStreamTeeBranch2CancelFunction(teeState, stream)
+{
+ "use strict";
+
+ return function(r) {
+ teeState.canceled2 = true;
+ teeState.reason2 = r;
+ if (teeState.canceled1) {
+ @readableStreamCancel(stream, [teeState.reason1, teeState.reason2]).@then(
+ teeState.cancelPromiseCapability.@resolve,
+ teeState.cancelPromiseCapability.@reject);
+ }
+ return teeState.cancelPromiseCapability.@promise;
+ }
+}
+
+function isReadableStream(stream)
+{
+ "use strict";
+
+ // Spec tells to return true only if stream has a readableStreamController internal slot.
+ // However, since it is a private slot, it cannot be checked using hasOwnProperty().
+ // Therefore, readableStreamController is initialized with null value.
+ return @isObject(stream) && @getByIdDirectPrivate(stream, "readableStreamController") !== @undefined;
+}
+
+function isReadableStreamDefaultReader(reader)
+{
+ "use strict";
+
+ // Spec tells to return true only if reader has a readRequests internal slot.
+ // However, since it is a private slot, it cannot be checked using hasOwnProperty().
+ // Since readRequests is initialized with an empty array, the following test is ok.
+ return @isObject(reader) && !!@getByIdDirectPrivate(reader, "readRequests");
+}
+
+function isReadableStreamDefaultController(controller)
+{
+ "use strict";
+
+ // Spec tells to return true only if controller has an underlyingSource internal slot.
+ // However, since it is a private slot, it cannot be checked using hasOwnProperty().
+ // underlyingSource is obtained in ReadableStream constructor: if undefined, it is set
+ // to an empty object. Therefore, following test is ok.
+ return @isObject(controller) && !!@getByIdDirectPrivate(controller, "underlyingSource");
+}
+
+
+@globalPrivate
+function assignDirectStream() {
+ "use strict";
+
+ var stream = this;
+}
+
+
+function handleDirectStreamError(e) {
+ "use strict";
+
+ var controller = this;
+ var sink = controller.@sink;
+ if (sink) {
+ @putByIdDirectPrivate(controller, "sink", @undefined);
+ try {
+ sink.close(e);
+ } catch (f) {}
+ }
+
+ this.error = this.drain = this.write = this.close = this.end = @onReadableStreamDirectControllerClosed;
+
+ if (typeof this.@underlyingSource.close === 'function') {
+ try {
+ this.@underlyingSource.close.@call(this.@underlyingSource, e);
+ } catch (e) {
+ }
+ }
+
+ try {
+ var pend = controller._pendingRead;
+ if (pend) {
+ controller._pendingRead = @undefined;
+ @rejectPromise(pend, e);
+ }
+ } catch (f) {}
+ var stream = controller.@controlledReadableStream;
+ if (stream) @readableStreamError(stream, e);
+}
+
+function handleDirectStreamErrorReject(e) {
+ @handleDirectStreamError.@call(this, e);
+ return @Promise.@reject(e);
+}
+
+function onPullDirectStream(controller)
+{
+
+ "use strict";
+
+ var stream = controller.@controlledReadableStream;
+ if (!stream || @getByIdDirectPrivate(stream, "state") !== @streamReadable)
+ return;
+
+ // pull is in progress
+ // this is a recursive call
+ // ignore it
+ if (controller._deferClose === -1) {
+ return;
+ }
+
+
+ controller._deferClose = -1;
+ controller._deferDrain = -1;
+ var deferClose;
+ var deferDrain;
+
+ // Direct streams allow @pull to be called multiple times, unlike the spec.
+ // Backpressure is handled by the destination, not by the underlying source.
+ // In this case, we rely on the heuristic that repeatedly draining in the same tick
+ // is bad for performance
+ // this code is only run when consuming a direct stream from JS
+ // without the HTTP server or anything else
+ try {
+ var result = controller.@underlyingSource.pull(
+ controller,
+ );
+
+ if (result && @isPromise(result)) {
+ if (controller._handleError === @undefined) {
+ controller._handleError = @handleDirectStreamErrorReject.bind(controller);
+ }
+
+ @Promise.prototype.catch.@call(result, controller._handleError);
+ }
+ } catch(e) {
+ return @handleDirectStreamErrorReject.@call(controller, e);
+ } finally {
+ deferClose = controller._deferClose;
+ deferDrain = controller._deferDrain;
+ controller._deferDrain = controller._deferClose = 0;
+ }
+
+
+ var promiseToReturn;
+
+
+ if (controller._pendingRead === @undefined) {
+ controller._pendingRead = promiseToReturn = @newPromise();
+ } else {
+ promiseToReturn = @readableStreamAddReadRequest(stream);
+ }
+
+
+ // they called close during @pull()
+ // we delay that
+ if (deferClose === 1) {
+ var reason = controller._deferCloseReason;
+ controller._deferCloseReason = @undefined;
+ @onCloseDirectStream.@call(controller, reason);
+ return promiseToReturn;
+ }
+
+ // not done, but they called drain()
+ if (deferDrain === 1) {
+ @onDrainDirectStream.@call(controller);
+ }
+
+
+ return promiseToReturn;
+}
+
+function noopDoneFunction() {
+ return @Promise.@resolve({value: @undefined, done: true});
+}
+
+function onReadableStreamDirectControllerClosed(reason)
+{
+ "use strict";
+ @throwTypeError("ReadableStreamDirectController is now closed");
+}
+
+function onCloseDirectStream(reason)
+{
+ "use strict";
+ var stream = this.@controlledReadableStream;
+ if (!stream || @getByIdDirectPrivate(stream, "state") !== @streamReadable)
+ return;
+
+ if (this._deferClose !== 0) {
+ this._deferClose = 1;
+ this._deferCloseReason = reason;
+ return;
+ }
+
+ @putByIdDirectPrivate(stream, "state", @streamClosing);
+ if (typeof this.@underlyingSource.close === 'function') {
+ try {
+ this.@underlyingSource.close.@call(this.@underlyingSource, reason);
+ } catch (e) {
+
+ }
+ }
+
+ var drained;
+ try {
+ drained = this.@sink.end();
+ @putByIdDirectPrivate(this, "sink", @undefined);
+ } catch (e) {
+ if (this._pendingRead) {
+ var read = this._pendingRead;
+ this._pendingRead = @undefined;
+ @rejectPromise(read, e);
+ }
+ @readableStreamError(stream, e);
+ return;
+ }
+
+ this.error = this.drain = this.write = this.close = this.end = @onReadableStreamDirectControllerClosed;
+
+ var reader = @getByIdDirectPrivate(stream, "reader");
+
+ if (reader && @isReadableStreamDefaultReader(reader)) {
+ var _pendingRead = this._pendingRead;
+ if (_pendingRead && @isPromise(_pendingRead) && drained?.byteLength) {
+ this._pendingRead = @undefined;
+ @fulfillPromise(_pendingRead, {value: drained, done: false});
+ @readableStreamClose(stream);
+ return;
+ }
+ }
+
+ if (drained?.byteLength) {
+ var requests = @getByIdDirectPrivate(reader, "readRequests");
+ if (requests?.isNotEmpty()) {
+ @readableStreamFulfillReadRequest(stream, drained, false);
+ @readableStreamClose(stream);
+ return;
+ }
+
+ @putByIdDirectPrivate(stream, "state", @streamReadable);
+ this.@pull = () => {
+ var thisResult = @createFulfilledPromise({value: drained, done: false});
+ drained = @undefined;
+ @readableStreamClose(stream);
+ stream = @undefined;
+ return thisResult;
+ };
+ } else if (this._pendingRead) {
+ var read = this._pendingRead;
+ this._pendingRead = @undefined;
+ @putByIdDirectPrivate(this, "pull", @noopDoneFunction);
+ @fulfillPromise(read, {value: @undefined, done: true});
+ }
+
+ @readableStreamClose(stream);
+}
+
+function onDrainDirectStream()
+{
+ "use strict";
+
+ var stream = this.@controlledReadableStream;
+ var reader = @getByIdDirectPrivate(stream, "reader");
+ if (!reader || !@isReadableStreamDefaultReader(reader)) {
+ return;
+ }
+
+ var _pendingRead = this._pendingRead;
+ this._pendingRead = @undefined;
+ if (_pendingRead && @isPromise(_pendingRead)) {
+ var drained = this.@sink.drain();
+ if (drained?.byteLength) {
+ this._pendingRead = @getByIdDirectPrivate(stream, "readRequests")?.shift();
+ @fulfillPromise(_pendingRead, {value: drained, done: false});
+ } else {
+ this._pendingRead = _pendingRead;
+ }
+ } else if (@getByIdDirectPrivate(stream, "readRequests")?.isNotEmpty()) {
+ var drained = this.@sink.drain();
+ if (drained?.byteLength) {
+ @readableStreamFulfillReadRequest(stream, drained, false);
+ }
+ } else if (this._deferDrain === -1) {
+ this._deferDrain = 1;
+ }
+
+}
+
+function initializeArrayBufferStream(underlyingSource, highWaterMark)
+{
+ "use strict";
+
+ // This is the fallback implementation for direct streams
+ // When we don't know what the destination type is
+ // We assume it is a Uint8Array.
+
+ var opts = highWaterMark ? {highWaterMark, stream: true, asUint8Array: true} : {stream: true, asUint8Array: true};
+ var sink = new globalThis.Bun.ArrayBufferSink();
+ sink.start(opts);
+
+ var controller = {
+ @underlyingSource: underlyingSource,
+ @pull: @onPullDirectStream,
+ @controlledReadableStream: this,
+ @sink: sink,
+ close: @onCloseDirectStream,
+ write: sink.write.bind(sink),
+ error: @handleDirectStreamError,
+ end: @onCloseDirectStream,
+ @close: @onCloseDirectStream,
+ drain: @onDrainDirectStream,
+ _pendingRead: @undefined,
+ _deferClose: 0,
+ _deferDrain: 0,
+ _deferCloseReason: @undefined,
+ _handleError: @undefined,
+ };
+
+
+ @putByIdDirectPrivate(this, "readableStreamController", controller);
+
+}
+
+function readableStreamError(stream, error)
+{
+ "use strict";
+
+ @assert(@isReadableStream(stream));
+ @assert(@getByIdDirectPrivate(stream, "state") === @streamReadable);
+ @putByIdDirectPrivate(stream, "state", @streamErrored);
+ @putByIdDirectPrivate(stream, "storedError", error);
+
+ const reader = @getByIdDirectPrivate(stream, "reader");
+
+ if (!reader)
+ return;
+
+ if (@isReadableStreamDefaultReader(reader)) {
+ const requests = @getByIdDirectPrivate(reader, "readRequests");
+ @putByIdDirectPrivate(reader, "readRequests", @createFIFO());
+ for (var request = requests.shift(); request; request = requests.shift())
+ @rejectPromise(request, error);
+ } else {
+ @assert(@isReadableStreamBYOBReader(reader));
+ const requests = @getByIdDirectPrivate(reader, "readIntoRequests");
+ @putByIdDirectPrivate(reader, "readIntoRequests", @createFIFO());
+ for (var request = requests.shift(); request; request = requests.shift())
+ @rejectPromise(request, error);
+ }
+
+ @getByIdDirectPrivate(reader, "closedPromiseCapability").@reject.@call(@undefined, error);
+ const promise = @getByIdDirectPrivate(reader, "closedPromiseCapability").@promise;
+ @markPromiseAsHandled(promise);
+}
+
+function readableStreamDefaultControllerShouldCallPull(controller)
+{
+ "use strict";
+
+ const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
+
+ if (!@readableStreamDefaultControllerCanCloseOrEnqueue(controller))
+ return false;
+ if (!(@getByIdDirectPrivate(controller, "started") === 1))
+ return false;
+ if ((!@isReadableStreamLocked(stream) || !@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) && @readableStreamDefaultControllerGetDesiredSize(controller) <= 0)
+ return false;
+ const desiredSize = @readableStreamDefaultControllerGetDesiredSize(controller);
+ @assert(desiredSize !== null);
+ return desiredSize > 0;
+}
+
+function readableStreamDefaultControllerCallPullIfNeeded(controller)
+{
+ "use strict";
+
+ // FIXME: use @readableStreamDefaultControllerShouldCallPull
+ const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
+
+ if (!@readableStreamDefaultControllerCanCloseOrEnqueue(controller))
+ return;
+ if (!(@getByIdDirectPrivate(controller, "started") === 1))
+ return;
+ if ((!@isReadableStreamLocked(stream) || !@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) && @readableStreamDefaultControllerGetDesiredSize(controller) <= 0)
+ return;
+
+ if (@getByIdDirectPrivate(controller, "pulling")) {
+ @putByIdDirectPrivate(controller, "pullAgain", true);
+ return;
+ }
+
+
+ @assert(!@getByIdDirectPrivate(controller, "pullAgain"));
+ @putByIdDirectPrivate(controller, "pulling", true);
+
+ @getByIdDirectPrivate(controller, "pullAlgorithm").@call(@undefined).@then(function() {
+ @putByIdDirectPrivate(controller, "pulling", false);
+ if (@getByIdDirectPrivate(controller, "pullAgain")) {
+ @putByIdDirectPrivate(controller, "pullAgain", false);
+
+ @readableStreamDefaultControllerCallPullIfNeeded(controller);
+ }
+ }, function(error) {
+ @readableStreamDefaultControllerError(controller, error);
+ });
+}
+
+function isReadableStreamLocked(stream)
+{
+ "use strict";
+
+ @assert(@isReadableStream(stream));
+ return !!@getByIdDirectPrivate(stream, "reader");
+}
+
+function readableStreamDefaultControllerGetDesiredSize(controller)
+{
+ "use strict";
+
+ const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
+ const state = @getByIdDirectPrivate(stream, "state");
+
+ if (state === @streamErrored)
+ return null;
+ if (state === @streamClosed)
+ return 0;
+
+ return @getByIdDirectPrivate(controller, "strategy").highWaterMark - @getByIdDirectPrivate(controller, "queue").size;
+}
+
+
+function readableStreamReaderGenericCancel(reader, reason)
+{
+ "use strict";
+
+ const stream = @getByIdDirectPrivate(reader, "ownerReadableStream");
+ @assert(!!stream);
+ return @readableStreamCancel(stream, reason);
+}
+
+function readableStreamCancel(stream, reason)
+{
+ "use strict";
+
+ @putByIdDirectPrivate(stream, "disturbed", true);
+ const state = @getByIdDirectPrivate(stream, "state");
+ if (state === @streamClosed)
+ return @Promise.@resolve();
+ if (state === @streamErrored)
+ return @Promise.@reject(@getByIdDirectPrivate(stream, "storedError"));
+ @readableStreamClose(stream);
+
+ var controller = @getByIdDirectPrivate(stream, "readableStreamController");
+ return controller.@cancel(controller, reason).@then(function() { });
+}
+
+function readableStreamDefaultControllerCancel(controller, reason)
+{
+ "use strict";
+
+ @putByIdDirectPrivate(controller, "queue", @newQueue());
+ return @getByIdDirectPrivate(controller, "cancelAlgorithm").@call(@undefined, reason);
+}
+
+function readableStreamDefaultControllerPull(controller)
+{
+ "use strict";
+
+ var queue = @getByIdDirectPrivate(controller, "queue");
+ if (queue.content.isNotEmpty()) {
+ const chunk = @dequeueValue(queue);
+ if (@getByIdDirectPrivate(controller, "closeRequested") && queue.content.isEmpty())
+ @readableStreamClose(@getByIdDirectPrivate(controller, "controlledReadableStream"));
+ else
+ @readableStreamDefaultControllerCallPullIfNeeded(controller);
+
+ return @createFulfilledPromise({ value: chunk, done: false });
+ }
+ const pendingPromise = @readableStreamAddReadRequest(@getByIdDirectPrivate(controller, "controlledReadableStream"));
+ @readableStreamDefaultControllerCallPullIfNeeded(controller);
+ return pendingPromise;
+}
+
+function readableStreamDefaultControllerClose(controller)
+{
+ "use strict";
+
+ @assert(@readableStreamDefaultControllerCanCloseOrEnqueue(controller));
+ @putByIdDirectPrivate(controller, "closeRequested", true);
+ if (@getByIdDirectPrivate(controller, "queue")?.content?.isEmpty())
+ @readableStreamClose(@getByIdDirectPrivate(controller, "controlledReadableStream"));
+}
+
+function readableStreamClose(stream)
+{
+ "use strict";
+
+ @assert(@getByIdDirectPrivate(stream, "state") === @streamReadable);
+ @putByIdDirectPrivate(stream, "state", @streamClosed);
+ if (!@getByIdDirectPrivate(stream, "reader"))
+ return;
+
+ if (@isReadableStreamDefaultReader(@getByIdDirectPrivate(stream, "reader"))) {
+ const requests = @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests");
+ if (requests.isNotEmpty()) {
+ @putByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests", @createFIFO());
+
+ for (var request = requests.shift(); request; request = requests.shift())
+ @fulfillPromise(request, { value: @undefined, done: true });
+ }
+ }
+
+ @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "closedPromiseCapability").@resolve.@call();
+}
+
+function readableStreamFulfillReadRequest(stream, chunk, done)
+{
+ "use strict";
+ const readRequest = @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").shift();
+ @fulfillPromise(readRequest, { value: chunk, done: done });
+}
+
+function readableStreamDefaultControllerEnqueue(controller, chunk)
+{
+ "use strict";
+
+ const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
+ // this is checked by callers
+ @assert(@readableStreamDefaultControllerCanCloseOrEnqueue(controller));
+
+ if (@isReadableStreamLocked(stream) && @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) {
+ @readableStreamFulfillReadRequest(stream, chunk, false);
+ @readableStreamDefaultControllerCallPullIfNeeded(controller);
+ return;
+ }
+
+ try {
+ let chunkSize = 1;
+ if (@getByIdDirectPrivate(controller, "strategy").size !== @undefined)
+ chunkSize = @getByIdDirectPrivate(controller, "strategy").size(chunk);
+ @enqueueValueWithSize(@getByIdDirectPrivate(controller, "queue"), chunk, chunkSize);
+ }
+ catch(error) {
+ @readableStreamDefaultControllerError(controller, error);
+ throw error;
+ }
+ @readableStreamDefaultControllerCallPullIfNeeded(controller);
+}
+
+function readableStreamDefaultReaderRead(reader)
+{
+ "use strict";
+
+ const stream = @getByIdDirectPrivate(reader, "ownerReadableStream");
+ @assert(!!stream);
+ const state = @getByIdDirectPrivate(stream, "state");
+
+ @putByIdDirectPrivate(stream, "disturbed", true);
+ if (state === @streamClosed)
+ return @createFulfilledPromise({ value: @undefined, done: true });
+ if (state === @streamErrored)
+ return @Promise.@reject(@getByIdDirectPrivate(stream, "storedError"));
+ @assert(state === @streamReadable);
+
+ return @getByIdDirectPrivate(stream, "readableStreamController").@pull(@getByIdDirectPrivate(stream, "readableStreamController"));
+}
+
+function readableStreamAddReadRequest(stream)
+{
+ "use strict";
+
+ @assert(@isReadableStreamDefaultReader(@getByIdDirectPrivate(stream, "reader")));
+ @assert(@getByIdDirectPrivate(stream, "state") == @streamReadable);
+
+ const readRequest = @newPromise();
+
+ @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").push(readRequest);
+
+ return readRequest;
+}
+
+function isReadableStreamDisturbed(stream)
+{
+ "use strict";
+
+ @assert(@isReadableStream(stream));
+ return @getByIdDirectPrivate(stream, "disturbed");
+}
+
+function readableStreamReaderGenericRelease(reader)
+{
+ "use strict";
+
+ @assert(!!@getByIdDirectPrivate(reader, "ownerReadableStream"));
+ @assert(@getByIdDirectPrivate(@getByIdDirectPrivate(reader, "ownerReadableStream"), "reader") === reader);
+
+ if (@getByIdDirectPrivate(@getByIdDirectPrivate(reader, "ownerReadableStream"), "state") === @streamReadable)
+ @getByIdDirectPrivate(reader, "closedPromiseCapability").@reject.@call(@undefined, @makeTypeError("releasing lock of reader whose stream is still in readable state"));
+ else
+ @putByIdDirectPrivate(reader, "closedPromiseCapability", { @promise: @newHandledRejectedPromise(@makeTypeError("reader released lock")) });
+
+ const promise = @getByIdDirectPrivate(reader, "closedPromiseCapability").@promise;
+ @markPromiseAsHandled(promise);
+ @putByIdDirectPrivate(@getByIdDirectPrivate(reader, "ownerReadableStream"), "reader", @undefined);
+ @putByIdDirectPrivate(reader, "ownerReadableStream", @undefined);
+}
+
+function readableStreamDefaultControllerCanCloseOrEnqueue(controller)
+{
+ "use strict";
+
+ return !@getByIdDirectPrivate(controller, "closeRequested") && @getByIdDirectPrivate(@getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === @streamReadable;
+}
+
+
+function lazyLoadStream(stream, autoAllocateChunkSize) {
+ "use strict";
+
+ var nativeType = @getByIdDirectPrivate(stream, "bunNativeType");
+ var nativePtr = @getByIdDirectPrivate(stream, "bunNativePtr");
+ var cached = @lazyStreamPrototypeMap;
+ var Prototype = cached.@get(nativeType);
+ if (Prototype === @undefined) {
+ var [pull, start, cancel, setClose, deinit] = @lazyLoad(nativeType);
+ var closer = [false];
+ var handleResult;
+ function handleNativeReadableStreamPromiseResult(val) {
+ "use strict";
+ var {c, v} = this;
+ this.c = @undefined;
+ this.v = @undefined;
+ handleResult(val, c, v);
+ }
+
+ handleResult = function handleResult(result, controller, view) {
+ "use strict";
+
+ if (result && @isPromise(result)) {
+ return result.then(handleNativeReadableStreamPromiseResult.bind({c: controller, v: view}), (err) => controller.error(err));
+ } else if (result !== false) {
+ if (view && view.byteLength === result) {
+ controller.byobRequest.respondWithNewView(view);
+ } else {
+ controller.byobRequest.respond(result);
+ }
+ }
+
+ if (closer[0] || result === false) {
+ @enqueueJob(() => controller.close());
+ closer[0] = false;
+ }
+ };
+
+ Prototype = class NativeReadableStreamSource {
+ constructor(tag, autoAllocateChunkSize) {
+ this.pull = this.pull_.bind(tag);
+ this.cancel = this.cancel_.bind(tag);
+ this.autoAllocateChunkSize = autoAllocateChunkSize;
+ }
+
+ pull;
+ cancel;
+
+ type = "bytes";
+ autoAllocateChunkSize = 0;
+
+ static startSync = start;
+
+ pull_(controller) {
+ closer[0] = false;
+ var result;
+
+ const view = controller.byobRequest.view;
+ try {
+ result = pull(this, view, closer);
+ } catch(err) {
+ return controller.error(err);
+ }
+
+ return handleResult(result, controller, view);
+ }
+
+ cancel_(reason) {
+ cancel(this, reason);
+ }
+ static deinit = deinit;
+ static registry = new FinalizationRegistry(deinit);
+ }
+ cached.@set(nativeType, Prototype);
+ }
+
+ const chunkSize = Prototype.startSync(nativePtr, autoAllocateChunkSize);
+
+ // empty file, no need for native back-and-forth on this
+ if (chunkSize === 0) {
+ @readableStreamClose(stream);
+ return null;
+ }
+ var instance = new Prototype(nativePtr, chunkSize);
+ Prototype.registry.register(instance, nativePtr);
+ return instance;
+} \ No newline at end of file