diff options
author | 2022-06-26 06:01:22 -0700 | |
---|---|---|
committer | 2022-06-26 06:01:22 -0700 | |
commit | 77a0f335cb0f18af4e03713583b98e0e1b024b33 (patch) | |
tree | f6ed90a992cb46677ab597bba4f6db2fbfcba3b1 /src/bun.js/builtins/js | |
parent | 31cfcf2c9f40520dac72530ec62e765d3a0de221 (diff) | |
download | bun-77a0f335cb0f18af4e03713583b98e0e1b024b33.tar.gz bun-77a0f335cb0f18af4e03713583b98e0e1b024b33.tar.zst bun-77a0f335cb0f18af4e03713583b98e0e1b024b33.zip |
wip ReadableStream for HTTP(s) Server
Diffstat (limited to 'src/bun.js/builtins/js')
-rw-r--r-- | src/bun.js/builtins/js/ReadableStream.js | 4 | ||||
-rw-r--r-- | src/bun.js/builtins/js/ReadableStreamDefaultReader.js | 9 | ||||
-rw-r--r-- | src/bun.js/builtins/js/ReadableStreamInternals.js | 104 |
3 files changed, 113 insertions, 4 deletions
diff --git a/src/bun.js/builtins/js/ReadableStream.js b/src/bun.js/builtins/js/ReadableStream.js index db7cf85a8..f3c11728e 100644 --- a/src/bun.js/builtins/js/ReadableStream.js +++ b/src/bun.js/builtins/js/ReadableStream.js @@ -62,14 +62,17 @@ function initializeReadableStream(underlyingSource, strategy) if (@getByIdDirectPrivate(underlyingSource, "pull") !== @undefined && !isLazy) { const size = @getByIdDirectPrivate(strategy, "size"); const highWaterMark = @getByIdDirectPrivate(strategy, "highWaterMark"); + @putByIdDirectPrivate(this, "underlyingSource", @undefined); @setupReadableStreamDefaultController(this, underlyingSource, size, highWaterMark !== @undefined ? highWaterMark : 1, @getByIdDirectPrivate(underlyingSource, "start"), @getByIdDirectPrivate(underlyingSource, "pull"), @getByIdDirectPrivate(underlyingSource, "cancel")); return this; } if (isDirect) { + @putByIdDirectPrivate(this, "underlyingSource", underlyingSource); @putByIdDirectPrivate(this, "start", () => @createReadableStreamController(this, underlyingSource, strategy)); } else if (isLazy) { const autoAllocateChunkSize = underlyingSource.autoAllocateChunkSize; + @putByIdDirectPrivate(this, "underlyingSource", @undefined); @putByIdDirectPrivate(this, "start", () => { @@ -79,6 +82,7 @@ function initializeReadableStream(underlyingSource, strategy) } }); } else { + @putByIdDirectPrivate(this, "underlyingSource", @undefined); @putByIdDirectPrivate(this, "start", @undefined); @createReadableStreamController(this, underlyingSource, strategy); } diff --git a/src/bun.js/builtins/js/ReadableStreamDefaultReader.js b/src/bun.js/builtins/js/ReadableStreamDefaultReader.js index 774c7161e..e3e39b2da 100644 --- a/src/bun.js/builtins/js/ReadableStreamDefaultReader.js +++ b/src/bun.js/builtins/js/ReadableStreamDefaultReader.js @@ -87,6 +87,11 @@ function readMany() value[i] = new @Uint8Array(buf.buffer, buf.byteOffset, buf.byteLength); } } + } else { + values[0] = values[0].value; + for (var i = 1; i < values.length; i++) { + values[i] = values[i].value; + } } @resetQueue(@getByIdDirectPrivate(controller, "queue")); @@ -117,6 +122,10 @@ function readMany() value[i] = new @Uint8Array(buf.buffer, buf.byteOffset, buf.byteLength); } } + } else { + for (var i = 1; i < value.length; i++) { + value[i] = value[i].value; + } } var size = queue.size; diff --git a/src/bun.js/builtins/js/ReadableStreamInternals.js b/src/bun.js/builtins/js/ReadableStreamInternals.js index 3e6590f31..f988aa091 100644 --- a/src/bun.js/builtins/js/ReadableStreamInternals.js +++ b/src/bun.js/builtins/js/ReadableStreamInternals.js @@ -601,10 +601,106 @@ function isReadableStreamDefaultController(controller) @globalPrivate -function assignDirectStream() { +function assignToStream(stream, sink) { "use strict"; - var stream = this; + // The stream is either a direct stream or a "default" JS stream + const underlyingSource = @getByIdDirectPrivate(stream, "underlyingSource"); + + // we know it's a direct stream when @underlyingSource is set + if (underlyingSource) { + var originalClose = underlyingSource.close; + var reader; + var close = (reason) => { + originalClose && originalClose(reason); + try { + reader && reader.releaseLock(); + } catch (e) {} + @readableStreamClose(stream, reason); + + } + var pull = underlyingSource.pull; + + @putByIdDirectPrivate(stream, "readableStreamController", sink); + @putByIdDirectPrivate(stream, "start", @undefined); + @putByIdDirectPrivate(stream, "underlyingSource", @undefined); + + @startDirectStream.@call(sink, stream, pull, close); + + if (!pull) { + close(); + return; + } + + if (!@isCallable(pull)) { + close(); + @throwTypeError("pull is not a function"); + return; + } + + // lock the stream, relying on close() or end() to eventaully close it + reader = stream.getReader(); + + pull(sink); + return; + } + + + return (async function() { + "use strict"; + + var didClose = false; + + + try { + var reader = stream.getReader(); + reader.closed.then(() => { + if (!didClose && sink) { + didClose = true; + sink.end(); + } + }, (e) => { + if (!didClose && sink) { + didClose = true; + sink.close(e); + } + }); + + var many = reader.readMany(); + if (many && @isPromise(many)) { + many = await many; + } + + if (many.done) { + didClose = true; + sink.end(); + return; + } + + sink.start(); + var wroteCount = many.value.length; + for (var i = 0, values = many.value, length = many.value.length; i < length; i++) { + sink.write(values[i]); + } + + if (wroteCount > 0) { + sink.drain(); + } + + while (true) { + var result = await reader.read(); + if (result.done) { + didClose = true; + return sink.end(); + } + + sink.write(result.value); + } + } catch (e) { + globalThis.console.error(e); + + } + })(); } @@ -645,7 +741,7 @@ function handleDirectStreamErrorReject(e) { return @Promise.@reject(e); } -function onPullDirectStream(controller) +function onPullArrayBufferSink(controller) { "use strict"; @@ -853,7 +949,7 @@ function initializeArrayBufferStream(underlyingSource, highWaterMark) var controller = { @underlyingSource: underlyingSource, - @pull: @onPullDirectStream, + @pull: @onPullArrayBufferSink, @controlledReadableStream: this, @sink: sink, close: @onCloseDirectStream, |