aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-06-29 07:08:37 -0700
committerGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-06-29 07:08:37 -0700
commit013dfb04988d2bb569d3872d22d17648d95c32ca (patch)
treefa10b508dd56190609000805203b8e6f71a0bad1 /src/bun.js
parent7168a4032837af414197ccfbf10c72da0815b661 (diff)
downloadbun-013dfb04988d2bb569d3872d22d17648d95c32ca.tar.gz
bun-013dfb04988d2bb569d3872d22d17648d95c32ca.tar.zst
bun-013dfb04988d2bb569d3872d22d17648d95c32ca.zip
[web streams] Handle direct streams in `readableStreamToArray`
Diffstat (limited to 'src/bun.js')
-rw-r--r--src/bun.js/builtins/js/ReadableStream.js89
-rw-r--r--src/bun.js/builtins/js/ReadableStreamInternals.js105
2 files changed, 125 insertions, 69 deletions
diff --git a/src/bun.js/builtins/js/ReadableStream.js b/src/bun.js/builtins/js/ReadableStream.js
index 7f1723b12..95f379be5 100644
--- a/src/bun.js/builtins/js/ReadableStream.js
+++ b/src/bun.js/builtins/js/ReadableStream.js
@@ -46,6 +46,7 @@ function initializeReadableStream(underlyingSource, strategy)
@putByIdDirectPrivate(this, "storedError", @undefined);
@putByIdDirectPrivate(this, "disturbed", false);
+
// Initialized with null value to enable distinction with undefined case.
@putByIdDirectPrivate(this, "readableStreamController", null);
@@ -62,6 +63,7 @@ function initializeReadableStream(underlyingSource, strategy)
if (@getByIdDirectPrivate(underlyingSource, "pull") !== @undefined && !isLazy) {
const size = @getByIdDirectPrivate(strategy, "size");
const highWaterMark = @getByIdDirectPrivate(strategy, "highWaterMark");
+ @putByIdDirectPrivate(this, "highWaterMark", highWaterMark);
@putByIdDirectPrivate(this, "underlyingSource", @undefined);
@setupReadableStreamDefaultController(this, underlyingSource, size, highWaterMark !== @undefined ? highWaterMark : 1, @getByIdDirectPrivate(underlyingSource, "start"), @getByIdDirectPrivate(underlyingSource, "pull"), @getByIdDirectPrivate(underlyingSource, "cancel"));
@@ -69,10 +71,13 @@ function initializeReadableStream(underlyingSource, strategy)
}
if (isDirect) {
@putByIdDirectPrivate(this, "underlyingSource", underlyingSource);
+ @putByIdDirectPrivate(this, "highWaterMark", @getByIdDirectPrivate(strategy, "highWaterMark"));
@putByIdDirectPrivate(this, "start", () => @createReadableStreamController(this, underlyingSource, strategy));
} else if (isLazy) {
const autoAllocateChunkSize = underlyingSource.autoAllocateChunkSize;
+ @putByIdDirectPrivate(this, "highWaterMark", @undefined);
@putByIdDirectPrivate(this, "underlyingSource", @undefined);
+ @putByIdDirectPrivate(this, "highWaterMark", autoAllocateChunkSize || @getByIdDirectPrivate(strategy, "highWaterMark"));
@putByIdDirectPrivate(this, "start", () => {
@@ -83,6 +88,7 @@ function initializeReadableStream(underlyingSource, strategy)
});
} else {
@putByIdDirectPrivate(this, "underlyingSource", @undefined);
+ @putByIdDirectPrivate(this, "highWaterMark", @getByIdDirectPrivate(strategy, "highWaterMark"));
@putByIdDirectPrivate(this, "start", @undefined);
@createReadableStreamController(this, underlyingSource, strategy);
}
@@ -96,42 +102,54 @@ function initializeReadableStream(underlyingSource, strategy)
function readableStreamToArray(stream) {
"use strict";
- if (!stream || @getByIdDirectPrivate(stream, "state") === @streamClosed) {
- return null;
+ // this is a direct stream
+ var underlyingSource = @getByIdDirectPrivate(stream, "underlyingSource");
+ if (underlyingSource !== @undefined) {
+ const promise = @initializeArrayStream.@call(stream, underlyingSource, @undefined);
+ var reader = stream.getReader();
+ return (async function() {
+ while (@getByIdDirectPrivate(stream, "state") === @streamReadable) {
+ var thisResult = await reader.read();
+ if (thisResult.done) {
+ break;
+ }
+ }
+
+ try {
+ reader.releaseLock();
+ } catch(e) {
+ }
+
+ return await promise;
+ })();
}
+
var reader = stream.getReader();
var manyResult = reader.readMany();
async function processManyResult(result) {
if (result.done) {
- return null;
+ return [];
}
var chunks = result.value || [];
while (true) {
var thisResult = await reader.read();
-
if (thisResult.done) {
- return chunks;
+ break;
}
-
- chunks.push(thisResult.value);
+ chunks = chunks.concat(thisResult.value);
}
return chunks;
- };
-
+ }
if (manyResult && @isPromise(manyResult)) {
return manyResult.@then(processManyResult);
}
- if (manyResult && manyResult.done) {
- return null;
- }
-
return processManyResult(manyResult);
}
@@ -177,7 +195,6 @@ function readableStreamToText(stream) {
function readableStreamToJSON(stream) {
"use strict";
- // TODO: optimize this to skip the extra ArrayBuffer
return @readableStreamToText(stream).@then(globalThis.JSON.parse);
}
@@ -213,50 +230,6 @@ function readableStreamToBlob(stream) {
}
@globalPrivate
-function readableStreamToArrayPublic(stream) {
- "use strict";
-
- if (@getByIdDirectPrivate(stream, "state") === @streamClosed) {
- return [];
- }
- var reader = stream.getReader();
-
- var manyResult = reader.readMany();
-
- var processManyResult = (0, (async function(result) {
- if (result.done) {
- return [];
- }
-
- var chunks = result.value || [];
-
- while (true) {
- var thisResult = await reader.read();
- if (thisResult.done) {
- return chunks;
- }
-
- chunks.push(thisResult.value);
- }
-
- return chunks;
- }));
-
-
- if (manyResult && @isPromise(manyResult)) {
- return manyResult.then(processManyResult);
- }
-
- if (manyResult && manyResult.done) {
- return [];
- }
-
- return processManyResult(manyResult);
-}
-
-
-
-@globalPrivate
function consumeReadableStream(nativePtr, nativeType, inputStream) {
"use strict";
const symbol = globalThis.Symbol.for("Bun.consumeReadableStreamPrototype");
diff --git a/src/bun.js/builtins/js/ReadableStreamInternals.js b/src/bun.js/builtins/js/ReadableStreamInternals.js
index 2bd707622..ce8a85445 100644
--- a/src/bun.js/builtins/js/ReadableStreamInternals.js
+++ b/src/bun.js/builtins/js/ReadableStreamInternals.js
@@ -636,8 +636,17 @@ function assignToStream(stream, sink) {
@putByIdDirectPrivate(stream, "start", @undefined);
@putByIdDirectPrivate(stream, "underlyingSource", @undefined);
- @startDirectStream.@call(sink, stream, pull, close);
+ const highWaterMark = @getByIdDirectPrivate(stream, "highWaterMark");
+ if (highWaterMark) {
+ sink.start({
+ highWaterMark,
+ });
+ }
+
+ @startDirectStream.@call(sink, stream, pull, close);
+
+
// lock the stream, relying on close() or end() to eventaully close it
reader = stream.getReader();
@@ -665,7 +674,11 @@ function assignToStream(stream, sink) {
var wroteCount = many.value.length;
- sink.start();
+ const highWaterMark = @getByIdDirectPrivate(stream, "highWaterMark");
+
+ if (highWaterMark)
+ sink.start({highWaterMark});
+
for (var i = 0, values = many.value, length = many.value.length; i < length; i++) {
sink.write(values[i]);
}
@@ -948,32 +961,35 @@ function initializeTextStream(underlyingSource, highWaterMark)
var hasString = false;
var hasBuffer = false;
var rope = '';
- var estimatedLength = 0;
+ var estimatedLength = @toLength(0);
var closingPromise = @newPromise();
var calledDone = false;
+ var isView = @ArrayBuffer.@isView;
- var sink = {
+
+ sink = {
start() {
},
write(chunk) {
if (typeof chunk === 'string') {
- if (chunk.length > 0) {
+ var chunkLength = @toLength(chunk.length);
+ if (chunkLength > 0) {
rope += chunk;
hasString = true;
// TODO: utf16 byte length
- estimatedLength += chunk.length;
+ estimatedLength += chunkLength;
}
- return chunk.length;
+ return chunkLength;
}
- if (!chunk || !@isObject(chunk) || !(@ArrayBuffer.@isView(chunk) || chunk instanceof @ArrayBuffer)) {
+ if (!chunk || !@isObject(chunk) || !((isView(chunk)) || chunk instanceof @ArrayBuffer)) {
@throwTypeError("Expected text, ArrayBuffer or ArrayBufferView");
}
- const byteLength = chunk.byteLength;
+ const byteLength = @toLength(chunk.byteLength);
if (byteLength > 0) {
hasBuffer = true;
if (rope.length > 0) {
@@ -1051,7 +1067,6 @@ function initializeTextStream(underlyingSource, highWaterMark)
} catch(e) {
} finally {
- fifo.clear();
rope = '';
hasString = false;
hasBuffer = false;
@@ -1085,6 +1100,72 @@ function initializeTextStream(underlyingSource, highWaterMark)
return closingPromise;
}
+function initializeArrayStream(underlyingSource, highWaterMark)
+{
+ "use strict";
+
+ var array = [];
+ var closingPromise = @newPromise();
+ var calledDone = false;
+
+ function fulfill() {
+ calledDone = true;
+ @fulfillPromise(closingPromise, array);
+ return array;
+ }
+
+ var sink = {
+ start() {
+
+ },
+ write(chunk) {
+ array.push(chunk);
+ return chunk.length;
+ },
+
+ drain() {
+ return 0;
+ },
+
+ end() {
+ if (calledDone) {
+ return [];
+ }
+ return fulfill();
+ },
+
+ close() {
+ if (!calledDone) {
+ fulfill();
+ }
+ }
+ };
+
+ var controller = {
+ @underlyingSource: underlyingSource,
+ @pull: @onPullDirectStream,
+ @controlledReadableStream: this,
+ @sink: sink,
+ close: @onCloseDirectStream,
+ write: sink.write,
+ error: @handleDirectStreamError,
+ end: @onCloseDirectStream,
+ @close: @onCloseDirectStream,
+ drain: @onDrainDirectStream,
+ _pendingRead: @undefined,
+ _deferClose: 0,
+ _deferDrain: 0,
+ _deferCloseReason: @undefined,
+ _handleError: @undefined,
+ };
+
+
+ @putByIdDirectPrivate(this, "readableStreamController", controller);
+ @putByIdDirectPrivate(this, "underlyingSource", @undefined);
+ @putByIdDirectPrivate(this, "start", @undefined);
+ return closingPromise;
+}
+
function initializeArrayBufferStream(underlyingSource, highWaterMark)
{
"use strict";
@@ -1093,7 +1174,7 @@ function initializeArrayBufferStream(underlyingSource, highWaterMark)
// When we don't know what the destination type is
// We assume it is a Uint8Array.
- var opts = highWaterMark ? {highWaterMark, stream: true, asUint8Array: true} : {stream: true, asUint8Array: true};
+ var opts = highWaterMark && typeof highWaterMark === 'number' ? {highWaterMark, stream: true, asUint8Array: true} : {stream: true, asUint8Array: true};
var sink = new globalThis.Bun.ArrayBufferSink();
sink.start(opts);
@@ -1117,6 +1198,8 @@ function initializeArrayBufferStream(underlyingSource, highWaterMark)
@putByIdDirectPrivate(this, "readableStreamController", controller);
+ @putByIdDirectPrivate(this, "underlyingSource", @undefined);
+ @putByIdDirectPrivate(this, "start", @undefined);
}