aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/builtins/js/ReadableStreamInternals.js
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-06-29 07:08:37 -0700
committerGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-06-29 07:08:37 -0700
commit013dfb04988d2bb569d3872d22d17648d95c32ca (patch)
treefa10b508dd56190609000805203b8e6f71a0bad1 /src/bun.js/builtins/js/ReadableStreamInternals.js
parent7168a4032837af414197ccfbf10c72da0815b661 (diff)
downloadbun-013dfb04988d2bb569d3872d22d17648d95c32ca.tar.gz
bun-013dfb04988d2bb569d3872d22d17648d95c32ca.tar.zst
bun-013dfb04988d2bb569d3872d22d17648d95c32ca.zip
[web streams] Handle direct streams in `readableStreamToArray`
Diffstat (limited to 'src/bun.js/builtins/js/ReadableStreamInternals.js')
-rw-r--r--src/bun.js/builtins/js/ReadableStreamInternals.js105
1 files changed, 94 insertions, 11 deletions
diff --git a/src/bun.js/builtins/js/ReadableStreamInternals.js b/src/bun.js/builtins/js/ReadableStreamInternals.js
index 2bd707622..ce8a85445 100644
--- a/src/bun.js/builtins/js/ReadableStreamInternals.js
+++ b/src/bun.js/builtins/js/ReadableStreamInternals.js
@@ -636,8 +636,17 @@ function assignToStream(stream, sink) {
@putByIdDirectPrivate(stream, "start", @undefined);
@putByIdDirectPrivate(stream, "underlyingSource", @undefined);
- @startDirectStream.@call(sink, stream, pull, close);
+ const highWaterMark = @getByIdDirectPrivate(stream, "highWaterMark");
+ if (highWaterMark) {
+ sink.start({
+ highWaterMark,
+ });
+ }
+
+ @startDirectStream.@call(sink, stream, pull, close);
+
+
// lock the stream, relying on close() or end() to eventaully close it
reader = stream.getReader();
@@ -665,7 +674,11 @@ function assignToStream(stream, sink) {
var wroteCount = many.value.length;
- sink.start();
+ const highWaterMark = @getByIdDirectPrivate(stream, "highWaterMark");
+
+ if (highWaterMark)
+ sink.start({highWaterMark});
+
for (var i = 0, values = many.value, length = many.value.length; i < length; i++) {
sink.write(values[i]);
}
@@ -948,32 +961,35 @@ function initializeTextStream(underlyingSource, highWaterMark)
var hasString = false;
var hasBuffer = false;
var rope = '';
- var estimatedLength = 0;
+ var estimatedLength = @toLength(0);
var closingPromise = @newPromise();
var calledDone = false;
+ var isView = @ArrayBuffer.@isView;
- var sink = {
+
+ sink = {
start() {
},
write(chunk) {
if (typeof chunk === 'string') {
- if (chunk.length > 0) {
+ var chunkLength = @toLength(chunk.length);
+ if (chunkLength > 0) {
rope += chunk;
hasString = true;
// TODO: utf16 byte length
- estimatedLength += chunk.length;
+ estimatedLength += chunkLength;
}
- return chunk.length;
+ return chunkLength;
}
- if (!chunk || !@isObject(chunk) || !(@ArrayBuffer.@isView(chunk) || chunk instanceof @ArrayBuffer)) {
+ if (!chunk || !@isObject(chunk) || !((isView(chunk)) || chunk instanceof @ArrayBuffer)) {
@throwTypeError("Expected text, ArrayBuffer or ArrayBufferView");
}
- const byteLength = chunk.byteLength;
+ const byteLength = @toLength(chunk.byteLength);
if (byteLength > 0) {
hasBuffer = true;
if (rope.length > 0) {
@@ -1051,7 +1067,6 @@ function initializeTextStream(underlyingSource, highWaterMark)
} catch(e) {
} finally {
- fifo.clear();
rope = '';
hasString = false;
hasBuffer = false;
@@ -1085,6 +1100,72 @@ function initializeTextStream(underlyingSource, highWaterMark)
return closingPromise;
}
+function initializeArrayStream(underlyingSource, highWaterMark)
+{
+ "use strict";
+
+ var array = [];
+ var closingPromise = @newPromise();
+ var calledDone = false;
+
+ function fulfill() {
+ calledDone = true;
+ @fulfillPromise(closingPromise, array);
+ return array;
+ }
+
+ var sink = {
+ start() {
+
+ },
+ write(chunk) {
+ array.push(chunk);
+ return chunk.length;
+ },
+
+ drain() {
+ return 0;
+ },
+
+ end() {
+ if (calledDone) {
+ return [];
+ }
+ return fulfill();
+ },
+
+ close() {
+ if (!calledDone) {
+ fulfill();
+ }
+ }
+ };
+
+ var controller = {
+ @underlyingSource: underlyingSource,
+ @pull: @onPullDirectStream,
+ @controlledReadableStream: this,
+ @sink: sink,
+ close: @onCloseDirectStream,
+ write: sink.write,
+ error: @handleDirectStreamError,
+ end: @onCloseDirectStream,
+ @close: @onCloseDirectStream,
+ drain: @onDrainDirectStream,
+ _pendingRead: @undefined,
+ _deferClose: 0,
+ _deferDrain: 0,
+ _deferCloseReason: @undefined,
+ _handleError: @undefined,
+ };
+
+
+ @putByIdDirectPrivate(this, "readableStreamController", controller);
+ @putByIdDirectPrivate(this, "underlyingSource", @undefined);
+ @putByIdDirectPrivate(this, "start", @undefined);
+ return closingPromise;
+}
+
function initializeArrayBufferStream(underlyingSource, highWaterMark)
{
"use strict";
@@ -1093,7 +1174,7 @@ function initializeArrayBufferStream(underlyingSource, highWaterMark)
// When we don't know what the destination type is
// We assume it is a Uint8Array.
- var opts = highWaterMark ? {highWaterMark, stream: true, asUint8Array: true} : {stream: true, asUint8Array: true};
+ var opts = highWaterMark && typeof highWaterMark === 'number' ? {highWaterMark, stream: true, asUint8Array: true} : {stream: true, asUint8Array: true};
var sink = new globalThis.Bun.ArrayBufferSink();
sink.start(opts);
@@ -1117,6 +1198,8 @@ function initializeArrayBufferStream(underlyingSource, highWaterMark)
@putByIdDirectPrivate(this, "readableStreamController", controller);
+ @putByIdDirectPrivate(this, "underlyingSource", @undefined);
+ @putByIdDirectPrivate(this, "start", @undefined);
}