aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/builtins/js/ReadableStreamInternals.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/bun.js/builtins/js/ReadableStreamInternals.js')
-rw-r--r--src/bun.js/builtins/js/ReadableStreamInternals.js197
1 files changed, 173 insertions, 24 deletions
diff --git a/src/bun.js/builtins/js/ReadableStreamInternals.js b/src/bun.js/builtins/js/ReadableStreamInternals.js
index f988aa091..317c78171 100644
--- a/src/bun.js/builtins/js/ReadableStreamInternals.js
+++ b/src/bun.js/builtins/js/ReadableStreamInternals.js
@@ -620,12 +620,6 @@ function assignToStream(stream, sink) {
}
var pull = underlyingSource.pull;
-
- @putByIdDirectPrivate(stream, "readableStreamController", sink);
- @putByIdDirectPrivate(stream, "start", @undefined);
- @putByIdDirectPrivate(stream, "underlyingSource", @undefined);
-
- @startDirectStream.@call(sink, stream, pull, close);
if (!pull) {
close();
@@ -637,6 +631,12 @@ function assignToStream(stream, sink) {
@throwTypeError("pull is not a function");
return;
}
+
+ @putByIdDirectPrivate(stream, "readableStreamController", sink);
+ @putByIdDirectPrivate(stream, "start", @undefined);
+ @putByIdDirectPrivate(stream, "underlyingSource", @undefined);
+
+ @startDirectStream.@call(sink, stream, pull, close);
// lock the stream, relying on close() or end() to eventaully close it
reader = stream.getReader();
@@ -650,45 +650,41 @@ function assignToStream(stream, sink) {
"use strict";
var didClose = false;
-
-
try {
var reader = stream.getReader();
- reader.closed.then(() => {
- if (!didClose && sink) {
- didClose = true;
- sink.end();
- }
- }, (e) => {
- if (!didClose && sink) {
- didClose = true;
- sink.close(e);
- }
- });
var many = reader.readMany();
if (many && @isPromise(many)) {
many = await many;
}
-
if (many.done) {
didClose = true;
sink.end();
return;
}
- sink.start();
+
var wroteCount = many.value.length;
+ sink.start();
for (var i = 0, values = many.value, length = many.value.length; i < length; i++) {
sink.write(values[i]);
}
+ var streamState = @getByIdDirectPrivate(stream, "state");
+
+
+ if (streamState === @streamClosed) {
+ didClose = true;
+ return sink.end();
+ }
+
if (wroteCount > 0) {
sink.drain();
}
while (true) {
var result = await reader.read();
+
if (result.done) {
didClose = true;
return sink.end();
@@ -697,8 +693,16 @@ function assignToStream(stream, sink) {
sink.write(result.value);
}
} catch (e) {
- globalThis.console.error(e);
+ if (sink && !didClose) {
+ didClose = true;
+ try {
+ sink.close();
+ } catch(j) {
+ throw j;
+ }
+ }
+ throw e;
}
})();
}
@@ -741,7 +745,7 @@ function handleDirectStreamErrorReject(e) {
return @Promise.@reject(e);
}
-function onPullArrayBufferSink(controller)
+function onPullDirectStream(controller)
{
"use strict";
@@ -935,6 +939,151 @@ function onDrainDirectStream()
}
+function initializeTextStream(underlyingSource, highWaterMark)
+{
+ "use strict";
+
+ var sink;
+ var fifo = @createFIFO();
+ var hasString = false;
+ var hasBuffer = false;
+ var rope = '';
+ var estimatedLength = 0;
+ var closingPromise = @newPromise();
+ var calledDone = false;
+ var sink = {
+ start() {
+
+ },
+ write(chunk) {
+ if (typeof chunk === 'string') {
+ if (chunk.length > 0) {
+ rope += chunk;
+ hasString = true;
+ // TODO: utf16 byte length
+ estimatedLength += chunk.length;
+
+ }
+
+ return chunk.length;
+ }
+
+ if (!chunk || !@isObject(chunk) || !(@ArrayBuffer.@isView(chunk) || chunk instanceof @ArrayBuffer)) {
+ @throwTypeError("Expected text, ArrayBuffer or ArrayBufferView");
+ }
+
+ const byteLength = chunk.byteLength;
+ if (byteLength > 0) {
+ hasBuffer = true;
+ if (rope.length > 0) {
+ fifo.push(rope);
+ rope = '';
+ }
+ fifo.push(chunk);
+ }
+ estimatedLength += byteLength;
+ return byteLength;
+
+ },
+
+ drain() {
+ return 0;
+ },
+
+ end() {
+ if (calledDone) {
+ return "";
+ }
+ return sink.fulfill();
+ },
+
+ fulfill() {
+ calledDone = true;
+ const result = sink.finishInternal();
+ @fulfillPromise(closingPromise, result);
+ return result;
+ },
+
+ finishInternal() {
+ if (!hasString && !hasBuffer) {
+ return "";
+ }
+
+ if (hasString && !hasBuffer) {
+ return rope;
+ }
+
+ if (hasBuffer && !hasString) {
+ return new globalThis.TextDecoder().decode(
+ globalThis.Bun.concatArrayBuffers(fifo.toArray(false)));
+ }
+
+ // worst case: mixed content
+ var array = fifo.toArray(false);
+
+ var arrayBufferSink = new globalThis.Bun.ArrayBufferSink();
+ arrayBufferSink.start({
+ highWaterMark: estimatedLength,
+ asUint8Array: true,
+ });
+ for (let item of array) {
+ arrayBufferSink.write(
+ item
+ );
+ }
+ if (rope.length > 0) {
+ arrayBufferSink.write(rope);
+ }
+
+ // TODO: use builtin
+ return new globalThis.TextDecoder().decode(
+ arrayBufferSink.end()
+ );
+ },
+
+ close() {
+ try {
+ if (!calledDone) {
+ calledDone = true;
+ sink.fulfill();
+ }
+ } catch(e) {
+
+ } finally {
+ fifo.clear();
+ rope = '';
+ hasString = false;
+ hasBuffer = false;
+ estimatedLength = 0;
+ }
+ }
+ };
+
+ var controller = {
+ @underlyingSource: underlyingSource,
+ @pull: @onPullDirectStream,
+ @controlledReadableStream: this,
+ @sink: sink,
+ close: @onCloseDirectStream,
+ write: sink.write,
+ error: @handleDirectStreamError,
+ end: @onCloseDirectStream,
+ @close: @onCloseDirectStream,
+ drain: @onDrainDirectStream,
+ _pendingRead: @undefined,
+ _deferClose: 0,
+ _deferDrain: 0,
+ _deferCloseReason: @undefined,
+ _handleError: @undefined,
+ };
+
+
+ @putByIdDirectPrivate(this, "readableStreamController", controller);
+ @putByIdDirectPrivate(this, "underlyingSource", @undefined);
+ @putByIdDirectPrivate(this, "start", @undefined);
+ return closingPromise;
+}
+
function initializeArrayBufferStream(underlyingSource, highWaterMark)
{
"use strict";
@@ -949,7 +1098,7 @@ function initializeArrayBufferStream(underlyingSource, highWaterMark)
var controller = {
@underlyingSource: underlyingSource,
- @pull: @onPullArrayBufferSink,
+ @pull: @onPullDirectStream,
@controlledReadableStream: this,
@sink: sink,
close: @onCloseDirectStream,