diff options
author | 2022-06-26 06:01:22 -0700 | |
---|---|---|
committer | 2022-06-26 06:01:22 -0700 | |
commit | 77a0f335cb0f18af4e03713583b98e0e1b024b33 (patch) | |
tree | f6ed90a992cb46677ab597bba4f6db2fbfcba3b1 /src/bun.js/builtins/js/ReadableStreamInternals.js | |
parent | 31cfcf2c9f40520dac72530ec62e765d3a0de221 (diff) | |
download | bun-77a0f335cb0f18af4e03713583b98e0e1b024b33.tar.gz bun-77a0f335cb0f18af4e03713583b98e0e1b024b33.tar.zst bun-77a0f335cb0f18af4e03713583b98e0e1b024b33.zip |
wip ReadableStream for HTTP(s) Server
Diffstat (limited to 'src/bun.js/builtins/js/ReadableStreamInternals.js')
-rw-r--r-- | src/bun.js/builtins/js/ReadableStreamInternals.js | 104 |
1 files changed, 100 insertions, 4 deletions
diff --git a/src/bun.js/builtins/js/ReadableStreamInternals.js b/src/bun.js/builtins/js/ReadableStreamInternals.js index 3e6590f31..f988aa091 100644 --- a/src/bun.js/builtins/js/ReadableStreamInternals.js +++ b/src/bun.js/builtins/js/ReadableStreamInternals.js @@ -601,10 +601,106 @@ function isReadableStreamDefaultController(controller) @globalPrivate -function assignDirectStream() { +function assignToStream(stream, sink) { "use strict"; - var stream = this; + // The stream is either a direct stream or a "default" JS stream + const underlyingSource = @getByIdDirectPrivate(stream, "underlyingSource"); + + // we know it's a direct stream when @underlyingSource is set + if (underlyingSource) { + var originalClose = underlyingSource.close; + var reader; + var close = (reason) => { + originalClose && originalClose(reason); + try { + reader && reader.releaseLock(); + } catch (e) {} + @readableStreamClose(stream, reason); + + } + var pull = underlyingSource.pull; + + @putByIdDirectPrivate(stream, "readableStreamController", sink); + @putByIdDirectPrivate(stream, "start", @undefined); + @putByIdDirectPrivate(stream, "underlyingSource", @undefined); + + @startDirectStream.@call(sink, stream, pull, close); + + if (!pull) { + close(); + return; + } + + if (!@isCallable(pull)) { + close(); + @throwTypeError("pull is not a function"); + return; + } + + // lock the stream, relying on close() or end() to eventaully close it + reader = stream.getReader(); + + pull(sink); + return; + } + + + return (async function() { + "use strict"; + + var didClose = false; + + + try { + var reader = stream.getReader(); + reader.closed.then(() => { + if (!didClose && sink) { + didClose = true; + sink.end(); + } + }, (e) => { + if (!didClose && sink) { + didClose = true; + sink.close(e); + } + }); + + var many = reader.readMany(); + if (many && @isPromise(many)) { + many = await many; + } + + if (many.done) { + didClose = true; + sink.end(); + return; + } + + sink.start(); + var wroteCount = many.value.length; + for (var i = 0, values = many.value, length = many.value.length; i < length; i++) { + sink.write(values[i]); + } + + if (wroteCount > 0) { + sink.drain(); + } + + while (true) { + var result = await reader.read(); + if (result.done) { + didClose = true; + return sink.end(); + } + + sink.write(result.value); + } + } catch (e) { + globalThis.console.error(e); + + } + })(); } @@ -645,7 +741,7 @@ function handleDirectStreamErrorReject(e) { return @Promise.@reject(e); } -function onPullDirectStream(controller) +function onPullArrayBufferSink(controller) { "use strict"; @@ -853,7 +949,7 @@ function initializeArrayBufferStream(underlyingSource, highWaterMark) var controller = { @underlyingSource: underlyingSource, - @pull: @onPullDirectStream, + @pull: @onPullArrayBufferSink, @controlledReadableStream: this, @sink: sink, close: @onCloseDirectStream, |