diff options
Diffstat (limited to 'src/bun.js/builtins')
-rw-r--r-- | src/bun.js/builtins/js/ReadableStream.js | 89 | ||||
-rw-r--r-- | src/bun.js/builtins/js/ReadableStreamInternals.js | 105 |
2 files changed, 125 insertions, 69 deletions
diff --git a/src/bun.js/builtins/js/ReadableStream.js b/src/bun.js/builtins/js/ReadableStream.js index 7f1723b12..95f379be5 100644 --- a/src/bun.js/builtins/js/ReadableStream.js +++ b/src/bun.js/builtins/js/ReadableStream.js @@ -46,6 +46,7 @@ function initializeReadableStream(underlyingSource, strategy) @putByIdDirectPrivate(this, "storedError", @undefined); @putByIdDirectPrivate(this, "disturbed", false); + // Initialized with null value to enable distinction with undefined case. @putByIdDirectPrivate(this, "readableStreamController", null); @@ -62,6 +63,7 @@ function initializeReadableStream(underlyingSource, strategy) if (@getByIdDirectPrivate(underlyingSource, "pull") !== @undefined && !isLazy) { const size = @getByIdDirectPrivate(strategy, "size"); const highWaterMark = @getByIdDirectPrivate(strategy, "highWaterMark"); + @putByIdDirectPrivate(this, "highWaterMark", highWaterMark); @putByIdDirectPrivate(this, "underlyingSource", @undefined); @setupReadableStreamDefaultController(this, underlyingSource, size, highWaterMark !== @undefined ? highWaterMark : 1, @getByIdDirectPrivate(underlyingSource, "start"), @getByIdDirectPrivate(underlyingSource, "pull"), @getByIdDirectPrivate(underlyingSource, "cancel")); @@ -69,10 +71,13 @@ function initializeReadableStream(underlyingSource, strategy) } if (isDirect) { @putByIdDirectPrivate(this, "underlyingSource", underlyingSource); + @putByIdDirectPrivate(this, "highWaterMark", @getByIdDirectPrivate(strategy, "highWaterMark")); @putByIdDirectPrivate(this, "start", () => @createReadableStreamController(this, underlyingSource, strategy)); } else if (isLazy) { const autoAllocateChunkSize = underlyingSource.autoAllocateChunkSize; + @putByIdDirectPrivate(this, "highWaterMark", @undefined); @putByIdDirectPrivate(this, "underlyingSource", @undefined); + @putByIdDirectPrivate(this, "highWaterMark", autoAllocateChunkSize || @getByIdDirectPrivate(strategy, "highWaterMark")); @putByIdDirectPrivate(this, "start", () => { @@ -83,6 +88,7 @@ function initializeReadableStream(underlyingSource, strategy) }); } else { @putByIdDirectPrivate(this, "underlyingSource", @undefined); + @putByIdDirectPrivate(this, "highWaterMark", @getByIdDirectPrivate(strategy, "highWaterMark")); @putByIdDirectPrivate(this, "start", @undefined); @createReadableStreamController(this, underlyingSource, strategy); } @@ -96,42 +102,54 @@ function initializeReadableStream(underlyingSource, strategy) function readableStreamToArray(stream) { "use strict"; - if (!stream || @getByIdDirectPrivate(stream, "state") === @streamClosed) { - return null; + // this is a direct stream + var underlyingSource = @getByIdDirectPrivate(stream, "underlyingSource"); + if (underlyingSource !== @undefined) { + const promise = @initializeArrayStream.@call(stream, underlyingSource, @undefined); + var reader = stream.getReader(); + return (async function() { + while (@getByIdDirectPrivate(stream, "state") === @streamReadable) { + var thisResult = await reader.read(); + if (thisResult.done) { + break; + } + } + + try { + reader.releaseLock(); + } catch(e) { + } + + return await promise; + })(); } + var reader = stream.getReader(); var manyResult = reader.readMany(); async function processManyResult(result) { if (result.done) { - return null; + return []; } var chunks = result.value || []; while (true) { var thisResult = await reader.read(); - if (thisResult.done) { - return chunks; + break; } - - chunks.push(thisResult.value); + chunks = chunks.concat(thisResult.value); } return chunks; - }; - + } if (manyResult && @isPromise(manyResult)) { return manyResult.@then(processManyResult); } - if (manyResult && manyResult.done) { - return null; - } - return processManyResult(manyResult); } @@ -177,7 +195,6 @@ function readableStreamToText(stream) { function readableStreamToJSON(stream) { "use strict"; - // TODO: optimize this to skip the extra ArrayBuffer return @readableStreamToText(stream).@then(globalThis.JSON.parse); } @@ -213,50 +230,6 @@ function readableStreamToBlob(stream) { } @globalPrivate -function readableStreamToArrayPublic(stream) { - "use strict"; - - if (@getByIdDirectPrivate(stream, "state") === @streamClosed) { - return []; - } - var reader = stream.getReader(); - - var manyResult = reader.readMany(); - - var processManyResult = (0, (async function(result) { - if (result.done) { - return []; - } - - var chunks = result.value || []; - - while (true) { - var thisResult = await reader.read(); - if (thisResult.done) { - return chunks; - } - - chunks.push(thisResult.value); - } - - return chunks; - })); - - - if (manyResult && @isPromise(manyResult)) { - return manyResult.then(processManyResult); - } - - if (manyResult && manyResult.done) { - return []; - } - - return processManyResult(manyResult); -} - - - -@globalPrivate function consumeReadableStream(nativePtr, nativeType, inputStream) { "use strict"; const symbol = globalThis.Symbol.for("Bun.consumeReadableStreamPrototype"); diff --git a/src/bun.js/builtins/js/ReadableStreamInternals.js b/src/bun.js/builtins/js/ReadableStreamInternals.js index 2bd707622..ce8a85445 100644 --- a/src/bun.js/builtins/js/ReadableStreamInternals.js +++ b/src/bun.js/builtins/js/ReadableStreamInternals.js @@ -636,8 +636,17 @@ function assignToStream(stream, sink) { @putByIdDirectPrivate(stream, "start", @undefined); @putByIdDirectPrivate(stream, "underlyingSource", @undefined); - @startDirectStream.@call(sink, stream, pull, close); + const highWaterMark = @getByIdDirectPrivate(stream, "highWaterMark"); + if (highWaterMark) { + sink.start({ + highWaterMark, + }); + } + + @startDirectStream.@call(sink, stream, pull, close); + + // lock the stream, relying on close() or end() to eventaully close it reader = stream.getReader(); @@ -665,7 +674,11 @@ function assignToStream(stream, sink) { var wroteCount = many.value.length; - sink.start(); + const highWaterMark = @getByIdDirectPrivate(stream, "highWaterMark"); + + if (highWaterMark) + sink.start({highWaterMark}); + for (var i = 0, values = many.value, length = many.value.length; i < length; i++) { sink.write(values[i]); } @@ -948,32 +961,35 @@ function initializeTextStream(underlyingSource, highWaterMark) var hasString = false; var hasBuffer = false; var rope = ''; - var estimatedLength = 0; + var estimatedLength = @toLength(0); var closingPromise = @newPromise(); var calledDone = false; + var isView = @ArrayBuffer.@isView; - var sink = { + + sink = { start() { }, write(chunk) { if (typeof chunk === 'string') { - if (chunk.length > 0) { + var chunkLength = @toLength(chunk.length); + if (chunkLength > 0) { rope += chunk; hasString = true; // TODO: utf16 byte length - estimatedLength += chunk.length; + estimatedLength += chunkLength; } - return chunk.length; + return chunkLength; } - if (!chunk || !@isObject(chunk) || !(@ArrayBuffer.@isView(chunk) || chunk instanceof @ArrayBuffer)) { + if (!chunk || !@isObject(chunk) || !((isView(chunk)) || chunk instanceof @ArrayBuffer)) { @throwTypeError("Expected text, ArrayBuffer or ArrayBufferView"); } - const byteLength = chunk.byteLength; + const byteLength = @toLength(chunk.byteLength); if (byteLength > 0) { hasBuffer = true; if (rope.length > 0) { @@ -1051,7 +1067,6 @@ function initializeTextStream(underlyingSource, highWaterMark) } catch(e) { } finally { - fifo.clear(); rope = ''; hasString = false; hasBuffer = false; @@ -1085,6 +1100,72 @@ function initializeTextStream(underlyingSource, highWaterMark) return closingPromise; } +function initializeArrayStream(underlyingSource, highWaterMark) +{ + "use strict"; + + var array = []; + var closingPromise = @newPromise(); + var calledDone = false; + + function fulfill() { + calledDone = true; + @fulfillPromise(closingPromise, array); + return array; + } + + var sink = { + start() { + + }, + write(chunk) { + array.push(chunk); + return chunk.length; + }, + + drain() { + return 0; + }, + + end() { + if (calledDone) { + return []; + } + return fulfill(); + }, + + close() { + if (!calledDone) { + fulfill(); + } + } + }; + + var controller = { + @underlyingSource: underlyingSource, + @pull: @onPullDirectStream, + @controlledReadableStream: this, + @sink: sink, + close: @onCloseDirectStream, + write: sink.write, + error: @handleDirectStreamError, + end: @onCloseDirectStream, + @close: @onCloseDirectStream, + drain: @onDrainDirectStream, + _pendingRead: @undefined, + _deferClose: 0, + _deferDrain: 0, + _deferCloseReason: @undefined, + _handleError: @undefined, + }; + + + @putByIdDirectPrivate(this, "readableStreamController", controller); + @putByIdDirectPrivate(this, "underlyingSource", @undefined); + @putByIdDirectPrivate(this, "start", @undefined); + return closingPromise; +} + function initializeArrayBufferStream(underlyingSource, highWaterMark) { "use strict"; @@ -1093,7 +1174,7 @@ function initializeArrayBufferStream(underlyingSource, highWaterMark) // When we don't know what the destination type is // We assume it is a Uint8Array. - var opts = highWaterMark ? {highWaterMark, stream: true, asUint8Array: true} : {stream: true, asUint8Array: true}; + var opts = highWaterMark && typeof highWaterMark === 'number' ? {highWaterMark, stream: true, asUint8Array: true} : {stream: true, asUint8Array: true}; var sink = new globalThis.Bun.ArrayBufferSink(); sink.start(opts); @@ -1117,6 +1198,8 @@ function initializeArrayBufferStream(underlyingSource, highWaterMark) @putByIdDirectPrivate(this, "readableStreamController", controller); + @putByIdDirectPrivate(this, "underlyingSource", @undefined); + @putByIdDirectPrivate(this, "start", @undefined); } |