diff options
Diffstat (limited to 'src/bun.js/builtins/js/ReadableStreamInternals.js')
-rw-r--r-- | src/bun.js/builtins/js/ReadableStreamInternals.js | 105 |
1 files changed, 94 insertions, 11 deletions
diff --git a/src/bun.js/builtins/js/ReadableStreamInternals.js b/src/bun.js/builtins/js/ReadableStreamInternals.js index 2bd707622..ce8a85445 100644 --- a/src/bun.js/builtins/js/ReadableStreamInternals.js +++ b/src/bun.js/builtins/js/ReadableStreamInternals.js @@ -636,8 +636,17 @@ function assignToStream(stream, sink) { @putByIdDirectPrivate(stream, "start", @undefined); @putByIdDirectPrivate(stream, "underlyingSource", @undefined); - @startDirectStream.@call(sink, stream, pull, close); + const highWaterMark = @getByIdDirectPrivate(stream, "highWaterMark"); + if (highWaterMark) { + sink.start({ + highWaterMark, + }); + } + + @startDirectStream.@call(sink, stream, pull, close); + + // lock the stream, relying on close() or end() to eventaully close it reader = stream.getReader(); @@ -665,7 +674,11 @@ function assignToStream(stream, sink) { var wroteCount = many.value.length; - sink.start(); + const highWaterMark = @getByIdDirectPrivate(stream, "highWaterMark"); + + if (highWaterMark) + sink.start({highWaterMark}); + for (var i = 0, values = many.value, length = many.value.length; i < length; i++) { sink.write(values[i]); } @@ -948,32 +961,35 @@ function initializeTextStream(underlyingSource, highWaterMark) var hasString = false; var hasBuffer = false; var rope = ''; - var estimatedLength = 0; + var estimatedLength = @toLength(0); var closingPromise = @newPromise(); var calledDone = false; + var isView = @ArrayBuffer.@isView; - var sink = { + + sink = { start() { }, write(chunk) { if (typeof chunk === 'string') { - if (chunk.length > 0) { + var chunkLength = @toLength(chunk.length); + if (chunkLength > 0) { rope += chunk; hasString = true; // TODO: utf16 byte length - estimatedLength += chunk.length; + estimatedLength += chunkLength; } - return chunk.length; + return chunkLength; } - if (!chunk || !@isObject(chunk) || !(@ArrayBuffer.@isView(chunk) || chunk instanceof @ArrayBuffer)) { + if (!chunk || !@isObject(chunk) || !((isView(chunk)) || chunk instanceof @ArrayBuffer)) { @throwTypeError("Expected text, ArrayBuffer or ArrayBufferView"); } - const byteLength = chunk.byteLength; + const byteLength = @toLength(chunk.byteLength); if (byteLength > 0) { hasBuffer = true; if (rope.length > 0) { @@ -1051,7 +1067,6 @@ function initializeTextStream(underlyingSource, highWaterMark) } catch(e) { } finally { - fifo.clear(); rope = ''; hasString = false; hasBuffer = false; @@ -1085,6 +1100,72 @@ function initializeTextStream(underlyingSource, highWaterMark) return closingPromise; } +function initializeArrayStream(underlyingSource, highWaterMark) +{ + "use strict"; + + var array = []; + var closingPromise = @newPromise(); + var calledDone = false; + + function fulfill() { + calledDone = true; + @fulfillPromise(closingPromise, array); + return array; + } + + var sink = { + start() { + + }, + write(chunk) { + array.push(chunk); + return chunk.length; + }, + + drain() { + return 0; + }, + + end() { + if (calledDone) { + return []; + } + return fulfill(); + }, + + close() { + if (!calledDone) { + fulfill(); + } + } + }; + + var controller = { + @underlyingSource: underlyingSource, + @pull: @onPullDirectStream, + @controlledReadableStream: this, + @sink: sink, + close: @onCloseDirectStream, + write: sink.write, + error: @handleDirectStreamError, + end: @onCloseDirectStream, + @close: @onCloseDirectStream, + drain: @onDrainDirectStream, + _pendingRead: @undefined, + _deferClose: 0, + _deferDrain: 0, + _deferCloseReason: @undefined, + _handleError: @undefined, + }; + + + @putByIdDirectPrivate(this, "readableStreamController", controller); + @putByIdDirectPrivate(this, "underlyingSource", @undefined); + @putByIdDirectPrivate(this, "start", @undefined); + return closingPromise; +} + function initializeArrayBufferStream(underlyingSource, highWaterMark) { "use strict"; @@ -1093,7 +1174,7 @@ function initializeArrayBufferStream(underlyingSource, highWaterMark) // When we don't know what the destination type is // We assume it is a Uint8Array. - var opts = highWaterMark ? {highWaterMark, stream: true, asUint8Array: true} : {stream: true, asUint8Array: true}; + var opts = highWaterMark && typeof highWaterMark === 'number' ? {highWaterMark, stream: true, asUint8Array: true} : {stream: true, asUint8Array: true}; var sink = new globalThis.Bun.ArrayBufferSink(); sink.start(opts); @@ -1117,6 +1198,8 @@ function initializeArrayBufferStream(underlyingSource, highWaterMark) @putByIdDirectPrivate(this, "readableStreamController", controller); + @putByIdDirectPrivate(this, "underlyingSource", @undefined); + @putByIdDirectPrivate(this, "start", @undefined); } |