aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/builtins/js/ReadableStream.js
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-06-29 07:08:37 -0700
committerGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-06-29 07:08:37 -0700
commit013dfb04988d2bb569d3872d22d17648d95c32ca (patch)
treefa10b508dd56190609000805203b8e6f71a0bad1 /src/bun.js/builtins/js/ReadableStream.js
parent7168a4032837af414197ccfbf10c72da0815b661 (diff)
downloadbun-013dfb04988d2bb569d3872d22d17648d95c32ca.tar.gz
bun-013dfb04988d2bb569d3872d22d17648d95c32ca.tar.zst
bun-013dfb04988d2bb569d3872d22d17648d95c32ca.zip
[web streams] Handle direct streams in `readableStreamToArray`
Diffstat (limited to 'src/bun.js/builtins/js/ReadableStream.js')
-rw-r--r--src/bun.js/builtins/js/ReadableStream.js89
1 files changed, 31 insertions, 58 deletions
diff --git a/src/bun.js/builtins/js/ReadableStream.js b/src/bun.js/builtins/js/ReadableStream.js
index 7f1723b12..95f379be5 100644
--- a/src/bun.js/builtins/js/ReadableStream.js
+++ b/src/bun.js/builtins/js/ReadableStream.js
@@ -46,6 +46,7 @@ function initializeReadableStream(underlyingSource, strategy)
@putByIdDirectPrivate(this, "storedError", @undefined);
@putByIdDirectPrivate(this, "disturbed", false);
+
// Initialized with null value to enable distinction with undefined case.
@putByIdDirectPrivate(this, "readableStreamController", null);
@@ -62,6 +63,7 @@ function initializeReadableStream(underlyingSource, strategy)
if (@getByIdDirectPrivate(underlyingSource, "pull") !== @undefined && !isLazy) {
const size = @getByIdDirectPrivate(strategy, "size");
const highWaterMark = @getByIdDirectPrivate(strategy, "highWaterMark");
+ @putByIdDirectPrivate(this, "highWaterMark", highWaterMark);
@putByIdDirectPrivate(this, "underlyingSource", @undefined);
@setupReadableStreamDefaultController(this, underlyingSource, size, highWaterMark !== @undefined ? highWaterMark : 1, @getByIdDirectPrivate(underlyingSource, "start"), @getByIdDirectPrivate(underlyingSource, "pull"), @getByIdDirectPrivate(underlyingSource, "cancel"));
@@ -69,10 +71,13 @@ function initializeReadableStream(underlyingSource, strategy)
}
if (isDirect) {
@putByIdDirectPrivate(this, "underlyingSource", underlyingSource);
+ @putByIdDirectPrivate(this, "highWaterMark", @getByIdDirectPrivate(strategy, "highWaterMark"));
@putByIdDirectPrivate(this, "start", () => @createReadableStreamController(this, underlyingSource, strategy));
} else if (isLazy) {
const autoAllocateChunkSize = underlyingSource.autoAllocateChunkSize;
+ @putByIdDirectPrivate(this, "highWaterMark", @undefined);
@putByIdDirectPrivate(this, "underlyingSource", @undefined);
+ @putByIdDirectPrivate(this, "highWaterMark", autoAllocateChunkSize || @getByIdDirectPrivate(strategy, "highWaterMark"));
@putByIdDirectPrivate(this, "start", () => {
@@ -83,6 +88,7 @@ function initializeReadableStream(underlyingSource, strategy)
});
} else {
@putByIdDirectPrivate(this, "underlyingSource", @undefined);
+ @putByIdDirectPrivate(this, "highWaterMark", @getByIdDirectPrivate(strategy, "highWaterMark"));
@putByIdDirectPrivate(this, "start", @undefined);
@createReadableStreamController(this, underlyingSource, strategy);
}
@@ -96,42 +102,54 @@ function initializeReadableStream(underlyingSource, strategy)
function readableStreamToArray(stream) {
"use strict";
- if (!stream || @getByIdDirectPrivate(stream, "state") === @streamClosed) {
- return null;
+ // this is a direct stream
+ var underlyingSource = @getByIdDirectPrivate(stream, "underlyingSource");
+ if (underlyingSource !== @undefined) {
+ const promise = @initializeArrayStream.@call(stream, underlyingSource, @undefined);
+ var reader = stream.getReader();
+ return (async function() {
+ while (@getByIdDirectPrivate(stream, "state") === @streamReadable) {
+ var thisResult = await reader.read();
+ if (thisResult.done) {
+ break;
+ }
+ }
+
+ try {
+ reader.releaseLock();
+ } catch(e) {
+ }
+
+ return await promise;
+ })();
}
+
var reader = stream.getReader();
var manyResult = reader.readMany();
async function processManyResult(result) {
if (result.done) {
- return null;
+ return [];
}
var chunks = result.value || [];
while (true) {
var thisResult = await reader.read();
-
if (thisResult.done) {
- return chunks;
+ break;
}
-
- chunks.push(thisResult.value);
+ chunks = chunks.concat(thisResult.value);
}
return chunks;
- };
-
+ }
if (manyResult && @isPromise(manyResult)) {
return manyResult.@then(processManyResult);
}
- if (manyResult && manyResult.done) {
- return null;
- }
-
return processManyResult(manyResult);
}
@@ -177,7 +195,6 @@ function readableStreamToText(stream) {
function readableStreamToJSON(stream) {
"use strict";
- // TODO: optimize this to skip the extra ArrayBuffer
return @readableStreamToText(stream).@then(globalThis.JSON.parse);
}
@@ -213,50 +230,6 @@ function readableStreamToBlob(stream) {
}
@globalPrivate
-function readableStreamToArrayPublic(stream) {
- "use strict";
-
- if (@getByIdDirectPrivate(stream, "state") === @streamClosed) {
- return [];
- }
- var reader = stream.getReader();
-
- var manyResult = reader.readMany();
-
- var processManyResult = (0, (async function(result) {
- if (result.done) {
- return [];
- }
-
- var chunks = result.value || [];
-
- while (true) {
- var thisResult = await reader.read();
- if (thisResult.done) {
- return chunks;
- }
-
- chunks.push(thisResult.value);
- }
-
- return chunks;
- }));
-
-
- if (manyResult && @isPromise(manyResult)) {
- return manyResult.then(processManyResult);
- }
-
- if (manyResult && manyResult.done) {
- return [];
- }
-
- return processManyResult(manyResult);
-}
-
-
-
-@globalPrivate
function consumeReadableStream(nativePtr, nativeType, inputStream) {
"use strict";
const symbol = globalThis.Symbol.for("Bun.consumeReadableStreamPrototype");