From 48f64bc6e5fade5410413d31d0f17e9802a3917b Mon Sep 17 00:00:00 2001 From: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> Date: Mon, 4 Jul 2022 16:38:45 -0700 Subject: [itnernal] Cleanup some of the streams code --- src/bun.js/builtins/js/ReadableStreamInternals.js | 72 +++++++++++++++-------- 1 file changed, 47 insertions(+), 25 deletions(-) (limited to 'src/bun.js/builtins/js/ReadableStreamInternals.js') diff --git a/src/bun.js/builtins/js/ReadableStreamInternals.js b/src/bun.js/builtins/js/ReadableStreamInternals.js index 353e3f9e9..0f5871fa1 100644 --- a/src/bun.js/builtins/js/ReadableStreamInternals.js +++ b/src/bun.js/builtins/js/ReadableStreamInternals.js @@ -784,42 +784,51 @@ function readDirectStream(stream, sink, underlyingSource) { "use strict"; @putByIdDirectPrivate(stream, "underlyingSource", @undefined); + @putByIdDirectPrivate(stream, "start", @undefined); - var {close: originalClose, pull} = underlyingSource; - underlyingSource = @undefined; + var capturedStream = stream; + var reader; - var fakeReader = { - }; - var close = (reason) => { - try { - originalClose && originalClose(reason); - } catch (e) { + function close(stream, reason) { + if (reason && underlyingSource?.cancel) { + try { + underlyingSource.cancel(reason); + } catch (e) { + } + underlyingSource = @undefined; } - originalClose = @undefined; - @putByIdDirectPrivate(stream, "reader", @undefined); - @putByIdDirectPrivate(stream, "readableStreamController", null); - @putByIdDirectPrivate(stream, "state", @streamClosed); - stream = @undefined; - fakeReader = @undefined; - }; + + if (stream) { + @putByIdDirectPrivate(stream, "readableStreamController", @undefined); + @putByIdDirectPrivate(stream, "reader", @undefined); + if (reason) { + @putByIdDirectPrivate(stream, "state", @streamErrored); + @putByIdDirectPrivate(stream, "storedError", reason); + } else { + @putByIdDirectPrivate(stream, "state", @streamClosed); + } + + } + } - if (!pull) { + + + + if (!underlyingSource.pull) { close(); return; } - if (!@isCallable(pull)) { + if (!@isCallable(underlyingSource.pull)) { close(); @throwTypeError("pull is not a function"); return; } @putByIdDirectPrivate(stream, "readableStreamController", sink); - @putByIdDirectPrivate(stream, "start", @undefined); - const highWaterMark = @getByIdDirectPrivate(stream, "highWaterMark"); if (highWaterMark) { @@ -828,12 +837,15 @@ function readDirectStream(stream, sink, underlyingSource) { }); } - @startDirectStream.@call(sink, stream, pull, close); + @startDirectStream.@call(sink, stream, underlyingSource.pull, close); + @putByIdDirectPrivate(stream, "reader", {}); - // isReadableStreamLocked() checks for truthiness of "reader" - @putByIdDirectPrivate(stream, "reader", fakeReader); - pull(sink); + var maybePromise = underlyingSource.pull(sink); sink = @undefined; + if (maybePromise && @isPromise(maybePromise)) { + return maybePromise.@then(() => {}); + } + } @globalPrivate; @@ -841,11 +853,21 @@ function assignToStream(stream, sink) { "use strict"; // The stream is either a direct stream or a "default" JS stream - const underlyingSource = @getByIdDirectPrivate(stream, "underlyingSource"); + var underlyingSource = @getByIdDirectPrivate(stream, "underlyingSource"); // we know it's a direct stream when @underlyingSource is set if (underlyingSource) { - return @readDirectStream(stream, sink, underlyingSource); + try { + return @readDirectStream(stream, sink, underlyingSource); + } catch(e) { + throw e; + } finally { + underlyingSource = @undefined; + stream = @undefined; + sink = @undefined; + } + + } return @readStreamIntoSink(stream, sink, true); -- cgit v1.2.3