diff options
Diffstat (limited to 'src/bun.js/builtins/js/ReadableStreamInternals.js')
-rw-r--r-- | src/bun.js/builtins/js/ReadableStreamInternals.js | 197 |
1 files changed, 173 insertions, 24 deletions
diff --git a/src/bun.js/builtins/js/ReadableStreamInternals.js b/src/bun.js/builtins/js/ReadableStreamInternals.js index f988aa091..317c78171 100644 --- a/src/bun.js/builtins/js/ReadableStreamInternals.js +++ b/src/bun.js/builtins/js/ReadableStreamInternals.js @@ -620,12 +620,6 @@ function assignToStream(stream, sink) { } var pull = underlyingSource.pull; - - @putByIdDirectPrivate(stream, "readableStreamController", sink); - @putByIdDirectPrivate(stream, "start", @undefined); - @putByIdDirectPrivate(stream, "underlyingSource", @undefined); - - @startDirectStream.@call(sink, stream, pull, close); if (!pull) { close(); @@ -637,6 +631,12 @@ function assignToStream(stream, sink) { @throwTypeError("pull is not a function"); return; } + + @putByIdDirectPrivate(stream, "readableStreamController", sink); + @putByIdDirectPrivate(stream, "start", @undefined); + @putByIdDirectPrivate(stream, "underlyingSource", @undefined); + + @startDirectStream.@call(sink, stream, pull, close); // lock the stream, relying on close() or end() to eventaully close it reader = stream.getReader(); @@ -650,45 +650,41 @@ function assignToStream(stream, sink) { "use strict"; var didClose = false; - - try { var reader = stream.getReader(); - reader.closed.then(() => { - if (!didClose && sink) { - didClose = true; - sink.end(); - } - }, (e) => { - if (!didClose && sink) { - didClose = true; - sink.close(e); - } - }); var many = reader.readMany(); if (many && @isPromise(many)) { many = await many; } - if (many.done) { didClose = true; sink.end(); return; } - sink.start(); + var wroteCount = many.value.length; + sink.start(); for (var i = 0, values = many.value, length = many.value.length; i < length; i++) { sink.write(values[i]); } + var streamState = @getByIdDirectPrivate(stream, "state"); + + + if (streamState === @streamClosed) { + didClose = true; + return sink.end(); + } + if (wroteCount > 0) { sink.drain(); } while (true) { var result = await reader.read(); + if (result.done) { didClose = true; return sink.end(); @@ -697,8 +693,16 @@ function assignToStream(stream, sink) { sink.write(result.value); } } catch (e) { - globalThis.console.error(e); + if (sink && !didClose) { + didClose = true; + try { + sink.close(); + } catch(j) { + throw j; + } + } + throw e; } })(); } @@ -741,7 +745,7 @@ function handleDirectStreamErrorReject(e) { return @Promise.@reject(e); } -function onPullArrayBufferSink(controller) +function onPullDirectStream(controller) { "use strict"; @@ -935,6 +939,151 @@ function onDrainDirectStream() } +function initializeTextStream(underlyingSource, highWaterMark) +{ + "use strict"; + + var sink; + var fifo = @createFIFO(); + var hasString = false; + var hasBuffer = false; + var rope = ''; + var estimatedLength = 0; + var closingPromise = @newPromise(); + var calledDone = false; + var sink = { + start() { + + }, + write(chunk) { + if (typeof chunk === 'string') { + if (chunk.length > 0) { + rope += chunk; + hasString = true; + // TODO: utf16 byte length + estimatedLength += chunk.length; + + } + + return chunk.length; + } + + if (!chunk || !@isObject(chunk) || !(@ArrayBuffer.@isView(chunk) || chunk instanceof @ArrayBuffer)) { + @throwTypeError("Expected text, ArrayBuffer or ArrayBufferView"); + } + + const byteLength = chunk.byteLength; + if (byteLength > 0) { + hasBuffer = true; + if (rope.length > 0) { + fifo.push(rope); + rope = ''; + } + fifo.push(chunk); + } + estimatedLength += byteLength; + return byteLength; + + }, + + drain() { + return 0; + }, + + end() { + if (calledDone) { + return ""; + } + return sink.fulfill(); + }, + + fulfill() { + calledDone = true; + const result = sink.finishInternal(); + @fulfillPromise(closingPromise, result); + return result; + }, + + finishInternal() { + if (!hasString && !hasBuffer) { + return ""; + } + + if (hasString && !hasBuffer) { + return rope; + } + + if (hasBuffer && !hasString) { + return new globalThis.TextDecoder().decode( + globalThis.Bun.concatArrayBuffers(fifo.toArray(false))); + } + + // worst case: mixed content + var array = fifo.toArray(false); + + var arrayBufferSink = new globalThis.Bun.ArrayBufferSink(); + arrayBufferSink.start({ + highWaterMark: estimatedLength, + asUint8Array: true, + }); + for (let item of array) { + arrayBufferSink.write( + item + ); + } + if (rope.length > 0) { + arrayBufferSink.write(rope); + } + + // TODO: use builtin + return new globalThis.TextDecoder().decode( + arrayBufferSink.end() + ); + }, + + close() { + try { + if (!calledDone) { + calledDone = true; + sink.fulfill(); + } + } catch(e) { + + } finally { + fifo.clear(); + rope = ''; + hasString = false; + hasBuffer = false; + estimatedLength = 0; + } + } + }; + + var controller = { + @underlyingSource: underlyingSource, + @pull: @onPullDirectStream, + @controlledReadableStream: this, + @sink: sink, + close: @onCloseDirectStream, + write: sink.write, + error: @handleDirectStreamError, + end: @onCloseDirectStream, + @close: @onCloseDirectStream, + drain: @onDrainDirectStream, + _pendingRead: @undefined, + _deferClose: 0, + _deferDrain: 0, + _deferCloseReason: @undefined, + _handleError: @undefined, + }; + + + @putByIdDirectPrivate(this, "readableStreamController", controller); + @putByIdDirectPrivate(this, "underlyingSource", @undefined); + @putByIdDirectPrivate(this, "start", @undefined); + return closingPromise; +} + function initializeArrayBufferStream(underlyingSource, highWaterMark) { "use strict"; @@ -949,7 +1098,7 @@ function initializeArrayBufferStream(underlyingSource, highWaterMark) var controller = { @underlyingSource: underlyingSource, - @pull: @onPullArrayBufferSink, + @pull: @onPullDirectStream, @controlledReadableStream: this, @sink: sink, close: @onCloseDirectStream, |