aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/builtins/js/ReadableStreamInternals.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/bun.js/builtins/js/ReadableStreamInternals.js')
-rw-r--r--src/bun.js/builtins/js/ReadableStreamInternals.js72
1 files changed, 47 insertions, 25 deletions
diff --git a/src/bun.js/builtins/js/ReadableStreamInternals.js b/src/bun.js/builtins/js/ReadableStreamInternals.js
index 353e3f9e9..0f5871fa1 100644
--- a/src/bun.js/builtins/js/ReadableStreamInternals.js
+++ b/src/bun.js/builtins/js/ReadableStreamInternals.js
@@ -784,42 +784,51 @@ function readDirectStream(stream, sink, underlyingSource) {
"use strict";
@putByIdDirectPrivate(stream, "underlyingSource", @undefined);
+ @putByIdDirectPrivate(stream, "start", @undefined);
- var {close: originalClose, pull} = underlyingSource;
- underlyingSource = @undefined;
+ var capturedStream = stream;
+ var reader;
- var fakeReader = {
- };
- var close = (reason) => {
- try {
- originalClose && originalClose(reason);
- } catch (e) {
+ function close(stream, reason) {
+ if (reason && underlyingSource?.cancel) {
+ try {
+ underlyingSource.cancel(reason);
+ } catch (e) {
+ }
+ underlyingSource = @undefined;
}
- originalClose = @undefined;
- @putByIdDirectPrivate(stream, "reader", @undefined);
- @putByIdDirectPrivate(stream, "readableStreamController", null);
- @putByIdDirectPrivate(stream, "state", @streamClosed);
- stream = @undefined;
- fakeReader = @undefined;
- };
+
+ if (stream) {
+ @putByIdDirectPrivate(stream, "readableStreamController", @undefined);
+ @putByIdDirectPrivate(stream, "reader", @undefined);
+ if (reason) {
+ @putByIdDirectPrivate(stream, "state", @streamErrored);
+ @putByIdDirectPrivate(stream, "storedError", reason);
+ } else {
+ @putByIdDirectPrivate(stream, "state", @streamClosed);
+ }
+
+ }
+ }
- if (!pull) {
+
+
+
+ if (!underlyingSource.pull) {
close();
return;
}
- if (!@isCallable(pull)) {
+ if (!@isCallable(underlyingSource.pull)) {
close();
@throwTypeError("pull is not a function");
return;
}
@putByIdDirectPrivate(stream, "readableStreamController", sink);
- @putByIdDirectPrivate(stream, "start", @undefined);
-
const highWaterMark = @getByIdDirectPrivate(stream, "highWaterMark");
if (highWaterMark) {
@@ -828,12 +837,15 @@ function readDirectStream(stream, sink, underlyingSource) {
});
}
- @startDirectStream.@call(sink, stream, pull, close);
+ @startDirectStream.@call(sink, stream, underlyingSource.pull, close);
+ @putByIdDirectPrivate(stream, "reader", {});
- // isReadableStreamLocked() checks for truthiness of "reader"
- @putByIdDirectPrivate(stream, "reader", fakeReader);
- pull(sink);
+ var maybePromise = underlyingSource.pull(sink);
sink = @undefined;
+ if (maybePromise && @isPromise(maybePromise)) {
+ return maybePromise.@then(() => {});
+ }
+
}
@globalPrivate;
@@ -841,11 +853,21 @@ function assignToStream(stream, sink) {
"use strict";
// The stream is either a direct stream or a "default" JS stream
- const underlyingSource = @getByIdDirectPrivate(stream, "underlyingSource");
+ var underlyingSource = @getByIdDirectPrivate(stream, "underlyingSource");
// we know it's a direct stream when @underlyingSource is set
if (underlyingSource) {
- return @readDirectStream(stream, sink, underlyingSource);
+ try {
+ return @readDirectStream(stream, sink, underlyingSource);
+ } catch(e) {
+ throw e;
+ } finally {
+ underlyingSource = @undefined;
+ stream = @undefined;
+ sink = @undefined;
+ }
+
+
}
return @readStreamIntoSink(stream, sink, true);