diff options
Diffstat (limited to 'src/bun.js/builtins/js/ReadableStream.js')
-rw-r--r-- | src/bun.js/builtins/js/ReadableStream.js | 89 |
1 files changed, 31 insertions, 58 deletions
diff --git a/src/bun.js/builtins/js/ReadableStream.js b/src/bun.js/builtins/js/ReadableStream.js index 7f1723b12..95f379be5 100644 --- a/src/bun.js/builtins/js/ReadableStream.js +++ b/src/bun.js/builtins/js/ReadableStream.js @@ -46,6 +46,7 @@ function initializeReadableStream(underlyingSource, strategy) @putByIdDirectPrivate(this, "storedError", @undefined); @putByIdDirectPrivate(this, "disturbed", false); + // Initialized with null value to enable distinction with undefined case. @putByIdDirectPrivate(this, "readableStreamController", null); @@ -62,6 +63,7 @@ function initializeReadableStream(underlyingSource, strategy) if (@getByIdDirectPrivate(underlyingSource, "pull") !== @undefined && !isLazy) { const size = @getByIdDirectPrivate(strategy, "size"); const highWaterMark = @getByIdDirectPrivate(strategy, "highWaterMark"); + @putByIdDirectPrivate(this, "highWaterMark", highWaterMark); @putByIdDirectPrivate(this, "underlyingSource", @undefined); @setupReadableStreamDefaultController(this, underlyingSource, size, highWaterMark !== @undefined ? highWaterMark : 1, @getByIdDirectPrivate(underlyingSource, "start"), @getByIdDirectPrivate(underlyingSource, "pull"), @getByIdDirectPrivate(underlyingSource, "cancel")); @@ -69,10 +71,13 @@ function initializeReadableStream(underlyingSource, strategy) } if (isDirect) { @putByIdDirectPrivate(this, "underlyingSource", underlyingSource); + @putByIdDirectPrivate(this, "highWaterMark", @getByIdDirectPrivate(strategy, "highWaterMark")); @putByIdDirectPrivate(this, "start", () => @createReadableStreamController(this, underlyingSource, strategy)); } else if (isLazy) { const autoAllocateChunkSize = underlyingSource.autoAllocateChunkSize; + @putByIdDirectPrivate(this, "highWaterMark", @undefined); @putByIdDirectPrivate(this, "underlyingSource", @undefined); + @putByIdDirectPrivate(this, "highWaterMark", autoAllocateChunkSize || @getByIdDirectPrivate(strategy, "highWaterMark")); @putByIdDirectPrivate(this, "start", () => { @@ -83,6 +88,7 @@ function initializeReadableStream(underlyingSource, strategy) }); } else { @putByIdDirectPrivate(this, "underlyingSource", @undefined); + @putByIdDirectPrivate(this, "highWaterMark", @getByIdDirectPrivate(strategy, "highWaterMark")); @putByIdDirectPrivate(this, "start", @undefined); @createReadableStreamController(this, underlyingSource, strategy); } @@ -96,42 +102,54 @@ function initializeReadableStream(underlyingSource, strategy) function readableStreamToArray(stream) { "use strict"; - if (!stream || @getByIdDirectPrivate(stream, "state") === @streamClosed) { - return null; + // this is a direct stream + var underlyingSource = @getByIdDirectPrivate(stream, "underlyingSource"); + if (underlyingSource !== @undefined) { + const promise = @initializeArrayStream.@call(stream, underlyingSource, @undefined); + var reader = stream.getReader(); + return (async function() { + while (@getByIdDirectPrivate(stream, "state") === @streamReadable) { + var thisResult = await reader.read(); + if (thisResult.done) { + break; + } + } + + try { + reader.releaseLock(); + } catch(e) { + } + + return await promise; + })(); } + var reader = stream.getReader(); var manyResult = reader.readMany(); async function processManyResult(result) { if (result.done) { - return null; + return []; } var chunks = result.value || []; while (true) { var thisResult = await reader.read(); - if (thisResult.done) { - return chunks; + break; } - - chunks.push(thisResult.value); + chunks = chunks.concat(thisResult.value); } return chunks; - }; - + } if (manyResult && @isPromise(manyResult)) { return manyResult.@then(processManyResult); } - if (manyResult && manyResult.done) { - return null; - } - return processManyResult(manyResult); } @@ -177,7 +195,6 @@ function readableStreamToText(stream) { function readableStreamToJSON(stream) { "use strict"; - // TODO: optimize this to skip the extra ArrayBuffer return @readableStreamToText(stream).@then(globalThis.JSON.parse); } @@ -213,50 +230,6 @@ function readableStreamToBlob(stream) { } @globalPrivate -function readableStreamToArrayPublic(stream) { - "use strict"; - - if (@getByIdDirectPrivate(stream, "state") === @streamClosed) { - return []; - } - var reader = stream.getReader(); - - var manyResult = reader.readMany(); - - var processManyResult = (0, (async function(result) { - if (result.done) { - return []; - } - - var chunks = result.value || []; - - while (true) { - var thisResult = await reader.read(); - if (thisResult.done) { - return chunks; - } - - chunks.push(thisResult.value); - } - - return chunks; - })); - - - if (manyResult && @isPromise(manyResult)) { - return manyResult.then(processManyResult); - } - - if (manyResult && manyResult.done) { - return []; - } - - return processManyResult(manyResult); -} - - - -@globalPrivate function consumeReadableStream(nativePtr, nativeType, inputStream) { "use strict"; const symbol = globalThis.Symbol.for("Bun.consumeReadableStreamPrototype"); |