diff options
author | 2022-07-01 03:24:16 -0700 | |
---|---|---|
committer | 2022-07-01 03:24:16 -0700 | |
commit | 4812fb8faf7e89dd8cb585f687bd887dc552efe2 (patch) | |
tree | a6b0a88ec427f68f4989bedb4bed87ca81daa6c2 /src | |
parent | e105cfcca8ec86464c33501861d506ec529882b5 (diff) | |
download | bun-4812fb8faf7e89dd8cb585f687bd887dc552efe2.tar.gz bun-4812fb8faf7e89dd8cb585f687bd887dc552efe2.tar.zst bun-4812fb8faf7e89dd8cb585f687bd887dc552efe2.zip |
Fix failing tests for ReadableStream -> {text, arrayBuffer, blob}
Diffstat (limited to 'src')
-rw-r--r-- | src/bun.js/builtins/README.md | 11 | ||||
-rw-r--r-- | src/bun.js/builtins/WebCoreJSBuiltins.cpp | 2 | ||||
-rw-r--r-- | src/bun.js/builtins/js/ReadableStream.js | 116 | ||||
-rw-r--r-- | src/bun.js/builtins/js/ReadableStreamInternals.js | 3121 | ||||
-rw-r--r-- | src/bun.js/webcore/encoding.zig | 1 | ||||
-rw-r--r-- | src/bun.js/webcore/streams.zig | 12 |
6 files changed, 1841 insertions, 1422 deletions
diff --git a/src/bun.js/builtins/README.md b/src/bun.js/builtins/README.md index 8ec32e8b5..cd90fe44c 100644 --- a/src/bun.js/builtins/README.md +++ b/src/bun.js/builtins/README.md @@ -1,5 +1,16 @@ # JavaScript Builtins +TLDR: + +```bash +# Delete the built files +make clean-bindings generate-bindings && \ + # Compile all the C++ files which live in ../bindings + make jsc-bindings-mac -j10 && \ + # Re-link the binary without compiling zig (so it's faster) + make bun-link-lld-debug +``` + JavaScript files in [./js](./js) use JavaScriptCore's builtins syntax ```js diff --git a/src/bun.js/builtins/WebCoreJSBuiltins.cpp b/src/bun.js/builtins/WebCoreJSBuiltins.cpp index 00cde9f54..dd2a8cd5b 100644 --- a/src/bun.js/builtins/WebCoreJSBuiltins.cpp +++ b/src/bun.js/builtins/WebCoreJSBuiltins.cpp @@ -35,9 +35,9 @@ #include "ByteLengthQueuingStrategyBuiltins.cpp" #include "CountQueuingStrategyBuiltins.cpp" +#include "ImportMetaObjectBuiltins.cpp" #include "JSBufferConstructorBuiltins.cpp" #include "JSBufferPrototypeBuiltins.cpp" -#include "JSZigGlobalObjectBuiltins.cpp" #include "ReadableByteStreamControllerBuiltins.cpp" #include "ReadableByteStreamInternalsBuiltins.cpp" #include "ReadableStreamBYOBReaderBuiltins.cpp" diff --git a/src/bun.js/builtins/js/ReadableStream.js b/src/bun.js/builtins/js/ReadableStream.js index 95f379be5..fbd8148ba 100644 --- a/src/bun.js/builtins/js/ReadableStream.js +++ b/src/bun.js/builtins/js/ReadableStream.js @@ -104,129 +104,51 @@ function readableStreamToArray(stream) { // this is a direct stream var underlyingSource = @getByIdDirectPrivate(stream, "underlyingSource"); - if (underlyingSource !== @undefined) { - const promise = @initializeArrayStream.@call(stream, underlyingSource, @undefined); - var reader = stream.getReader(); - return (async function() { - while (@getByIdDirectPrivate(stream, "state") === @streamReadable) { - var thisResult = await reader.read(); - if (thisResult.done) { - break; - } - } - - try { - reader.releaseLock(); - } catch(e) { - } - - return await promise; - })(); + if (underlyingSource !== @undefined) { + return @readableStreamToArrayDirect(stream, underlyingSource); } - var reader = stream.getReader(); - var manyResult = reader.readMany(); - - async function processManyResult(result) { - - if (result.done) { - return []; - } - - var chunks = result.value || []; - - while (true) { - var thisResult = await reader.read(); - if (thisResult.done) { - break; - } - chunks = chunks.concat(thisResult.value); - } + return @readableStreamIntoArray(stream); +} - return chunks; - } +@globalPrivate +function readableStreamToText(stream) { + "use strict"; - if (manyResult && @isPromise(manyResult)) { - return manyResult.@then(processManyResult); + // this is a direct stream + var underlyingSource = @getByIdDirectPrivate(stream, "underlyingSource"); + if (underlyingSource !== @undefined) { + return @readableStreamToTextDirect(stream, underlyingSource); } - return processManyResult(manyResult); + return @readableStreamIntoText(stream); } @globalPrivate -function readableStreamToText(stream) { +function readableStreamToArrayBuffer(stream) { "use strict"; // this is a direct stream var underlyingSource = @getByIdDirectPrivate(stream, "underlyingSource"); - if (underlyingSource !== @undefined) { - - const promise = @initializeTextStream.@call(stream, underlyingSource, @undefined); - var reader = stream.getReader(); - return (async function() { - while (@getByIdDirectPrivate(stream, "state") === @streamReadable) { - var thisResult = await reader.read(); - if (thisResult.done) { - break; - } - } - try { - reader.releaseLock(); - } catch(e) { - } - - return await promise; - })(); - } - - // TODO: optimize this to skip the extra ArrayBuffer - var toArrayBuffer = globalThis.Bun.readableStreamToArrayBuffer(stream); - if (toArrayBuffer && @isPromise(toArrayBuffer)) { - return toArrayBuffer.@then(function(arrayBuffer) { - return new globalThis.TextDecoder().decode(arrayBuffer); - }); + if (underlyingSource !== @undefined) { + return @readableStreamToArrayBufferDirect(stream, underlyingSource); } - return new globalThis.TextDecoder().decode(arrayBuffer); + return globalThis.Bun.readableStreamToArray(stream).@then(globalThis.Bun.concatArrayBuffers); } @globalPrivate function readableStreamToJSON(stream) { "use strict"; - return @readableStreamToText(stream).@then(globalThis.JSON.parse); + return globalThis.Bun.readableStreamToText(stream).@then(globalThis.JSON.parse); } @globalPrivate function readableStreamToBlob(stream) { "use strict"; - - var underlyingSource = @getByIdDirectPrivate(stream, "underlyingSource"); - if (underlyingSource != @undefined) { - var toArrayBuffer = globalThis.Bun.readableStreamToArrayBuffer(stream); - if (toArrayBuffer && @isPromise(toArrayBuffer)) { - return toArrayBuffer.@then(function(arrayBuffer) { - return new globalThis.Blob([arrayBuffer]); - }); - } - - return new globalThis.Blob([toArrayBuffer]); - } - - - const array = @readableStreamToArray(stream); - if (array === null) { - return new globalThis.Blob(); - } - - return array.@then(function(chunks) { - if (chunks === null || chunks.length === 0) { - return new globalThis.Blob(); - } - - return new globalThis.Blob(chunks); - }); + return @Promise.resolve(globalThis.Bun.readableStreamToArray(stream)).@then(array => new Blob(array)); } @globalPrivate @@ -459,7 +381,7 @@ function pipeTo(destination) // FIXME: https://bugs.webkit.org/show_bug.cgi?id=159869. // Built-in generator should be able to parse function signature to compute the function length correctly. - let options = arguments[1]; + let options = @argument(1); let preventClose = false; let preventAbort = false; diff --git a/src/bun.js/builtins/js/ReadableStreamInternals.js b/src/bun.js/builtins/js/ReadableStreamInternals.js index ce8a85445..c959f24fa 100644 --- a/src/bun.js/builtins/js/ReadableStreamInternals.js +++ b/src/bun.js/builtins/js/ReadableStreamInternals.js @@ -26,1559 +26,2036 @@ // @internal -function readableStreamReaderGenericInitialize(reader, stream) -{ - "use strict"; - - @putByIdDirectPrivate(reader, "ownerReadableStream", stream); - @putByIdDirectPrivate(stream, "reader", reader); - if (@getByIdDirectPrivate(stream, "state") === @streamReadable) - @putByIdDirectPrivate(reader, "closedPromiseCapability", @newPromiseCapability(@Promise)); - else if (@getByIdDirectPrivate(stream, "state") === @streamClosed) - @putByIdDirectPrivate(reader, "closedPromiseCapability", { @promise: @Promise.@resolve() }); - else { - @assert(@getByIdDirectPrivate(stream, "state") === @streamErrored); - @putByIdDirectPrivate(reader, "closedPromiseCapability", { @promise: @newHandledRejectedPromise(@getByIdDirectPrivate(stream, "storedError")) }); - } +function readableStreamReaderGenericInitialize(reader, stream) { + "use strict"; + + @putByIdDirectPrivate(reader, "ownerReadableStream", stream); + @putByIdDirectPrivate(stream, "reader", reader); + if (@getByIdDirectPrivate(stream, "state") === @streamReadable) + @putByIdDirectPrivate( + reader, + "closedPromiseCapability", + @newPromiseCapability(@Promise) + ); + else if (@getByIdDirectPrivate(stream, "state") === @streamClosed) + @putByIdDirectPrivate(reader, "closedPromiseCapability", { + @promise: @Promise.@resolve(), + }); + else { + @assert(@getByIdDirectPrivate(stream, "state") === @streamErrored); + @putByIdDirectPrivate(reader, "closedPromiseCapability", { + @promise: @newHandledRejectedPromise( + @getByIdDirectPrivate(stream, "storedError") + ), + }); + } } -function privateInitializeReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark) -{ - "use strict"; - - if (!@isReadableStream(stream)) - @throwTypeError("ReadableStreamDefaultController needs a ReadableStream"); - - // readableStreamController is initialized with null value. - if (@getByIdDirectPrivate(stream, "readableStreamController") !== null) - @throwTypeError("ReadableStream already has a controller"); - - - - @putByIdDirectPrivate(this, "controlledReadableStream", stream); - @putByIdDirectPrivate(this, "underlyingSource", underlyingSource); - @putByIdDirectPrivate(this, "queue", @newQueue()); - @putByIdDirectPrivate(this, "started", -1); - @putByIdDirectPrivate(this, "closeRequested", false); - @putByIdDirectPrivate(this, "pullAgain", false); - @putByIdDirectPrivate(this, "pulling", false); - @putByIdDirectPrivate(this, "strategy", @validateAndNormalizeQueuingStrategy(size, highWaterMark)); - - return this; +function privateInitializeReadableStreamDefaultController( + stream, + underlyingSource, + size, + highWaterMark +) { + "use strict"; + + if (!@isReadableStream(stream)) + @throwTypeError("ReadableStreamDefaultController needs a ReadableStream"); + + // readableStreamController is initialized with null value. + if (@getByIdDirectPrivate(stream, "readableStreamController") !== null) + @throwTypeError("ReadableStream already has a controller"); + + @putByIdDirectPrivate(this, "controlledReadableStream", stream); + @putByIdDirectPrivate(this, "underlyingSource", underlyingSource); + @putByIdDirectPrivate(this, "queue", @newQueue()); + @putByIdDirectPrivate(this, "started", -1); + @putByIdDirectPrivate(this, "closeRequested", false); + @putByIdDirectPrivate(this, "pullAgain", false); + @putByIdDirectPrivate(this, "pulling", false); + @putByIdDirectPrivate( + this, + "strategy", + @validateAndNormalizeQueuingStrategy(size, highWaterMark) + ); + + return this; } -function readableStreamDefaultControllerError(controller, error) -{ - "use strict"; +function readableStreamDefaultControllerError(controller, error) { + "use strict"; - const stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); - if (@getByIdDirectPrivate(stream, "state") !== @streamReadable) - return; - @putByIdDirectPrivate(controller, "queue", @newQueue()); - - @readableStreamError(stream, error); -} + const stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); + if (@getByIdDirectPrivate(stream, "state") !== @streamReadable) return; + @putByIdDirectPrivate(controller, "queue", @newQueue()); -function readableStreamPipeTo(stream, sink) -{ - "use strict"; - @assert(@isReadableStream(stream)); + @readableStreamError(stream, error); +} - const reader = new @ReadableStreamDefaultReader(stream); +function readableStreamPipeTo(stream, sink) { + "use strict"; + @assert(@isReadableStream(stream)); - @getByIdDirectPrivate(reader, "closedPromiseCapability").@promise.@then(() => { }, (e) => { sink.error(e); }); + const reader = new @ReadableStreamDefaultReader(stream); - function doPipe() { - @readableStreamDefaultReaderRead(reader).@then(function(result) { - if (result.done) { - sink.close(); - return; - } - try { - sink.enqueue(result.value); - } catch (e) { - sink.error("ReadableStream chunk enqueueing in the sink failed"); - return; - } - doPipe(); - }, function(e) { - sink.error(e); - }); + @getByIdDirectPrivate(reader, "closedPromiseCapability").@promise.@then( + () => {}, + (e) => { + sink.error(e); } - doPipe(); + ); + + function doPipe() { + @readableStreamDefaultReaderRead(reader).@then( + function (result) { + if (result.done) { + sink.close(); + return; + } + try { + sink.enqueue(result.value); + } catch (e) { + sink.error("ReadableStream chunk enqueueing in the sink failed"); + return; + } + doPipe(); + }, + function (e) { + sink.error(e); + } + ); + } + doPipe(); } +function acquireReadableStreamDefaultReader(stream) { + "use strict"; + var start = @getByIdDirectPrivate(stream, "start"); + if (start) { + start.@call(stream); + } - -function acquireReadableStreamDefaultReader(stream) -{ - "use strict"; - var start = @getByIdDirectPrivate(stream, "start"); - if (start) { - start.@call(stream); - } - - return new @ReadableStreamDefaultReader(stream); + return new @ReadableStreamDefaultReader(stream); } // https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller, starting from step 6. // The other part is implemented in privateInitializeReadableStreamDefaultController. -function setupReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark, startMethod, pullMethod, cancelMethod) -{ - "use strict"; - - const controller = new @ReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark, @isReadableStream); - - const pullAlgorithm = () => @promiseInvokeOrNoopMethod(underlyingSource, pullMethod, [controller]); - const cancelAlgorithm = (reason) => @promiseInvokeOrNoopMethod(underlyingSource, cancelMethod, [reason]); - - @putByIdDirectPrivate(controller, "pullAlgorithm", pullAlgorithm); - @putByIdDirectPrivate(controller, "cancelAlgorithm", cancelAlgorithm); - @putByIdDirectPrivate(controller, "pull", @readableStreamDefaultControllerPull); - @putByIdDirectPrivate(controller, "cancel", @readableStreamDefaultControllerCancel); - @putByIdDirectPrivate(stream, "readableStreamController", controller); - - @readableStreamDefaultControllerStart(controller); - +function setupReadableStreamDefaultController( + stream, + underlyingSource, + size, + highWaterMark, + startMethod, + pullMethod, + cancelMethod +) { + "use strict"; + + const controller = new @ReadableStreamDefaultController( + stream, + underlyingSource, + size, + highWaterMark, + @isReadableStream + ); + + const pullAlgorithm = () => + @promiseInvokeOrNoopMethod(underlyingSource, pullMethod, [controller]); + const cancelAlgorithm = (reason) => + @promiseInvokeOrNoopMethod(underlyingSource, cancelMethod, [reason]); + + @putByIdDirectPrivate(controller, "pullAlgorithm", pullAlgorithm); + @putByIdDirectPrivate(controller, "cancelAlgorithm", cancelAlgorithm); + @putByIdDirectPrivate( + controller, + "pull", + @readableStreamDefaultControllerPull + ); + @putByIdDirectPrivate( + controller, + "cancel", + @readableStreamDefaultControllerCancel + ); + @putByIdDirectPrivate(stream, "readableStreamController", controller); + + @readableStreamDefaultControllerStart(controller); } - function createReadableStreamController(stream, underlyingSource, strategy) { - "use strict"; - - const type = underlyingSource.type; - const typeString = @toString(type); - - if (typeString === "bytes") { - // if (!@readableByteStreamAPIEnabled()) - // @throwTypeError("ReadableByteStreamController is not implemented"); - - if (strategy.highWaterMark === @undefined) - strategy.highWaterMark = 0; - if (strategy.size !== @undefined) - @throwRangeError("Strategy for a ReadableByteStreamController cannot have a size"); - - @putByIdDirectPrivate(stream, "readableStreamController", new @ReadableByteStreamController(stream, underlyingSource, strategy.highWaterMark, @isReadableStream)); - } else if (typeString === "direct") { - var highWaterMark = strategy?.highWaterMark; - @initializeArrayBufferStream.@call(stream, underlyingSource, highWaterMark); - } else if (type === @undefined) { - if (strategy.highWaterMark === @undefined) - strategy.highWaterMark = 1; - - @setupReadableStreamDefaultController(stream, underlyingSource, strategy.size, strategy.highWaterMark, underlyingSource.start, underlyingSource.pull, underlyingSource.cancel); - } else - @throwRangeError("Invalid type for underlying source"); - + "use strict"; + + const type = underlyingSource.type; + const typeString = @toString(type); + + if (typeString === "bytes") { + // if (!@readableByteStreamAPIEnabled()) + // @throwTypeError("ReadableByteStreamController is not implemented"); + + if (strategy.highWaterMark === @undefined) strategy.highWaterMark = 0; + if (strategy.size !== @undefined) + @throwRangeError( + "Strategy for a ReadableByteStreamController cannot have a size" + ); + + @putByIdDirectPrivate( + stream, + "readableStreamController", + new @ReadableByteStreamController( + stream, + underlyingSource, + strategy.highWaterMark, + @isReadableStream + ) + ); + } else if (typeString === "direct") { + var highWaterMark = strategy?.highWaterMark; + @initializeArrayBufferStream.@call( + stream, + underlyingSource, + highWaterMark + ); + } else if (type === @undefined) { + if (strategy.highWaterMark === @undefined) strategy.highWaterMark = 1; + + @setupReadableStreamDefaultController( + stream, + underlyingSource, + strategy.size, + strategy.highWaterMark, + underlyingSource.start, + underlyingSource.pull, + underlyingSource.cancel + ); + } else @throwRangeError("Invalid type for underlying source"); } function readableStreamDefaultControllerStart(controller) { - "use strict"; - - - - if (@getByIdDirectPrivate(controller, "started") !== -1) - return; - - const underlyingSource = @getByIdDirectPrivate(controller, "underlyingSource"); - const startMethod = underlyingSource.start; - @putByIdDirectPrivate(controller, "started", 0); - - @promiseInvokeOrNoopMethodNoCatch(underlyingSource, startMethod, [controller]).@then(() => { - @putByIdDirectPrivate(controller, "started", 1); - @assert(!@getByIdDirectPrivate(controller, "pulling")); - @assert(!@getByIdDirectPrivate(controller, "pullAgain")); - @readableStreamDefaultControllerCallPullIfNeeded(controller); - }, (error) => { - @readableStreamDefaultControllerError(controller, error); - }); + "use strict"; + + if (@getByIdDirectPrivate(controller, "started") !== -1) return; + + const underlyingSource = @getByIdDirectPrivate( + controller, + "underlyingSource" + ); + const startMethod = underlyingSource.start; + @putByIdDirectPrivate(controller, "started", 0); + + @promiseInvokeOrNoopMethodNoCatch(underlyingSource, startMethod, [ + controller, + ]).@then( + () => { + @putByIdDirectPrivate(controller, "started", 1); + @assert(!@getByIdDirectPrivate(controller, "pulling")); + @assert(!@getByIdDirectPrivate(controller, "pullAgain")); + @readableStreamDefaultControllerCallPullIfNeeded(controller); + }, + (error) => { + @readableStreamDefaultControllerError(controller, error); + } + ); } - // FIXME: Replace readableStreamPipeTo by below function. // This method implements the latest https://streams.spec.whatwg.org/#readable-stream-pipe-to. -function readableStreamPipeToWritableStream(source, destination, preventClose, preventAbort, preventCancel, signal) -{ - "use strict"; - - @assert(@isReadableStream(source)); - @assert(@isWritableStream(destination)); - @assert(!@isReadableStreamLocked(source)); - @assert(!@isWritableStreamLocked(destination)); - @assert(signal === @undefined || @isAbortSignal(signal)); - - if (@getByIdDirectPrivate(source, "underlyingByteSource") !== @undefined) - return @Promise.@reject("Piping to a readable bytestream is not supported"); - - let pipeState = { source : source, destination : destination, preventAbort : preventAbort, preventCancel : preventCancel, preventClose : preventClose, signal : signal }; - - pipeState.reader = @acquireReadableStreamDefaultReader(source); - pipeState.writer = @acquireWritableStreamDefaultWriter(destination); - - @putByIdDirectPrivate(source, "disturbed", true); - - pipeState.finalized = false; - pipeState.shuttingDown = false; - pipeState.promiseCapability = @newPromiseCapability(@Promise); - pipeState.pendingReadPromiseCapability = @newPromiseCapability(@Promise); - pipeState.pendingReadPromiseCapability.@resolve.@call(); - pipeState.pendingWritePromise = @Promise.@resolve(); - - if (signal !== @undefined) { - const algorithm = () => { - if (pipeState.finalized) - return; - - const error = @makeDOMException("AbortError", "abort pipeTo from signal"); - - @pipeToShutdownWithAction(pipeState, () => { - const shouldAbortDestination = !pipeState.preventAbort && @getByIdDirectPrivate(pipeState.destination, "state") === "writable"; - const promiseDestination = shouldAbortDestination ? @writableStreamAbort(pipeState.destination, error) : @Promise.@resolve(); - - const shouldAbortSource = !pipeState.preventCancel && @getByIdDirectPrivate(pipeState.source, "state") === @streamReadable; - const promiseSource = shouldAbortSource ? @readableStreamCancel(pipeState.source, error) : @Promise.@resolve(); - - let promiseCapability = @newPromiseCapability(@Promise); - let shouldWait = true; - let handleResolvedPromise = () => { - if (shouldWait) { - shouldWait = false; - return; - } - promiseCapability.@resolve.@call(); - } - let handleRejectedPromise = (e) => { - promiseCapability.@reject.@call(@undefined, e); - } - promiseDestination.@then(handleResolvedPromise, handleRejectedPromise); - promiseSource.@then(handleResolvedPromise, handleRejectedPromise); - return promiseCapability.@promise; - }, error); - }; - if (@whenSignalAborted(signal, algorithm)) - return pipeState.promiseCapability.@promise; - } - - @pipeToErrorsMustBePropagatedForward(pipeState); - @pipeToErrorsMustBePropagatedBackward(pipeState); - @pipeToClosingMustBePropagatedForward(pipeState); - @pipeToClosingMustBePropagatedBackward(pipeState); - - @pipeToLoop(pipeState); +function readableStreamPipeToWritableStream( + source, + destination, + preventClose, + preventAbort, + preventCancel, + signal +) { + "use strict"; + + @assert(@isReadableStream(source)); + @assert(@isWritableStream(destination)); + @assert(!@isReadableStreamLocked(source)); + @assert(!@isWritableStreamLocked(destination)); + @assert(signal === @undefined || @isAbortSignal(signal)); + + if (@getByIdDirectPrivate(source, "underlyingByteSource") !== @undefined) + return @Promise.@reject( + "Piping to a readable bytestream is not supported" + ); + + let pipeState = { + source: source, + destination: destination, + preventAbort: preventAbort, + preventCancel: preventCancel, + preventClose: preventClose, + signal: signal, + }; + + pipeState.reader = @acquireReadableStreamDefaultReader(source); + pipeState.writer = @acquireWritableStreamDefaultWriter(destination); + + @putByIdDirectPrivate(source, "disturbed", true); + + pipeState.finalized = false; + pipeState.shuttingDown = false; + pipeState.promiseCapability = @newPromiseCapability(@Promise); + pipeState.pendingReadPromiseCapability = @newPromiseCapability(@Promise); + pipeState.pendingReadPromiseCapability.@resolve.@call(); + pipeState.pendingWritePromise = @Promise.@resolve(); + + if (signal !== @undefined) { + const algorithm = () => { + if (pipeState.finalized) return; + + const error = @makeDOMException( + "AbortError", + "abort pipeTo from signal" + ); + + @pipeToShutdownWithAction( + pipeState, + () => { + const shouldAbortDestination = + !pipeState.preventAbort && + @getByIdDirectPrivate(pipeState.destination, "state") === + "writable"; + const promiseDestination = shouldAbortDestination + ? @writableStreamAbort(pipeState.destination, error) + : @Promise.@resolve(); + + const shouldAbortSource = + !pipeState.preventCancel && + @getByIdDirectPrivate(pipeState.source, "state") === + @streamReadable; + const promiseSource = shouldAbortSource + ? @readableStreamCancel(pipeState.source, error) + : @Promise.@resolve(); + + let promiseCapability = @newPromiseCapability(@Promise); + let shouldWait = true; + let handleResolvedPromise = () => { + if (shouldWait) { + shouldWait = false; + return; + } + promiseCapability.@resolve.@call(); + }; + let handleRejectedPromise = (e) => { + promiseCapability.@reject.@call(@undefined, e); + }; + promiseDestination.@then( + handleResolvedPromise, + handleRejectedPromise + ); + promiseSource.@then(handleResolvedPromise, handleRejectedPromise); + return promiseCapability.@promise; + }, + error + ); + }; + if (@whenSignalAborted(signal, algorithm)) + return pipeState.promiseCapability.@promise; + } - return pipeState.promiseCapability.@promise; -} + @pipeToErrorsMustBePropagatedForward(pipeState); + @pipeToErrorsMustBePropagatedBackward(pipeState); + @pipeToClosingMustBePropagatedForward(pipeState); + @pipeToClosingMustBePropagatedBackward(pipeState); -function pipeToLoop(pipeState) -{ - "use strict"; - if (pipeState.shuttingDown) - return; + @pipeToLoop(pipeState); - @pipeToDoReadWrite(pipeState).@then((result) => { - if (result) - @pipeToLoop(pipeState); - }); + return pipeState.promiseCapability.@promise; } -function pipeToDoReadWrite(pipeState) -{ - "use strict"; - @assert(!pipeState.shuttingDown); - - pipeState.pendingReadPromiseCapability = @newPromiseCapability(@Promise); - @getByIdDirectPrivate(pipeState.writer, "readyPromise").@promise.@then(() => { - if (pipeState.shuttingDown) { - pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false); - return; - } +function pipeToLoop(pipeState) { + "use strict"; + if (pipeState.shuttingDown) return; - @readableStreamDefaultReaderRead(pipeState.reader).@then((result) => { - const canWrite = !result.done && @getByIdDirectPrivate(pipeState.writer, "stream") !== @undefined; - pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, canWrite); - if (!canWrite) - return; - - pipeState.pendingWritePromise = @writableStreamDefaultWriterWrite(pipeState.writer, result.value); - }, (e) => { - pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false); - }); - }, (e) => { - pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false); - }); - return pipeState.pendingReadPromiseCapability.@promise; + @pipeToDoReadWrite(pipeState).@then((result) => { + if (result) @pipeToLoop(pipeState); + }); } -function pipeToErrorsMustBePropagatedForward(pipeState) -{ - "use strict"; - - const action = () => { - pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false); - const error = @getByIdDirectPrivate(pipeState.source, "storedError"); - if (!pipeState.preventAbort) { - @pipeToShutdownWithAction(pipeState, () => @writableStreamAbort(pipeState.destination, error), error); - return; - } - @pipeToShutdown(pipeState, error); - }; - - if (@getByIdDirectPrivate(pipeState.source, "state") === @streamErrored) { - action(); +function pipeToDoReadWrite(pipeState) { + "use strict"; + @assert(!pipeState.shuttingDown); + + pipeState.pendingReadPromiseCapability = @newPromiseCapability(@Promise); + @getByIdDirectPrivate(pipeState.writer, "readyPromise").@promise.@then( + () => { + if (pipeState.shuttingDown) { + pipeState.pendingReadPromiseCapability.@resolve.@call( + @undefined, + false + ); return; - } - - @getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").@promise.@then(@undefined, action); -} - -function pipeToErrorsMustBePropagatedBackward(pipeState) -{ - "use strict"; - const action = () => { - const error = @getByIdDirectPrivate(pipeState.destination, "storedError"); - if (!pipeState.preventCancel) { - @pipeToShutdownWithAction(pipeState, () => @readableStreamCancel(pipeState.source, error), error); - return; + } + + @readableStreamDefaultReaderRead(pipeState.reader).@then( + (result) => { + const canWrite = + !result.done && + @getByIdDirectPrivate(pipeState.writer, "stream") !== @undefined; + pipeState.pendingReadPromiseCapability.@resolve.@call( + @undefined, + canWrite + ); + if (!canWrite) return; + + pipeState.pendingWritePromise = @writableStreamDefaultWriterWrite( + pipeState.writer, + result.value + ); + }, + (e) => { + pipeState.pendingReadPromiseCapability.@resolve.@call( + @undefined, + false + ); } - @pipeToShutdown(pipeState, error); - }; - if (@getByIdDirectPrivate(pipeState.destination, "state") === "errored") { - action(); - return; + ); + }, + (e) => { + pipeState.pendingReadPromiseCapability.@resolve.@call( + @undefined, + false + ); } - @getByIdDirectPrivate(pipeState.writer, "closedPromise").@promise.@then(@undefined, action); + ); + return pipeState.pendingReadPromiseCapability.@promise; } -function pipeToClosingMustBePropagatedForward(pipeState) -{ - "use strict"; - const action = () => { - pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false); - const error = @getByIdDirectPrivate(pipeState.source, "storedError"); - if (!pipeState.preventClose) { - @pipeToShutdownWithAction(pipeState, () => @writableStreamDefaultWriterCloseWithErrorPropagation(pipeState.writer)); - return; - } - @pipeToShutdown(pipeState); - }; - if (@getByIdDirectPrivate(pipeState.source, "state") === @streamClosed) { - action(); - return; +function pipeToErrorsMustBePropagatedForward(pipeState) { + "use strict"; + + const action = () => { + pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false); + const error = @getByIdDirectPrivate(pipeState.source, "storedError"); + if (!pipeState.preventAbort) { + @pipeToShutdownWithAction( + pipeState, + () => @writableStreamAbort(pipeState.destination, error), + error + ); + return; } - @getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").@promise.@then(action, @undefined); -} + @pipeToShutdown(pipeState, error); + }; -function pipeToClosingMustBePropagatedBackward(pipeState) -{ - "use strict"; - if (!@writableStreamCloseQueuedOrInFlight(pipeState.destination) && @getByIdDirectPrivate(pipeState.destination, "state") !== "closed") - return; + if (@getByIdDirectPrivate(pipeState.source, "state") === @streamErrored) { + action(); + return; + } - // @assert no chunks have been read/written + @getByIdDirectPrivate( + pipeState.reader, + "closedPromiseCapability" + ).@promise.@then(@undefined, action); +} - const error = @makeTypeError("closing is propagated backward"); +function pipeToErrorsMustBePropagatedBackward(pipeState) { + "use strict"; + const action = () => { + const error = @getByIdDirectPrivate(pipeState.destination, "storedError"); if (!pipeState.preventCancel) { - @pipeToShutdownWithAction(pipeState, () => @readableStreamCancel(pipeState.source, error), error); - return; + @pipeToShutdownWithAction( + pipeState, + () => @readableStreamCancel(pipeState.source, error), + error + ); + return; } @pipeToShutdown(pipeState, error); + }; + if (@getByIdDirectPrivate(pipeState.destination, "state") === "errored") { + action(); + return; + } + @getByIdDirectPrivate(pipeState.writer, "closedPromise").@promise.@then( + @undefined, + action + ); } -function pipeToShutdownWithAction(pipeState, action) -{ - "use strict"; - - if (pipeState.shuttingDown) - return; - - pipeState.shuttingDown = true; - - const hasError = arguments.length > 2; - const error = arguments[2]; - const finalize = () => { - const promise = action(); - promise.@then(() => { - if (hasError) - @pipeToFinalize(pipeState, error); - else - @pipeToFinalize(pipeState); - }, (e) => { - @pipeToFinalize(pipeState, e); - }); - }; - - if (@getByIdDirectPrivate(pipeState.destination, "state") === "writable" && !@writableStreamCloseQueuedOrInFlight(pipeState.destination)) { - pipeState.pendingReadPromiseCapability.@promise.@then(() => { - pipeState.pendingWritePromise.@then(finalize, finalize); - }, (e) => @pipeToFinalize(pipeState, e)); - return; +function pipeToClosingMustBePropagatedForward(pipeState) { + "use strict"; + const action = () => { + pipeState.pendingReadPromiseCapability.@resolve.@call(@undefined, false); + const error = @getByIdDirectPrivate(pipeState.source, "storedError"); + if (!pipeState.preventClose) { + @pipeToShutdownWithAction(pipeState, () => + @writableStreamDefaultWriterCloseWithErrorPropagation(pipeState.writer) + ); + return; } - - finalize(); + @pipeToShutdown(pipeState); + }; + if (@getByIdDirectPrivate(pipeState.source, "state") === @streamClosed) { + action(); + return; + } + @getByIdDirectPrivate( + pipeState.reader, + "closedPromiseCapability" + ).@promise.@then(action, @undefined); } -function pipeToShutdown(pipeState) -{ - "use strict"; - - if (pipeState.shuttingDown) - return; - - pipeState.shuttingDown = true; - - const hasError = arguments.length > 1; - const error = arguments[1]; - const finalize = () => { - if (hasError) - @pipeToFinalize(pipeState, error); - else - @pipeToFinalize(pipeState); - }; - - if (@getByIdDirectPrivate(pipeState.destination, "state") === "writable" && !@writableStreamCloseQueuedOrInFlight(pipeState.destination)) { - pipeState.pendingReadPromiseCapability.@promise.@then(() => { - pipeState.pendingWritePromise.@then(finalize, finalize); - }, (e) => @pipeToFinalize(pipeState, e)); - return; - } - finalize(); +function pipeToClosingMustBePropagatedBackward(pipeState) { + "use strict"; + if ( + !@writableStreamCloseQueuedOrInFlight(pipeState.destination) && + @getByIdDirectPrivate(pipeState.destination, "state") !== "closed" + ) + return; + + // @assert no chunks have been read/written + + const error = @makeTypeError("closing is propagated backward"); + if (!pipeState.preventCancel) { + @pipeToShutdownWithAction( + pipeState, + () => @readableStreamCancel(pipeState.source, error), + error + ); + return; + } + @pipeToShutdown(pipeState, error); } -function pipeToFinalize(pipeState) -{ - "use strict"; - - @writableStreamDefaultWriterRelease(pipeState.writer); - @readableStreamReaderGenericRelease(pipeState.reader); - - // Instead of removing the abort algorithm as per spec, we make it a no-op which is equivalent. - pipeState.finalized = true; - - if (arguments.length > 1) - pipeState.promiseCapability.@reject.@call(@undefined, arguments[1]); - else - pipeState.promiseCapability.@resolve.@call(); +function pipeToShutdownWithAction(pipeState, action) { + "use strict"; + + if (pipeState.shuttingDown) return; + + pipeState.shuttingDown = true; + + const hasError = arguments.length > 2; + const error = arguments[2]; + const finalize = () => { + const promise = action(); + promise.@then( + () => { + if (hasError) @pipeToFinalize(pipeState, error); + else @pipeToFinalize(pipeState); + }, + (e) => { + @pipeToFinalize(pipeState, e); + } + ); + }; + + if ( + @getByIdDirectPrivate(pipeState.destination, "state") === "writable" && + !@writableStreamCloseQueuedOrInFlight(pipeState.destination) + ) { + pipeState.pendingReadPromiseCapability.@promise.@then( + () => { + pipeState.pendingWritePromise.@then(finalize, finalize); + }, + (e) => @pipeToFinalize(pipeState, e) + ); + return; + } + + finalize(); } -function readableStreamTee(stream, shouldClone) -{ - "use strict"; - - @assert(@isReadableStream(stream)); - @assert(typeof(shouldClone) === "boolean"); - - const reader = new @ReadableStreamDefaultReader(stream); - - const teeState = { - closedOrErrored: false, - canceled1: false, - canceled2: false, - reason1: @undefined, - reason2: @undefined, - }; - - teeState.cancelPromiseCapability = @newPromiseCapability(@Promise); +function pipeToShutdown(pipeState) { + "use strict"; + + if (pipeState.shuttingDown) return; + + pipeState.shuttingDown = true; + + const hasError = arguments.length > 1; + const error = arguments[1]; + const finalize = () => { + if (hasError) @pipeToFinalize(pipeState, error); + else @pipeToFinalize(pipeState); + }; + + if ( + @getByIdDirectPrivate(pipeState.destination, "state") === "writable" && + !@writableStreamCloseQueuedOrInFlight(pipeState.destination) + ) { + pipeState.pendingReadPromiseCapability.@promise.@then( + () => { + pipeState.pendingWritePromise.@then(finalize, finalize); + }, + (e) => @pipeToFinalize(pipeState, e) + ); + return; + } + finalize(); +} - const pullFunction = @readableStreamTeePullFunction(teeState, reader, shouldClone); +function pipeToFinalize(pipeState) { + "use strict"; - const branch1Source = { }; - @putByIdDirectPrivate(branch1Source, "pull", pullFunction); - @putByIdDirectPrivate(branch1Source, "cancel", @readableStreamTeeBranch1CancelFunction(teeState, stream)); + @writableStreamDefaultWriterRelease(pipeState.writer); + @readableStreamReaderGenericRelease(pipeState.reader); - const branch2Source = { }; - @putByIdDirectPrivate(branch2Source, "pull", pullFunction); - @putByIdDirectPrivate(branch2Source, "cancel", @readableStreamTeeBranch2CancelFunction(teeState, stream)); + // Instead of removing the abort algorithm as per spec, we make it a no-op which is equivalent. + pipeState.finalized = true; - const branch1 = new @ReadableStream(branch1Source); - const branch2 = new @ReadableStream(branch2Source); + if (arguments.length > 1) + pipeState.promiseCapability.@reject.@call(@undefined, arguments[1]); + else pipeState.promiseCapability.@resolve.@call(); +} - @getByIdDirectPrivate(reader, "closedPromiseCapability").@promise.@then(@undefined, function(e) { - if (teeState.closedOrErrored) - return; - @readableStreamDefaultControllerError(branch1.@readableStreamController, e); - @readableStreamDefaultControllerError(branch2.@readableStreamController, e); - teeState.closedOrErrored = true; - if (!teeState.canceled1 || !teeState.canceled2) - teeState.cancelPromiseCapability.@resolve.@call(); - }); +function readableStreamTee(stream, shouldClone) { + "use strict"; + + @assert(@isReadableStream(stream)); + @assert(typeof shouldClone === "boolean"); + + const reader = new @ReadableStreamDefaultReader(stream); + + const teeState = { + closedOrErrored: false, + canceled1: false, + canceled2: false, + reason1: @undefined, + reason2: @undefined, + }; + + teeState.cancelPromiseCapability = @newPromiseCapability(@Promise); + + const pullFunction = @readableStreamTeePullFunction( + teeState, + reader, + shouldClone + ); + + const branch1Source = {}; + @putByIdDirectPrivate(branch1Source, "pull", pullFunction); + @putByIdDirectPrivate( + branch1Source, + "cancel", + @readableStreamTeeBranch1CancelFunction(teeState, stream) + ); + + const branch2Source = {}; + @putByIdDirectPrivate(branch2Source, "pull", pullFunction); + @putByIdDirectPrivate( + branch2Source, + "cancel", + @readableStreamTeeBranch2CancelFunction(teeState, stream) + ); + + const branch1 = new @ReadableStream(branch1Source); + const branch2 = new @ReadableStream(branch2Source); + + @getByIdDirectPrivate(reader, "closedPromiseCapability").@promise.@then( + @undefined, + function (e) { + if (teeState.closedOrErrored) return; + @readableStreamDefaultControllerError( + branch1.@readableStreamController, + e + ); + @readableStreamDefaultControllerError( + branch2.@readableStreamController, + e + ); + teeState.closedOrErrored = true; + if (!teeState.canceled1 || !teeState.canceled2) + teeState.cancelPromiseCapability.@resolve.@call(); + } + ); - // Additional fields compared to the spec, as they are needed within pull/cancel functions. - teeState.branch1 = branch1; - teeState.branch2 = branch2; + // Additional fields compared to the spec, as they are needed within pull/cancel functions. + teeState.branch1 = branch1; + teeState.branch2 = branch2; - return [branch1, branch2]; + return [branch1, branch2]; } -function readableStreamTeePullFunction(teeState, reader, shouldClone) -{ - "use strict"; - - return function() { - @Promise.prototype.@then.@call(@readableStreamDefaultReaderRead(reader), function(result) { - @assert(@isObject(result)); - @assert(typeof result.done === "boolean"); - if (result.done && !teeState.closedOrErrored) { - if (!teeState.canceled1) - @readableStreamDefaultControllerClose(teeState.branch1.@readableStreamController); - if (!teeState.canceled2) - @readableStreamDefaultControllerClose(teeState.branch2.@readableStreamController); - teeState.closedOrErrored = true; - if (!teeState.canceled1 || !teeState.canceled2) - teeState.cancelPromiseCapability.@resolve.@call(); - } - if (teeState.closedOrErrored) - return; - if (!teeState.canceled1) - @readableStreamDefaultControllerEnqueue(teeState.branch1.@readableStreamController, result.value); - if (!teeState.canceled2) - @readableStreamDefaultControllerEnqueue(teeState.branch2.@readableStreamController, shouldClone ? @structuredCloneForStream(result.value) : result.value); - }); - } +function readableStreamTeePullFunction(teeState, reader, shouldClone) { + "use strict"; + + return function () { + @Promise.prototype.@then.@call( + @readableStreamDefaultReaderRead(reader), + function (result) { + @assert(@isObject(result)); + @assert(typeof result.done === "boolean"); + if (result.done && !teeState.closedOrErrored) { + if (!teeState.canceled1) + @readableStreamDefaultControllerClose( + teeState.branch1.@readableStreamController + ); + if (!teeState.canceled2) + @readableStreamDefaultControllerClose( + teeState.branch2.@readableStreamController + ); + teeState.closedOrErrored = true; + if (!teeState.canceled1 || !teeState.canceled2) + teeState.cancelPromiseCapability.@resolve.@call(); + } + if (teeState.closedOrErrored) return; + if (!teeState.canceled1) + @readableStreamDefaultControllerEnqueue( + teeState.branch1.@readableStreamController, + result.value + ); + if (!teeState.canceled2) + @readableStreamDefaultControllerEnqueue( + teeState.branch2.@readableStreamController, + shouldClone + ? @structuredCloneForStream(result.value) + : result.value + ); + } + ); + }; } -function readableStreamTeeBranch1CancelFunction(teeState, stream) -{ - "use strict"; - - return function(r) { - teeState.canceled1 = true; - teeState.reason1 = r; - if (teeState.canceled2) { - @readableStreamCancel(stream, [teeState.reason1, teeState.reason2]).@then( - teeState.cancelPromiseCapability.@resolve, - teeState.cancelPromiseCapability.@reject); - } - return teeState.cancelPromiseCapability.@promise; +function readableStreamTeeBranch1CancelFunction(teeState, stream) { + "use strict"; + + return function (r) { + teeState.canceled1 = true; + teeState.reason1 = r; + if (teeState.canceled2) { + @readableStreamCancel(stream, [ + teeState.reason1, + teeState.reason2, + ]).@then( + teeState.cancelPromiseCapability.@resolve, + teeState.cancelPromiseCapability.@reject + ); } + return teeState.cancelPromiseCapability.@promise; + }; } -function readableStreamTeeBranch2CancelFunction(teeState, stream) -{ - "use strict"; - - return function(r) { - teeState.canceled2 = true; - teeState.reason2 = r; - if (teeState.canceled1) { - @readableStreamCancel(stream, [teeState.reason1, teeState.reason2]).@then( - teeState.cancelPromiseCapability.@resolve, - teeState.cancelPromiseCapability.@reject); - } - return teeState.cancelPromiseCapability.@promise; +function readableStreamTeeBranch2CancelFunction(teeState, stream) { + "use strict"; + + return function (r) { + teeState.canceled2 = true; + teeState.reason2 = r; + if (teeState.canceled1) { + @readableStreamCancel(stream, [ + teeState.reason1, + teeState.reason2, + ]).@then( + teeState.cancelPromiseCapability.@resolve, + teeState.cancelPromiseCapability.@reject + ); } + return teeState.cancelPromiseCapability.@promise; + }; } -function isReadableStream(stream) -{ - "use strict"; +function isReadableStream(stream) { + "use strict"; - // Spec tells to return true only if stream has a readableStreamController internal slot. - // However, since it is a private slot, it cannot be checked using hasOwnProperty(). - // Therefore, readableStreamController is initialized with null value. - return @isObject(stream) && @getByIdDirectPrivate(stream, "readableStreamController") !== @undefined; + // Spec tells to return true only if stream has a readableStreamController internal slot. + // However, since it is a private slot, it cannot be checked using hasOwnProperty(). + // Therefore, readableStreamController is initialized with null value. + return ( + @isObject(stream) && + @getByIdDirectPrivate(stream, "readableStreamController") !== @undefined + ); } -function isReadableStreamDefaultReader(reader) -{ - "use strict"; +function isReadableStreamDefaultReader(reader) { + "use strict"; - // Spec tells to return true only if reader has a readRequests internal slot. - // However, since it is a private slot, it cannot be checked using hasOwnProperty(). - // Since readRequests is initialized with an empty array, the following test is ok. - return @isObject(reader) && !!@getByIdDirectPrivate(reader, "readRequests"); + // Spec tells to return true only if reader has a readRequests internal slot. + // However, since it is a private slot, it cannot be checked using hasOwnProperty(). + // Since readRequests is initialized with an empty array, the following test is ok. + return @isObject(reader) && !!@getByIdDirectPrivate(reader, "readRequests"); } -function isReadableStreamDefaultController(controller) -{ - "use strict"; - - // Spec tells to return true only if controller has an underlyingSource internal slot. - // However, since it is a private slot, it cannot be checked using hasOwnProperty(). - // underlyingSource is obtained in ReadableStream constructor: if undefined, it is set - // to an empty object. Therefore, following test is ok. - return @isObject(controller) && !!@getByIdDirectPrivate(controller, "underlyingSource"); +function isReadableStreamDefaultController(controller) { + "use strict"; + + // Spec tells to return true only if controller has an underlyingSource internal slot. + // However, since it is a private slot, it cannot be checked using hasOwnProperty(). + // underlyingSource is obtained in ReadableStream constructor: if undefined, it is set + // to an empty object. Therefore, following test is ok. + return ( + @isObject(controller) && + !!@getByIdDirectPrivate(controller, "underlyingSource") + ); } +function readDirectStream(stream, sink, underlyingSource) { + "use strict"; -@globalPrivate -function assignToStream(stream, sink) { - "use strict"; - - // The stream is either a direct stream or a "default" JS stream - const underlyingSource = @getByIdDirectPrivate(stream, "underlyingSource"); - - // we know it's a direct stream when @underlyingSource is set - if (underlyingSource) { - var originalClose = underlyingSource.close; - var reader; - var close = (reason) => { - originalClose && originalClose(reason); - try { - reader && reader.releaseLock(); - } catch (e) {} - @readableStreamClose(stream, reason); - - } - var pull = underlyingSource.pull; + var originalClose = underlyingSource.close; + var reader; + var close = (reason) => { + originalClose && originalClose(reason); + try { + reader && reader.releaseLock(); + } catch (e) {} + @readableStreamClose(stream, reason); + @putByIdDirectPrivate(stream, "underlyingSource", @undefined); + @putByIdDirectPrivate(stream, "readableStreamController", null); + close = @undefined; + reader = @undefined; + }; + var pull = underlyingSource.pull; + + if (!pull) { + close(); + return; + } + + if (!@isCallable(pull)) { + close(); + @throwTypeError("pull is not a function"); + return; + } + + @putByIdDirectPrivate(stream, "readableStreamController", sink); + @putByIdDirectPrivate(stream, "start", @undefined); + @putByIdDirectPrivate(stream, "underlyingSource", @undefined); + + const highWaterMark = @getByIdDirectPrivate(stream, "highWaterMark"); + + if (highWaterMark) { + sink.start({ + highWaterMark, + }); + } - if (!pull) { - close(); - return; - } + @startDirectStream.@call(sink, stream, pull, close); - if (!@isCallable(pull)) { - close(); - @throwTypeError("pull is not a function"); - return; - } - - @putByIdDirectPrivate(stream, "readableStreamController", sink); - @putByIdDirectPrivate(stream, "start", @undefined); - @putByIdDirectPrivate(stream, "underlyingSource", @undefined); + // lock the stream, relying on close() or end() to eventaully close it + reader = stream.getReader(); - const highWaterMark = @getByIdDirectPrivate(stream, "highWaterMark"); + pull(sink); +} - if (highWaterMark) { - sink.start({ - highWaterMark, - }); - } +@globalPrivate; +function assignToStream(stream, sink) { + "use strict"; - @startDirectStream.@call(sink, stream, pull, close); - - - // lock the stream, relying on close() or end() to eventaully close it - reader = stream.getReader(); + // The stream is either a direct stream or a "default" JS stream + const underlyingSource = @getByIdDirectPrivate(stream, "underlyingSource"); - pull(sink); - return; - } + // we know it's a direct stream when @underlyingSource is set + if (underlyingSource) { + return @readDirectStream(stream, sink, underlyingSource); + } - - return (async function() { - "use strict"; + return @readStreamIntoSink(stream, sink, true); +} - var didClose = false; - try { - var reader = stream.getReader(); +async function readStreamIntoSink(stream, sink, isNative) { + "use strict"; - var many = reader.readMany(); - if (many && @isPromise(many)) { - many = await many; - } - if (many.done) { - didClose = true; - sink.end(); - return; - } + var didClose = false; + var didThrow = false; + try { + var reader = stream.getReader(); + var many = reader.readMany(); + if (many && @isPromise(many)) { + many = await many; + } + if (many.done) { + didClose = true; + return sink.end(); + } + var wroteCount = many.value.length; + const highWaterMark = @getByIdDirectPrivate(stream, "highWaterMark"); + if (isNative) @startDirectStream.@call(sink, stream, @undefined, () => !didThrow && stream.cancel()); - - var wroteCount = many.value.length; - const highWaterMark = @getByIdDirectPrivate(stream, "highWaterMark"); + if (highWaterMark) sink.start({ highWaterMark }); + - if (highWaterMark) - sink.start({highWaterMark}); + for ( + var i = 0, values = many.value, length = many.value.length; + i < length; + i++ + ) { + sink.write(values[i]); + } - for (var i = 0, values = many.value, length = many.value.length; i < length; i++) { - sink.write(values[i]); - } + var streamState = @getByIdDirectPrivate(stream, "state"); + if (streamState === @streamClosed) { + didClose = true; + return sink.end(); + } - var streamState = @getByIdDirectPrivate(stream, "state"); + while (true) { + var { value, done } = await reader.read(); + if (done) { + didClose = true; + return sink.end(); + } + sink.write(value); + } + } catch (e) { + didThrow = true; - if (streamState === @streamClosed) { - didClose = true; - return sink.end(); - } - if (wroteCount > 0) { - sink.drain(); - } - - while (true) { - var result = await reader.read(); + try { + reader = @undefined; + stream.cancel(e); + } catch (j) {} + + if (sink && !didClose) { + didClose = true; + try { + sink.close(e); + } catch (j) { + throw new globalThis.AggregateError([e, j]); + } + } - if (result.done) { - didClose = true; - return sink.end(); - } - sink.write(result.value); - } - } catch (e) { - if (sink && !didClose) { - didClose = true; - try { - sink.close(); - } catch(j) { - throw j; - } - } + throw e; + } finally { + if (reader) { + try { + reader.releaseLock(); + } catch (e) {} + reader = @undefined; + } + sink = @undefined; + var streamState = @getByIdDirectPrivate(stream, "state"); + if (stream) { + + // make it easy for this to be GC'd + // but don't do property transitions + var readableStreamController = @getByIdDirectPrivate( + stream, + "readableStreamController" + ); + if (readableStreamController) { + if ( + @getByIdDirectPrivate(readableStreamController, "underlyingSource") + ) + @putByIdDirectPrivate( + readableStreamController, + "underlyingSource", + @undefined + ); + if ( + @getByIdDirectPrivate( + readableStreamController, + "controlledReadableStream" + ) + ) + @putByIdDirectPrivate( + readableStreamController, + "controlledReadableStream", + @undefined + ); + + @putByIdDirectPrivate(stream, "readableStreamController", null); + if (@getByIdDirectPrivate(stream, "underlyingSource")) + @putByIdDirectPrivate(stream, "underlyingSource", @undefined); + readableStreamController = @undefined; + } + + if (!didThrow && streamState !== @streamClosed && streamState !== @streamErrored) { + @readableStreamClose(stream); + } + stream = @undefined; - throw e; - } - })(); + + } + } } - function handleDirectStreamError(e) { - "use strict"; - - var controller = this; - var sink = controller.@sink; - if (sink) { - @putByIdDirectPrivate(controller, "sink", @undefined); - try { - sink.close(e); - } catch (f) {} - } - - this.error = this.drain = this.write = this.close = this.end = @onReadableStreamDirectControllerClosed; - - if (typeof this.@underlyingSource.close === 'function') { - try { - this.@underlyingSource.close.@call(this.@underlyingSource, e); - } catch (e) { - } - } + "use strict"; + var controller = this; + var sink = controller.@sink; + if (sink) { + @putByIdDirectPrivate(controller, "sink", @undefined); try { - var pend = controller._pendingRead; - if (pend) { - controller._pendingRead = @undefined; - @rejectPromise(pend, e); - } + sink.close(e); } catch (f) {} - var stream = controller.@controlledReadableStream; - if (stream) @readableStreamError(stream, e); -} - -function handleDirectStreamErrorReject(e) { - @handleDirectStreamError.@call(this, e); - return @Promise.@reject(e); -} + } -function onPullDirectStream(controller) -{ - - "use strict"; - - var stream = controller.@controlledReadableStream; - if (!stream || @getByIdDirectPrivate(stream, "state") !== @streamReadable) - return; + this.error = + this.drain = + this.write = + this.close = + this.end = + @onReadableStreamDirectControllerClosed; - // pull is in progress - // this is a recursive call - // ignore it - if (controller._deferClose === -1) { - return; - } - - - controller._deferClose = -1; - controller._deferDrain = -1; - var deferClose; - var deferDrain; - - // Direct streams allow @pull to be called multiple times, unlike the spec. - // Backpressure is handled by the destination, not by the underlying source. - // In this case, we rely on the heuristic that repeatedly draining in the same tick - // is bad for performance - // this code is only run when consuming a direct stream from JS - // without the HTTP server or anything else + if (typeof this.@underlyingSource.close === "function") { try { - var result = controller.@underlyingSource.pull( - controller, - ); - - if (result && @isPromise(result)) { - if (controller._handleError === @undefined) { - controller._handleError = @handleDirectStreamErrorReject.bind(controller); - } - - @Promise.prototype.catch.@call(result, controller._handleError); - } - } catch(e) { - return @handleDirectStreamErrorReject.@call(controller, e); - } finally { - deferClose = controller._deferClose; - deferDrain = controller._deferDrain; - controller._deferDrain = controller._deferClose = 0; - } - - - var promiseToReturn; - - - if (controller._pendingRead === @undefined) { - controller._pendingRead = promiseToReturn = @newPromise(); - } else { - promiseToReturn = @readableStreamAddReadRequest(stream); + this.@underlyingSource.close.@call(this.@underlyingSource, e); + } catch (e) {} + } + + try { + var pend = controller._pendingRead; + if (pend) { + controller._pendingRead = @undefined; + @rejectPromise(pend, e); } + } catch (f) {} + var stream = controller.@controlledReadableStream; + if (stream) @readableStreamError(stream, e); +} +function handleDirectStreamErrorReject(e) { + @handleDirectStreamError.@call(this, e); + return @Promise.@reject(e); +} - // they called close during @pull() - // we delay that - if (deferClose === 1) { - var reason = controller._deferCloseReason; - controller._deferCloseReason = @undefined; - @onCloseDirectStream.@call(controller, reason); - return promiseToReturn; +function onPullDirectStream(controller) { + "use strict"; + + var stream = controller.@controlledReadableStream; + if (!stream || @getByIdDirectPrivate(stream, "state") !== @streamReadable) + return; + + // pull is in progress + // this is a recursive call + // ignore it + if (controller._deferClose === -1) { + return; + } + + controller._deferClose = -1; + controller._deferDrain = -1; + var deferClose; + var deferDrain; + + // Direct streams allow @pull to be called multiple times, unlike the spec. + // Backpressure is handled by the destination, not by the underlying source. + // In this case, we rely on the heuristic that repeatedly draining in the same tick + // is bad for performance + // this code is only run when consuming a direct stream from JS + // without the HTTP server or anything else + try { + var result = controller.@underlyingSource.pull(controller); + + if (result && @isPromise(result)) { + if (controller._handleError === @undefined) { + controller._handleError = + @handleDirectStreamErrorReject.bind(controller); + } + + @Promise.prototype.catch.@call(result, controller._handleError); } + } catch (e) { + return @handleDirectStreamErrorReject.@call(controller, e); + } finally { + deferClose = controller._deferClose; + deferDrain = controller._deferDrain; + controller._deferDrain = controller._deferClose = 0; + } + + var promiseToReturn; + + if (controller._pendingRead === @undefined) { + controller._pendingRead = promiseToReturn = @newPromise(); + } else { + promiseToReturn = @readableStreamAddReadRequest(stream); + } + + // they called close during @pull() + // we delay that + if (deferClose === 1) { + var reason = controller._deferCloseReason; + controller._deferCloseReason = @undefined; + @onCloseDirectStream.@call(controller, reason); + return promiseToReturn; + } - // not done, but they called drain() - if (deferDrain === 1) { - @onDrainDirectStream.@call(controller); - } - + // not done, but they called drain() + if (deferDrain === 1) { + @onDrainDirectStream.@call(controller); + } - return promiseToReturn; + return promiseToReturn; } function noopDoneFunction() { - return @Promise.@resolve({value: @undefined, done: true}); + return @Promise.@resolve({ value: @undefined, done: true }); } -function onReadableStreamDirectControllerClosed(reason) -{ - "use strict"; - @throwTypeError("ReadableStreamDirectController is now closed"); +function onReadableStreamDirectControllerClosed(reason) { + "use strict"; + @throwTypeError("ReadableStreamDirectController is now closed"); } -function onCloseDirectStream(reason) -{ - "use strict"; - var stream = this.@controlledReadableStream; - if (!stream || @getByIdDirectPrivate(stream, "state") !== @streamReadable) - return; +function onCloseDirectStream(reason) { + "use strict"; + var stream = this.@controlledReadableStream; + if (!stream || @getByIdDirectPrivate(stream, "state") !== @streamReadable) + return; - if (this._deferClose !== 0) { - this._deferClose = 1; - this._deferCloseReason = reason; - return; - } - - @putByIdDirectPrivate(stream, "state", @streamClosing); - if (typeof this.@underlyingSource.close === 'function') { - try { - this.@underlyingSource.close.@call(this.@underlyingSource, reason); - } catch (e) { - - } - } + if (this._deferClose !== 0) { + this._deferClose = 1; + this._deferCloseReason = reason; + return; + } - var drained; + @putByIdDirectPrivate(stream, "state", @streamClosing); + if (typeof this.@underlyingSource.close === "function") { try { - drained = this.@sink.end(); - @putByIdDirectPrivate(this, "sink", @undefined); - } catch (e) { - if (this._pendingRead) { - var read = this._pendingRead; - this._pendingRead = @undefined; - @rejectPromise(read, e); - } - @readableStreamError(stream, e); - return; + this.@underlyingSource.close.@call(this.@underlyingSource, reason); + } catch (e) {} + } + + var drained; + try { + drained = this.@sink.end(); + @putByIdDirectPrivate(this, "sink", @undefined); + } catch (e) { + if (this._pendingRead) { + var read = this._pendingRead; + this._pendingRead = @undefined; + @rejectPromise(read, e); } + @readableStreamError(stream, e); + return; + } - this.error = this.drain = this.write = this.close = this.end = @onReadableStreamDirectControllerClosed; + this.error = + this.drain = + this.write = + this.close = + this.end = + @onReadableStreamDirectControllerClosed; - var reader = @getByIdDirectPrivate(stream, "reader"); + var reader = @getByIdDirectPrivate(stream, "reader"); - if (reader && @isReadableStreamDefaultReader(reader)) { - var _pendingRead = this._pendingRead; - if (_pendingRead && @isPromise(_pendingRead) && drained?.byteLength) { - this._pendingRead = @undefined; - @fulfillPromise(_pendingRead, {value: drained, done: false}); - @readableStreamClose(stream); - return; - } + if (reader && @isReadableStreamDefaultReader(reader)) { + var _pendingRead = this._pendingRead; + if (_pendingRead && @isPromise(_pendingRead) && drained?.byteLength) { + this._pendingRead = @undefined; + @fulfillPromise(_pendingRead, { value: drained, done: false }); + @readableStreamClose(stream); + return; } - - if (drained?.byteLength) { - var requests = @getByIdDirectPrivate(reader, "readRequests"); - if (requests?.isNotEmpty()) { - @readableStreamFulfillReadRequest(stream, drained, false); - @readableStreamClose(stream); - return; - } - - @putByIdDirectPrivate(stream, "state", @streamReadable); - this.@pull = () => { - var thisResult = @createFulfilledPromise({value: drained, done: false}); - drained = @undefined; - @readableStreamClose(stream); - stream = @undefined; - return thisResult; - }; - } else if (this._pendingRead) { - var read = this._pendingRead; - this._pendingRead = @undefined; - @putByIdDirectPrivate(this, "pull", @noopDoneFunction); - @fulfillPromise(read, {value: @undefined, done: true}); + } + + if (drained?.byteLength) { + var requests = @getByIdDirectPrivate(reader, "readRequests"); + if (requests?.isNotEmpty()) { + @readableStreamFulfillReadRequest(stream, drained, false); + @readableStreamClose(stream); + return; } - @readableStreamClose(stream); + @putByIdDirectPrivate(stream, "state", @streamReadable); + this.@pull = () => { + var thisResult = @createFulfilledPromise({ + value: drained, + done: false, + }); + drained = @undefined; + @readableStreamClose(stream); + stream = @undefined; + return thisResult; + }; + } else if (this._pendingRead) { + var read = this._pendingRead; + this._pendingRead = @undefined; + @putByIdDirectPrivate(this, "pull", @noopDoneFunction); + @fulfillPromise(read, { value: @undefined, done: true }); + } + + @readableStreamClose(stream); } -function onDrainDirectStream() -{ - "use strict"; +function onDrainDirectStream() { + "use strict"; - var stream = this.@controlledReadableStream; - var reader = @getByIdDirectPrivate(stream, "reader"); - if (!reader || !@isReadableStreamDefaultReader(reader)) { - return; - } + var stream = this.@controlledReadableStream; + var reader = @getByIdDirectPrivate(stream, "reader"); + if (!reader || !@isReadableStreamDefaultReader(reader)) { + return; + } - var _pendingRead = this._pendingRead; - this._pendingRead = @undefined; - if (_pendingRead && @isPromise(_pendingRead)) { - var drained = this.@sink.drain(); - if (drained?.byteLength) { - this._pendingRead = @getByIdDirectPrivate(stream, "readRequests")?.shift(); - @fulfillPromise(_pendingRead, {value: drained, done: false}); - } else { - this._pendingRead = _pendingRead; - } - } else if (@getByIdDirectPrivate(stream, "readRequests")?.isNotEmpty()) { - var drained = this.@sink.drain(); - if (drained?.byteLength) { - @readableStreamFulfillReadRequest(stream, drained, false); - } - } else if (this._deferDrain === -1) { - this._deferDrain = 1; + var _pendingRead = this._pendingRead; + this._pendingRead = @undefined; + if (_pendingRead && @isPromise(_pendingRead)) { + var drained = this.@sink.drain(); + if (drained?.byteLength) { + this._pendingRead = @getByIdDirectPrivate( + stream, + "readRequests" + )?.shift(); + @fulfillPromise(_pendingRead, { value: drained, done: false }); + } else { + this._pendingRead = _pendingRead; } - + } else if (@getByIdDirectPrivate(stream, "readRequests")?.isNotEmpty()) { + var drained = this.@sink.drain(); + if (drained?.byteLength) { + @readableStreamFulfillReadRequest(stream, drained, false); + } + } else if (this._deferDrain === -1) { + this._deferDrain = 1; + } } -function initializeTextStream(underlyingSource, highWaterMark) -{ - "use strict"; - - var sink; - var fifo = @createFIFO(); - var hasString = false; - var hasBuffer = false; - var rope = ''; - var estimatedLength = @toLength(0); - var closingPromise = @newPromise(); - var calledDone = false; - var isView = @ArrayBuffer.@isView; - - - sink = { - start() { - - }, - write(chunk) { - if (typeof chunk === 'string') { - var chunkLength = @toLength(chunk.length); - if (chunkLength > 0) { - rope += chunk; - hasString = true; - // TODO: utf16 byte length - estimatedLength += chunkLength; - - } - - return chunkLength; - } - - if (!chunk || !@isObject(chunk) || !((isView(chunk)) || chunk instanceof @ArrayBuffer)) { - @throwTypeError("Expected text, ArrayBuffer or ArrayBufferView"); - } - - const byteLength = @toLength(chunk.byteLength); - if (byteLength > 0) { - hasBuffer = true; - if (rope.length > 0) { - fifo.push(rope); - rope = ''; - } - fifo.push(chunk); - } - estimatedLength += byteLength; - return byteLength; +function createTextStream(highWaterMark) { + "use strict"; + + var sink; + var array = []; + var hasString = false; + var hasBuffer = false; + var rope = ""; + var estimatedLength = @toLength(0); + var capability = @newPromiseCapability(@Promise); + var calledDone = false; + + sink = { + start() {}, + write(chunk) { + if (typeof chunk === "string") { + var chunkLength = @toLength(chunk.length); + if (chunkLength > 0) { + rope += chunk; + hasString = true; + // TODO: utf16 byte length + estimatedLength += chunkLength; + } - }, + return chunkLength; + } + + if ( + !chunk || + !(@ArrayBuffer.@isView(chunk) || chunk instanceof @ArrayBuffer) + ) { + @throwTypeError("Expected text, ArrayBuffer or ArrayBufferView"); + } + + const byteLength = @toLength(chunk.byteLength); + if (byteLength > 0) { + hasBuffer = true; + if (rope.length > 0) { + @arrayPush(array, rope, chunk); + rope = ""; + } else { + @arrayPush(array, chunk); + } + } + estimatedLength += byteLength; + return byteLength; + }, + + drain() { + return 0; + }, + + end() { + if (calledDone) { + return ""; + } + return sink.fulfill(); + }, + + fulfill() { + calledDone = true; + const result = sink.finishInternal(); + + @fulfillPromise(capability.@promise, result); + return result; + }, + + finishInternal() { + if (!hasString && !hasBuffer) { + return ""; + } + + if (hasString && !hasBuffer) { + return rope; + } + + if (hasBuffer && !hasString) { + return new globalThis.TextDecoder().decode( + globalThis.Bun.concatArrayBuffers(array) + ); + } + + // worst case: mixed content + + var arrayBufferSink = new globalThis.Bun.ArrayBufferSink(); + arrayBufferSink.start({ + highWaterMark: estimatedLength, + asUint8Array: true, + }); + for (let item of array) { + arrayBufferSink.write(item); + } + array.length = 0; + if (rope.length > 0) { + arrayBufferSink.write(rope); + rope = ""; + } + + // TODO: use builtin + return new globalThis.TextDecoder().decode(arrayBufferSink.end()); + }, + + close() { + try { + if (!calledDone) { + calledDone = true; + sink.fulfill(); + } + } catch (e) {} + }, + }; - drain() { - return 0; - }, + return [sink, capability]; +} - end() { - if (calledDone) { - return ""; - } - return sink.fulfill(); - }, +function initializeTextStream(underlyingSource, highWaterMark) { + "use strict"; + var [sink, closingPromise] = @createTextStream(highWaterMark); + + var controller = { + @underlyingSource: underlyingSource, + @pull: @onPullDirectStream, + @controlledReadableStream: this, + @sink: sink, + close: @onCloseDirectStream, + write: sink.write, + error: @handleDirectStreamError, + end: @onCloseDirectStream, + @close: @onCloseDirectStream, + drain: @onDrainDirectStream, + _pendingRead: @undefined, + _deferClose: 0, + _deferDrain: 0, + _deferCloseReason: @undefined, + _handleError: @undefined, + }; + + @putByIdDirectPrivate(this, "readableStreamController", controller); + @putByIdDirectPrivate(this, "underlyingSource", @undefined); + @putByIdDirectPrivate(this, "start", @undefined); + return closingPromise; +} - fulfill() { - calledDone = true; - const result = sink.finishInternal(); - @fulfillPromise(closingPromise, result); - return result; - }, +function initializeArrayStream(underlyingSource, highWaterMark) { + "use strict"; + + var array = []; + var closingPromise = @newPromiseCapability(@Promise); + var calledDone = false; + + function fulfill() { + calledDone = true; + closingPromise.@resolve.@call(@undefined, array); + return array; + } + + var sink = { + start() {}, + write(chunk) { + @arrayPush(array, chunk); + return chunk.byteLength || chunk.length; + }, + + drain() { + return 0; + }, + + end() { + if (calledDone) { + return []; + } + return fulfill(); + }, + + close() { + if (!calledDone) { + fulfill(); + } + }, + }; + + var controller = { + @underlyingSource: underlyingSource, + @pull: @onPullDirectStream, + @controlledReadableStream: this, + @sink: sink, + close: @onCloseDirectStream, + write: sink.write, + error: @handleDirectStreamError, + end: @onCloseDirectStream, + @close: @onCloseDirectStream, + drain: @onDrainDirectStream, + _pendingRead: @undefined, + _deferClose: 0, + _deferDrain: 0, + _deferCloseReason: @undefined, + _handleError: @undefined, + }; + + @putByIdDirectPrivate(this, "readableStreamController", controller); + @putByIdDirectPrivate(this, "underlyingSource", @undefined); + @putByIdDirectPrivate(this, "start", @undefined); + return closingPromise; +} - finishInternal() { - if (!hasString && !hasBuffer) { - return ""; - } +function initializeArrayBufferStream(underlyingSource, highWaterMark) { + "use strict"; + + // This is the fallback implementation for direct streams + // When we don't know what the destination type is + // We assume it is a Uint8Array. + + var opts = + highWaterMark && typeof highWaterMark === "number" + ? { highWaterMark, stream: true, asUint8Array: true } + : { stream: true, asUint8Array: true }; + var sink = new globalThis.Bun.ArrayBufferSink(); + sink.start(opts); + + var controller = { + @underlyingSource: underlyingSource, + @pull: @onPullDirectStream, + @controlledReadableStream: this, + @sink: sink, + close: @onCloseDirectStream, + write: sink.write.bind(sink), + error: @handleDirectStreamError, + end: @onCloseDirectStream, + @close: @onCloseDirectStream, + drain: @onDrainDirectStream, + _pendingRead: @undefined, + _deferClose: 0, + _deferDrain: 0, + _deferCloseReason: @undefined, + _handleError: @undefined, + }; + + @putByIdDirectPrivate(this, "readableStreamController", controller); + @putByIdDirectPrivate(this, "underlyingSource", @undefined); + @putByIdDirectPrivate(this, "start", @undefined); +} - if (hasString && !hasBuffer) { - return rope; - } +function readableStreamError(stream, error) { + "use strict"; + + @assert(@isReadableStream(stream)); + @assert(@getByIdDirectPrivate(stream, "state") === @streamReadable); + @putByIdDirectPrivate(stream, "state", @streamErrored); + @putByIdDirectPrivate(stream, "storedError", error); + + const reader = @getByIdDirectPrivate(stream, "reader"); + + if (!reader) return; + + if (@isReadableStreamDefaultReader(reader)) { + const requests = @getByIdDirectPrivate(reader, "readRequests"); + @putByIdDirectPrivate(reader, "readRequests", @createFIFO()); + for (var request = requests.shift(); request; request = requests.shift()) + @rejectPromise(request, error); + } else { + @assert(@isReadableStreamBYOBReader(reader)); + const requests = @getByIdDirectPrivate(reader, "readIntoRequests"); + @putByIdDirectPrivate(reader, "readIntoRequests", @createFIFO()); + for (var request = requests.shift(); request; request = requests.shift()) + @rejectPromise(request, error); + } + + @getByIdDirectPrivate(reader, "closedPromiseCapability").@reject.@call( + @undefined, + error + ); + const promise = @getByIdDirectPrivate( + reader, + "closedPromiseCapability" + ).@promise; + @markPromiseAsHandled(promise); +} - if (hasBuffer && !hasString) { - return new globalThis.TextDecoder().decode( - globalThis.Bun.concatArrayBuffers(fifo.toArray(false))); - } +function readableStreamDefaultControllerShouldCallPull(controller) { + "use strict"; + + const stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); + + if (!@readableStreamDefaultControllerCanCloseOrEnqueue(controller)) + return false; + if (!(@getByIdDirectPrivate(controller, "started") === 1)) return false; + if ( + (!@isReadableStreamLocked(stream) || + !@getByIdDirectPrivate( + @getByIdDirectPrivate(stream, "reader"), + "readRequests" + )?.isNotEmpty()) && + @readableStreamDefaultControllerGetDesiredSize(controller) <= 0 + ) + return false; + const desiredSize = + @readableStreamDefaultControllerGetDesiredSize(controller); + @assert(desiredSize !== null); + return desiredSize > 0; +} - // worst case: mixed content - var array = fifo.toArray(false); - - var arrayBufferSink = new globalThis.Bun.ArrayBufferSink(); - arrayBufferSink.start({ - highWaterMark: estimatedLength, - asUint8Array: true, - }); - for (let item of array) { - arrayBufferSink.write( - item - ); - } - if (rope.length > 0) { - arrayBufferSink.write(rope); - } - - // TODO: use builtin - return new globalThis.TextDecoder().decode( - arrayBufferSink.end() - ); - }, +function readableStreamDefaultControllerCallPullIfNeeded(controller) { + "use strict"; + + // FIXME: use @readableStreamDefaultControllerShouldCallPull + const stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); + + if (!@readableStreamDefaultControllerCanCloseOrEnqueue(controller)) return; + if (!(@getByIdDirectPrivate(controller, "started") === 1)) return; + if ( + (!@isReadableStreamLocked(stream) || + !@getByIdDirectPrivate( + @getByIdDirectPrivate(stream, "reader"), + "readRequests" + )?.isNotEmpty()) && + @readableStreamDefaultControllerGetDesiredSize(controller) <= 0 + ) + return; + + if (@getByIdDirectPrivate(controller, "pulling")) { + @putByIdDirectPrivate(controller, "pullAgain", true); + return; + } + + @assert(!@getByIdDirectPrivate(controller, "pullAgain")); + @putByIdDirectPrivate(controller, "pulling", true); + + @getByIdDirectPrivate(controller, "pullAlgorithm") + .@call(@undefined) + .@then( + function () { + @putByIdDirectPrivate(controller, "pulling", false); + if (@getByIdDirectPrivate(controller, "pullAgain")) { + @putByIdDirectPrivate(controller, "pullAgain", false); - close() { - try { - if (!calledDone) { - calledDone = true; - sink.fulfill(); - } - } catch(e) { - - } finally { - rope = ''; - hasString = false; - hasBuffer = false; - estimatedLength = 0; - } + @readableStreamDefaultControllerCallPullIfNeeded(controller); } - }; - - var controller = { - @underlyingSource: underlyingSource, - @pull: @onPullDirectStream, - @controlledReadableStream: this, - @sink: sink, - close: @onCloseDirectStream, - write: sink.write, - error: @handleDirectStreamError, - end: @onCloseDirectStream, - @close: @onCloseDirectStream, - drain: @onDrainDirectStream, - _pendingRead: @undefined, - _deferClose: 0, - _deferDrain: 0, - _deferCloseReason: @undefined, - _handleError: @undefined, - }; - - - @putByIdDirectPrivate(this, "readableStreamController", controller); - @putByIdDirectPrivate(this, "underlyingSource", @undefined); - @putByIdDirectPrivate(this, "start", @undefined); - return closingPromise; + }, + function (error) { + @readableStreamDefaultControllerError(controller, error); + } + ); } -function initializeArrayStream(underlyingSource, highWaterMark) -{ - "use strict"; - - var array = []; - var closingPromise = @newPromise(); - var calledDone = false; - - function fulfill() { - calledDone = true; - @fulfillPromise(closingPromise, array); - return array; - } - - var sink = { - start() { +function isReadableStreamLocked(stream) { + "use strict"; - }, - write(chunk) { - array.push(chunk); - return chunk.length; - }, + @assert(@isReadableStream(stream)); + return !!@getByIdDirectPrivate(stream, "reader"); +} - drain() { - return 0; - }, +function readableStreamDefaultControllerGetDesiredSize(controller) { + "use strict"; - end() { - if (calledDone) { - return []; - } - return fulfill(); - }, + const stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); + const state = @getByIdDirectPrivate(stream, "state"); - close() { - if (!calledDone) { - fulfill(); - } - } - }; + if (state === @streamErrored) return null; + if (state === @streamClosed) return 0; - var controller = { - @underlyingSource: underlyingSource, - @pull: @onPullDirectStream, - @controlledReadableStream: this, - @sink: sink, - close: @onCloseDirectStream, - write: sink.write, - error: @handleDirectStreamError, - end: @onCloseDirectStream, - @close: @onCloseDirectStream, - drain: @onDrainDirectStream, - _pendingRead: @undefined, - _deferClose: 0, - _deferDrain: 0, - _deferCloseReason: @undefined, - _handleError: @undefined, - }; - - - @putByIdDirectPrivate(this, "readableStreamController", controller); - @putByIdDirectPrivate(this, "underlyingSource", @undefined); - @putByIdDirectPrivate(this, "start", @undefined); - return closingPromise; + return ( + @getByIdDirectPrivate(controller, "strategy").highWaterMark - + @getByIdDirectPrivate(controller, "queue").size + ); } -function initializeArrayBufferStream(underlyingSource, highWaterMark) -{ - "use strict"; +function readableStreamReaderGenericCancel(reader, reason) { + "use strict"; - // This is the fallback implementation for direct streams - // When we don't know what the destination type is - // We assume it is a Uint8Array. - - var opts = highWaterMark && typeof highWaterMark === 'number' ? {highWaterMark, stream: true, asUint8Array: true} : {stream: true, asUint8Array: true}; - var sink = new globalThis.Bun.ArrayBufferSink(); - sink.start(opts); - - var controller = { - @underlyingSource: underlyingSource, - @pull: @onPullDirectStream, - @controlledReadableStream: this, - @sink: sink, - close: @onCloseDirectStream, - write: sink.write.bind(sink), - error: @handleDirectStreamError, - end: @onCloseDirectStream, - @close: @onCloseDirectStream, - drain: @onDrainDirectStream, - _pendingRead: @undefined, - _deferClose: 0, - _deferDrain: 0, - _deferCloseReason: @undefined, - _handleError: @undefined, - }; - - - @putByIdDirectPrivate(this, "readableStreamController", controller); - @putByIdDirectPrivate(this, "underlyingSource", @undefined); - @putByIdDirectPrivate(this, "start", @undefined); - + const stream = @getByIdDirectPrivate(reader, "ownerReadableStream"); + @assert(!!stream); + return @readableStreamCancel(stream, reason); } -function readableStreamError(stream, error) -{ - "use strict"; +function readableStreamCancel(stream, reason) { + "use strict"; - @assert(@isReadableStream(stream)); - @assert(@getByIdDirectPrivate(stream, "state") === @streamReadable); - @putByIdDirectPrivate(stream, "state", @streamErrored); - @putByIdDirectPrivate(stream, "storedError", error); - - const reader = @getByIdDirectPrivate(stream, "reader"); - - if (!reader) - return; - - if (@isReadableStreamDefaultReader(reader)) { - const requests = @getByIdDirectPrivate(reader, "readRequests"); - @putByIdDirectPrivate(reader, "readRequests", @createFIFO()); - for (var request = requests.shift(); request; request = requests.shift()) - @rejectPromise(request, error); - } else { - @assert(@isReadableStreamBYOBReader(reader)); - const requests = @getByIdDirectPrivate(reader, "readIntoRequests"); - @putByIdDirectPrivate(reader, "readIntoRequests", @createFIFO()); - for (var request = requests.shift(); request; request = requests.shift()) - @rejectPromise(request, error); - } + @putByIdDirectPrivate(stream, "disturbed", true); + const state = @getByIdDirectPrivate(stream, "state"); + if (state === @streamClosed) return @Promise.@resolve(); + if (state === @streamErrored) + return @Promise.@reject(@getByIdDirectPrivate(stream, "storedError")); + @readableStreamClose(stream); - @getByIdDirectPrivate(reader, "closedPromiseCapability").@reject.@call(@undefined, error); - const promise = @getByIdDirectPrivate(reader, "closedPromiseCapability").@promise; - @markPromiseAsHandled(promise); + var controller = @getByIdDirectPrivate(stream, "readableStreamController"); + return controller.@cancel(controller, reason).@then(function () {}); } -function readableStreamDefaultControllerShouldCallPull(controller) -{ - "use strict"; +function readableStreamDefaultControllerCancel(controller, reason) { + "use strict"; - const stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); - - if (!@readableStreamDefaultControllerCanCloseOrEnqueue(controller)) - return false; - if (!(@getByIdDirectPrivate(controller, "started") === 1)) - return false; - if ((!@isReadableStreamLocked(stream) || !@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) && @readableStreamDefaultControllerGetDesiredSize(controller) <= 0) - return false; - const desiredSize = @readableStreamDefaultControllerGetDesiredSize(controller); - @assert(desiredSize !== null); - return desiredSize > 0; + @putByIdDirectPrivate(controller, "queue", @newQueue()); + return @getByIdDirectPrivate(controller, "cancelAlgorithm").@call( + @undefined, + reason + ); } -function readableStreamDefaultControllerCallPullIfNeeded(controller) -{ - "use strict"; +function readableStreamDefaultControllerPull(controller) { + "use strict"; + + var queue = @getByIdDirectPrivate(controller, "queue"); + if (queue.content.isNotEmpty()) { + const chunk = @dequeueValue(queue); + if ( + @getByIdDirectPrivate(controller, "closeRequested") && + queue.content.isEmpty() + ) + @readableStreamClose( + @getByIdDirectPrivate(controller, "controlledReadableStream") + ); + else @readableStreamDefaultControllerCallPullIfNeeded(controller); + + return @createFulfilledPromise({ value: chunk, done: false }); + } + const pendingPromise = @readableStreamAddReadRequest( + @getByIdDirectPrivate(controller, "controlledReadableStream") + ); + @readableStreamDefaultControllerCallPullIfNeeded(controller); + return pendingPromise; +} - // FIXME: use @readableStreamDefaultControllerShouldCallPull - const stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); +function readableStreamDefaultControllerClose(controller) { + "use strict"; - if (!@readableStreamDefaultControllerCanCloseOrEnqueue(controller)) - return; - if (!(@getByIdDirectPrivate(controller, "started") === 1)) - return; - if ((!@isReadableStreamLocked(stream) || !@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) && @readableStreamDefaultControllerGetDesiredSize(controller) <= 0) - return; + @assert(@readableStreamDefaultControllerCanCloseOrEnqueue(controller)); + @putByIdDirectPrivate(controller, "closeRequested", true); + if (@getByIdDirectPrivate(controller, "queue")?.content?.isEmpty()) + @readableStreamClose( + @getByIdDirectPrivate(controller, "controlledReadableStream") + ); +} - if (@getByIdDirectPrivate(controller, "pulling")) { - @putByIdDirectPrivate(controller, "pullAgain", true); - return; +function readableStreamClose(stream) { + "use strict"; + + @assert(@getByIdDirectPrivate(stream, "state") === @streamReadable); + @putByIdDirectPrivate(stream, "state", @streamClosed); + if (!@getByIdDirectPrivate(stream, "reader")) return; + + if ( + @isReadableStreamDefaultReader(@getByIdDirectPrivate(stream, "reader")) + ) { + const requests = @getByIdDirectPrivate( + @getByIdDirectPrivate(stream, "reader"), + "readRequests" + ); + if (requests.isNotEmpty()) { + @putByIdDirectPrivate( + @getByIdDirectPrivate(stream, "reader"), + "readRequests", + @createFIFO() + ); + + for (var request = requests.shift(); request; request = requests.shift()) + @fulfillPromise(request, { value: @undefined, done: true }); } + } - - @assert(!@getByIdDirectPrivate(controller, "pullAgain")); - @putByIdDirectPrivate(controller, "pulling", true); - - @getByIdDirectPrivate(controller, "pullAlgorithm").@call(@undefined).@then(function() { - @putByIdDirectPrivate(controller, "pulling", false); - if (@getByIdDirectPrivate(controller, "pullAgain")) { - @putByIdDirectPrivate(controller, "pullAgain", false); - - @readableStreamDefaultControllerCallPullIfNeeded(controller); - } - }, function(error) { - @readableStreamDefaultControllerError(controller, error); - }); + @getByIdDirectPrivate( + @getByIdDirectPrivate(stream, "reader"), + "closedPromiseCapability" + ).@resolve.@call(); } -function isReadableStreamLocked(stream) -{ - "use strict"; +function readableStreamFulfillReadRequest(stream, chunk, done) { + "use strict"; + const readRequest = @getByIdDirectPrivate( + @getByIdDirectPrivate(stream, "reader"), + "readRequests" + ).shift(); + @fulfillPromise(readRequest, { value: chunk, done: done }); +} - @assert(@isReadableStream(stream)); - return !!@getByIdDirectPrivate(stream, "reader"); +function readableStreamDefaultControllerEnqueue(controller, chunk) { + "use strict"; + + const stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); + // this is checked by callers + @assert(@readableStreamDefaultControllerCanCloseOrEnqueue(controller)); + + if ( + @isReadableStreamLocked(stream) && + @getByIdDirectPrivate( + @getByIdDirectPrivate(stream, "reader"), + "readRequests" + )?.isNotEmpty() + ) { + @readableStreamFulfillReadRequest(stream, chunk, false); + @readableStreamDefaultControllerCallPullIfNeeded(controller); + return; + } + + try { + let chunkSize = 1; + if (@getByIdDirectPrivate(controller, "strategy").size !== @undefined) + chunkSize = @getByIdDirectPrivate(controller, "strategy").size(chunk); + @enqueueValueWithSize( + @getByIdDirectPrivate(controller, "queue"), + chunk, + chunkSize + ); + } catch (error) { + @readableStreamDefaultControllerError(controller, error); + throw error; + } + @readableStreamDefaultControllerCallPullIfNeeded(controller); } -function readableStreamDefaultControllerGetDesiredSize(controller) -{ - "use strict"; +function readableStreamDefaultReaderRead(reader) { + "use strict"; - const stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); - const state = @getByIdDirectPrivate(stream, "state"); + const stream = @getByIdDirectPrivate(reader, "ownerReadableStream"); + @assert(!!stream); + const state = @getByIdDirectPrivate(stream, "state"); - if (state === @streamErrored) - return null; - if (state === @streamClosed) - return 0; + @putByIdDirectPrivate(stream, "disturbed", true); + if (state === @streamClosed) + return @createFulfilledPromise({ value: @undefined, done: true }); + if (state === @streamErrored) + return @Promise.@reject(@getByIdDirectPrivate(stream, "storedError")); + @assert(state === @streamReadable); - return @getByIdDirectPrivate(controller, "strategy").highWaterMark - @getByIdDirectPrivate(controller, "queue").size; + return @getByIdDirectPrivate(stream, "readableStreamController").@pull( + @getByIdDirectPrivate(stream, "readableStreamController") + ); } +function readableStreamAddReadRequest(stream) { + "use strict"; -function readableStreamReaderGenericCancel(reader, reason) -{ - "use strict"; + @assert( + @isReadableStreamDefaultReader(@getByIdDirectPrivate(stream, "reader")) + ); + @assert(@getByIdDirectPrivate(stream, "state") == @streamReadable); - const stream = @getByIdDirectPrivate(reader, "ownerReadableStream"); - @assert(!!stream); - return @readableStreamCancel(stream, reason); -} - -function readableStreamCancel(stream, reason) -{ - "use strict"; + const readRequest = @newPromise(); - @putByIdDirectPrivate(stream, "disturbed", true); - const state = @getByIdDirectPrivate(stream, "state"); - if (state === @streamClosed) - return @Promise.@resolve(); - if (state === @streamErrored) - return @Promise.@reject(@getByIdDirectPrivate(stream, "storedError")); - @readableStreamClose(stream); + @getByIdDirectPrivate( + @getByIdDirectPrivate(stream, "reader"), + "readRequests" + ).push(readRequest); - var controller = @getByIdDirectPrivate(stream, "readableStreamController"); - return controller.@cancel(controller, reason).@then(function() { }); + return readRequest; } -function readableStreamDefaultControllerCancel(controller, reason) -{ - "use strict"; +function isReadableStreamDisturbed(stream) { + "use strict"; - @putByIdDirectPrivate(controller, "queue", @newQueue()); - return @getByIdDirectPrivate(controller, "cancelAlgorithm").@call(@undefined, reason); + @assert(@isReadableStream(stream)); + return @getByIdDirectPrivate(stream, "disturbed"); } -function readableStreamDefaultControllerPull(controller) -{ - "use strict"; - - var queue = @getByIdDirectPrivate(controller, "queue"); - if (queue.content.isNotEmpty()) { - const chunk = @dequeueValue(queue); - if (@getByIdDirectPrivate(controller, "closeRequested") && queue.content.isEmpty()) - @readableStreamClose(@getByIdDirectPrivate(controller, "controlledReadableStream")); - else - @readableStreamDefaultControllerCallPullIfNeeded(controller); +function readableStreamReaderGenericRelease(reader) { + "use strict"; + + @assert(!!@getByIdDirectPrivate(reader, "ownerReadableStream")); + @assert( + @getByIdDirectPrivate( + @getByIdDirectPrivate(reader, "ownerReadableStream"), + "reader" + ) === reader + ); + + if ( + @getByIdDirectPrivate( + @getByIdDirectPrivate(reader, "ownerReadableStream"), + "state" + ) === @streamReadable + ) + @getByIdDirectPrivate(reader, "closedPromiseCapability").@reject.@call( + @undefined, + @makeTypeError( + "releasing lock of reader whose stream is still in readable state" + ) + ); + else + @putByIdDirectPrivate(reader, "closedPromiseCapability", { + @promise: @newHandledRejectedPromise( + @makeTypeError("reader released lock") + ), + }); - return @createFulfilledPromise({ value: chunk, done: false }); - } - const pendingPromise = @readableStreamAddReadRequest(@getByIdDirectPrivate(controller, "controlledReadableStream")); - @readableStreamDefaultControllerCallPullIfNeeded(controller); - return pendingPromise; + const promise = @getByIdDirectPrivate( + reader, + "closedPromiseCapability" + ).@promise; + @markPromiseAsHandled(promise); + @putByIdDirectPrivate( + @getByIdDirectPrivate(reader, "ownerReadableStream"), + "reader", + @undefined + ); + @putByIdDirectPrivate(reader, "ownerReadableStream", @undefined); } -function readableStreamDefaultControllerClose(controller) -{ - "use strict"; +function readableStreamDefaultControllerCanCloseOrEnqueue(controller) { + "use strict"; - @assert(@readableStreamDefaultControllerCanCloseOrEnqueue(controller)); - @putByIdDirectPrivate(controller, "closeRequested", true); - if (@getByIdDirectPrivate(controller, "queue")?.content?.isEmpty()) - @readableStreamClose(@getByIdDirectPrivate(controller, "controlledReadableStream")); + return ( + !@getByIdDirectPrivate(controller, "closeRequested") && + @getByIdDirectPrivate( + @getByIdDirectPrivate(controller, "controlledReadableStream"), + "state" + ) === @streamReadable + ); } -function readableStreamClose(stream) -{ - "use strict"; - - @assert(@getByIdDirectPrivate(stream, "state") === @streamReadable); - @putByIdDirectPrivate(stream, "state", @streamClosed); - if (!@getByIdDirectPrivate(stream, "reader")) - return; - - if (@isReadableStreamDefaultReader(@getByIdDirectPrivate(stream, "reader"))) { - const requests = @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests"); - if (requests.isNotEmpty()) { - @putByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests", @createFIFO()); - - for (var request = requests.shift(); request; request = requests.shift()) - @fulfillPromise(request, { value: @undefined, done: true }); - } +function lazyLoadStream(stream, autoAllocateChunkSize) { + "use strict"; + + var nativeType = @getByIdDirectPrivate(stream, "bunNativeType"); + var nativePtr = @getByIdDirectPrivate(stream, "bunNativePtr"); + var cached = @lazyStreamPrototypeMap; + var Prototype = cached.@get(nativeType); + if (Prototype === @undefined) { + var [pull, start, cancel, setClose, deinit] = @lazyLoad(nativeType); + var closer = [false]; + var handleResult; + function handleNativeReadableStreamPromiseResult(val) { + "use strict"; + var { c, v } = this; + this.c = @undefined; + this.v = @undefined; + handleResult(val, c, v); } - @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "closedPromiseCapability").@resolve.@call(); -} + handleResult = function handleResult(result, controller, view) { + "use strict"; -function readableStreamFulfillReadRequest(stream, chunk, done) -{ - "use strict"; - const readRequest = @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").shift(); - @fulfillPromise(readRequest, { value: chunk, done: done }); -} - -function readableStreamDefaultControllerEnqueue(controller, chunk) -{ - "use strict"; + if (result && @isPromise(result)) { + return result.then( + handleNativeReadableStreamPromiseResult.bind({ + c: controller, + v: view, + }), + (err) => controller.error(err) + ); + } else if (result !== false) { + if (view && view.byteLength === result) { + controller.byobRequest.respondWithNewView(view); + } else { + controller.byobRequest.respond(result); + } + } - const stream = @getByIdDirectPrivate(controller, "controlledReadableStream"); - // this is checked by callers - @assert(@readableStreamDefaultControllerCanCloseOrEnqueue(controller)); + if (closer[0] || result === false) { + @enqueueJob(() => controller.close()); + closer[0] = false; + } + }; - if (@isReadableStreamLocked(stream) && @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) { - @readableStreamFulfillReadRequest(stream, chunk, false); - @readableStreamDefaultControllerCallPullIfNeeded(controller); - return; - } + Prototype = class NativeReadableStreamSource { + constructor(tag, autoAllocateChunkSize) { + this.pull = this.pull_.bind(tag); + this.cancel = this.cancel_.bind(tag); + this.autoAllocateChunkSize = autoAllocateChunkSize; + } - try { - let chunkSize = 1; - if (@getByIdDirectPrivate(controller, "strategy").size !== @undefined) - chunkSize = @getByIdDirectPrivate(controller, "strategy").size(chunk); - @enqueueValueWithSize(@getByIdDirectPrivate(controller, "queue"), chunk, chunkSize); - } - catch(error) { - @readableStreamDefaultControllerError(controller, error); - throw error; - } - @readableStreamDefaultControllerCallPullIfNeeded(controller); -} + pull; + cancel; -function readableStreamDefaultReaderRead(reader) -{ - "use strict"; + type = "bytes"; + autoAllocateChunkSize = 0; - const stream = @getByIdDirectPrivate(reader, "ownerReadableStream"); - @assert(!!stream); - const state = @getByIdDirectPrivate(stream, "state"); + static startSync = start; - @putByIdDirectPrivate(stream, "disturbed", true); - if (state === @streamClosed) - return @createFulfilledPromise({ value: @undefined, done: true }); - if (state === @streamErrored) - return @Promise.@reject(@getByIdDirectPrivate(stream, "storedError")); - @assert(state === @streamReadable); + pull_(controller) { + closer[0] = false; + var result; - return @getByIdDirectPrivate(stream, "readableStreamController").@pull(@getByIdDirectPrivate(stream, "readableStreamController")); -} + const view = controller.byobRequest.view; + try { + result = pull(this, view, closer); + } catch (err) { + return controller.error(err); + } -function readableStreamAddReadRequest(stream) -{ - "use strict"; + return handleResult(result, controller, view); + } - @assert(@isReadableStreamDefaultReader(@getByIdDirectPrivate(stream, "reader"))); - @assert(@getByIdDirectPrivate(stream, "state") == @streamReadable); + cancel_(reason) { + cancel(this, reason); + } + static deinit = deinit; + static registry = new FinalizationRegistry(deinit); + }; + cached.@set(nativeType, Prototype); + } - const readRequest = @newPromise(); - - @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").push(readRequest); + const chunkSize = Prototype.startSync(nativePtr, autoAllocateChunkSize); - return readRequest; + // empty file, no need for native back-and-forth on this + if (chunkSize === 0) { + @readableStreamClose(stream); + return null; + } + var instance = new Prototype(nativePtr, chunkSize); + Prototype.registry.register(instance, nativePtr); + return instance; } -function isReadableStreamDisturbed(stream) -{ - "use strict"; +function readableStreamIntoArray(stream) { + "use strict"; - @assert(@isReadableStream(stream)); - return @getByIdDirectPrivate(stream, "disturbed"); -} + var reader = stream.getReader(); + var manyResult = reader.readMany(); -function readableStreamReaderGenericRelease(reader) -{ - "use strict"; + async function processManyResult(result) { + if (result.done) { + return []; + } - @assert(!!@getByIdDirectPrivate(reader, "ownerReadableStream")); - @assert(@getByIdDirectPrivate(@getByIdDirectPrivate(reader, "ownerReadableStream"), "reader") === reader); + var chunks = result.value || []; - if (@getByIdDirectPrivate(@getByIdDirectPrivate(reader, "ownerReadableStream"), "state") === @streamReadable) - @getByIdDirectPrivate(reader, "closedPromiseCapability").@reject.@call(@undefined, @makeTypeError("releasing lock of reader whose stream is still in readable state")); - else - @putByIdDirectPrivate(reader, "closedPromiseCapability", { @promise: @newHandledRejectedPromise(@makeTypeError("reader released lock")) }); + while (true) { + var thisResult = await reader.read(); + if (thisResult.done) { + break; + } + chunks = chunks.concat(thisResult.value); + } - const promise = @getByIdDirectPrivate(reader, "closedPromiseCapability").@promise; - @markPromiseAsHandled(promise); - @putByIdDirectPrivate(@getByIdDirectPrivate(reader, "ownerReadableStream"), "reader", @undefined); - @putByIdDirectPrivate(reader, "ownerReadableStream", @undefined); -} + return chunks; + } -function readableStreamDefaultControllerCanCloseOrEnqueue(controller) -{ - "use strict"; + if (manyResult && @isPromise(manyResult)) { + return manyResult.@then(processManyResult); + } - return !@getByIdDirectPrivate(controller, "closeRequested") && @getByIdDirectPrivate(@getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === @streamReadable; + return processManyResult(manyResult); } +function readableStreamIntoText(stream) { + "use strict"; + + const [textStream, closer] = @createTextStream( + @getByIdDirectPrivate(stream, "highWaterMark") + ); + const prom = @readStreamIntoSink(stream, textStream, false); + if (prom && @isPromise(prom)) { + return @Promise.@resolve(prom).@then(closer.@promise); + } + return closer.@promise; +} -function lazyLoadStream(stream, autoAllocateChunkSize) { - "use strict"; - - var nativeType = @getByIdDirectPrivate(stream, "bunNativeType"); - var nativePtr = @getByIdDirectPrivate(stream, "bunNativePtr"); - var cached = @lazyStreamPrototypeMap; - var Prototype = cached.@get(nativeType); - if (Prototype === @undefined) { - var [pull, start, cancel, setClose, deinit] = @lazyLoad(nativeType); - var closer = [false]; - var handleResult; - function handleNativeReadableStreamPromiseResult(val) { - "use strict"; - var {c, v} = this; - this.c = @undefined; - this.v = @undefined; - handleResult(val, c, v); +function readableStreamToArrayBufferDirect(stream, underlyingSource) { + "use strict"; + + var sink = new globalThis.Bun.ArrayBufferSink(); + @putByIdDirectPrivate(stream, "underlyingSource", @undefined); + var highWaterMark = @getByIdDirectPrivate(stream, "highWaterMark"); + sink.start(highWaterMark ? { highWaterMark } : {}); + var capability = @newPromiseCapability(@Promise); + var ended = false; + var pull = underlyingSource.pull; + var close = underlyingSource.close; + + var controller = { + start() {}, + close(reason) { + if (!ended) { + ended = true; + if (close) { + close(); } - - handleResult = function handleResult(result, controller, view) { - "use strict"; - - if (result && @isPromise(result)) { - return result.then(handleNativeReadableStreamPromiseResult.bind({c: controller, v: view}), (err) => controller.error(err)); - } else if (result !== false) { - if (view && view.byteLength === result) { - controller.byobRequest.respondWithNewView(view); - } else { - controller.byobRequest.respond(result); - } - } - - if (closer[0] || result === false) { - @enqueueJob(() => controller.close()); - closer[0] = false; - } - }; - - Prototype = class NativeReadableStreamSource { - constructor(tag, autoAllocateChunkSize) { - this.pull = this.pull_.bind(tag); - this.cancel = this.cancel_.bind(tag); - this.autoAllocateChunkSize = autoAllocateChunkSize; - } - pull; - cancel; + @fulfillPromise(capability.@promise, sink.end()); + } + }, + end() { + if (!ended) { + ended = true; + if (close) { + close(); + } + @fulfillPromise(capability.@promise, sink.end()); + } + }, + drain() { + return 0; + }, + write: sink.write.bind(sink), + }; + + var didError = false; + try { + const firstPull = pull(controller); + if (firstPull && @isObject(firstPull) && @isPromise(firstPull)) { + return (async function (controller, promise, pull) { + while (!ended) { + await pull(controller); + } + return await promise; + })(controller, promise, pull); + } - type = "bytes"; - autoAllocateChunkSize = 0; + return capability.@promise; + } catch (e) { + didError = true; + @readableStreamError(stream, e); + return @Promise.@reject(e); + } finally { + if (!didError && stream) @readableStreamClose(stream); + controller = close = sink = pull = stream = @undefined; + } +} - static startSync = start; - - pull_(controller) { - closer[0] = false; - var result; +async function readableStreamToTextDirect(stream, underlyingSource) { + "use strict"; + const capability = @initializeTextStream.@call( + stream, + underlyingSource, + @undefined + ); + var reader = stream.getReader(); + + while (@getByIdDirectPrivate(stream, "state") === @streamReadable) { + var thisResult = await reader.read(); + if (thisResult.done) { + break; + } + } - const view = controller.byobRequest.view; - try { - result = pull(this, view, closer); - } catch(err) { - return controller.error(err); - } + try { + reader.releaseLock(); + } catch (e) {} + reader = @undefined; + stream = @undefined; - return handleResult(result, controller, view); - } + return capability.@promise; +} - cancel_(reason) { - cancel(this, reason); - } - static deinit = deinit; - static registry = new FinalizationRegistry(deinit); - } - cached.@set(nativeType, Prototype); +async function readableStreamToArrayDirect(stream, underlyingSource) { + const capability = @initializeArrayStream.@call( + stream, + underlyingSource, + @undefined + ); + underlyingSource = @undefined; + var reader = stream.getReader(); + try { + while (@getByIdDirectPrivate(stream, "state") === @streamReadable) { + var thisResult = await reader.read(); + if (thisResult.done) { + break; + } } - const chunkSize = Prototype.startSync(nativePtr, autoAllocateChunkSize); - - // empty file, no need for native back-and-forth on this - if (chunkSize === 0) { - @readableStreamClose(stream); - return null; - } - var instance = new Prototype(nativePtr, chunkSize); - Prototype.registry.register(instance, nativePtr); - return instance; -}
\ No newline at end of file + try { + reader.releaseLock(); + } catch (e) {} + reader = @undefined; + + return @Promise.@resolve(capability.@promise); + } catch (e) { + throw e; + } finally { + stream = @undefined; + reader = @undefined; + } + + return capability.@promise; +} diff --git a/src/bun.js/webcore/encoding.zig b/src/bun.js/webcore/encoding.zig index 102da12d5..690b2e86d 100644 --- a/src/bun.js/webcore/encoding.zig +++ b/src/bun.js/webcore/encoding.zig @@ -56,6 +56,7 @@ pub const TextEncoder = struct { // unless it's huge // JSC will GC Uint8Array that occupy less than 512 bytes // so it's extra good for that case + // this also means there won't be reallocations for small strings var buf: [2048]u8 = undefined; var ctx = globalThis.ref(); diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index 1bbd6cdba..9c5ce2c8e 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -1749,6 +1749,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { return .{ .owned = @truncate(Blob.SizeType, written) }; } + // In this case, it's always an error pub fn end(this: *@This(), err: ?JSC.Node.Syscall.Error) JSC.Node.Maybe(void) { log("end({s})", .{err}); @@ -1770,7 +1771,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { if (readable.len == 0) { this.signal.close(err); this.done = true; - this.res.endStream(false); + // we do not close the stream here + // this.res.endStream(false); this.finalize(); return .{ .result = {} }; } @@ -1778,6 +1780,10 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { if (!this.hasBackpressure()) { if (this.send(readable)) { this.handleWrote(readable.len); + this.signal.close(err); + this.done = true; + this.res.endStream(false); + this.finalize(); return .{ .result = {} }; } @@ -1852,6 +1858,8 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { this.finalize(); } + // This can be called _many_ times for the same instance + // so it must zero out state instead of make it pub fn finalize(this: *@This()) void { log("finalize()", .{}); @@ -1866,7 +1874,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { this.buffer = bun.ByteList.init(""); this.pooled_buffer = null; pooled.release(); - } else if (!ByteListPool.full()) { + } else if (this.buffer.cap == 0) {} else if (!ByteListPool.full()) { var entry = ByteListPool.get(this.allocator); entry.data = this.buffer; this.buffer = bun.ByteList.init(""); |