aboutsummaryrefslogtreecommitdiff
path: root/src/bun.js/builtins/js/ReadableStreamInternals.js
diff options
context:
space:
mode:
authorGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-06-26 06:01:22 -0700
committerGravatar Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> 2022-06-26 06:01:22 -0700
commit77a0f335cb0f18af4e03713583b98e0e1b024b33 (patch)
treef6ed90a992cb46677ab597bba4f6db2fbfcba3b1 /src/bun.js/builtins/js/ReadableStreamInternals.js
parent31cfcf2c9f40520dac72530ec62e765d3a0de221 (diff)
downloadbun-77a0f335cb0f18af4e03713583b98e0e1b024b33.tar.gz
bun-77a0f335cb0f18af4e03713583b98e0e1b024b33.tar.zst
bun-77a0f335cb0f18af4e03713583b98e0e1b024b33.zip
wip ReadableStream for HTTP(s) Server
Diffstat (limited to 'src/bun.js/builtins/js/ReadableStreamInternals.js')
-rw-r--r--src/bun.js/builtins/js/ReadableStreamInternals.js104
1 files changed, 100 insertions, 4 deletions
diff --git a/src/bun.js/builtins/js/ReadableStreamInternals.js b/src/bun.js/builtins/js/ReadableStreamInternals.js
index 3e6590f31..f988aa091 100644
--- a/src/bun.js/builtins/js/ReadableStreamInternals.js
+++ b/src/bun.js/builtins/js/ReadableStreamInternals.js
@@ -601,10 +601,106 @@ function isReadableStreamDefaultController(controller)
@globalPrivate
-function assignDirectStream() {
+function assignToStream(stream, sink) {
"use strict";
- var stream = this;
+ // The stream is either a direct stream or a "default" JS stream
+ const underlyingSource = @getByIdDirectPrivate(stream, "underlyingSource");
+
+ // we know it's a direct stream when @underlyingSource is set
+ if (underlyingSource) {
+ var originalClose = underlyingSource.close;
+ var reader;
+ var close = (reason) => {
+ originalClose && originalClose(reason);
+ try {
+ reader && reader.releaseLock();
+ } catch (e) {}
+ @readableStreamClose(stream, reason);
+
+ }
+ var pull = underlyingSource.pull;
+
+ @putByIdDirectPrivate(stream, "readableStreamController", sink);
+ @putByIdDirectPrivate(stream, "start", @undefined);
+ @putByIdDirectPrivate(stream, "underlyingSource", @undefined);
+
+ @startDirectStream.@call(sink, stream, pull, close);
+
+ if (!pull) {
+ close();
+ return;
+ }
+
+ if (!@isCallable(pull)) {
+ close();
+ @throwTypeError("pull is not a function");
+ return;
+ }
+
+ // lock the stream, relying on close() or end() to eventaully close it
+ reader = stream.getReader();
+
+ pull(sink);
+ return;
+ }
+
+
+ return (async function() {
+ "use strict";
+
+ var didClose = false;
+
+
+ try {
+ var reader = stream.getReader();
+ reader.closed.then(() => {
+ if (!didClose && sink) {
+ didClose = true;
+ sink.end();
+ }
+ }, (e) => {
+ if (!didClose && sink) {
+ didClose = true;
+ sink.close(e);
+ }
+ });
+
+ var many = reader.readMany();
+ if (many && @isPromise(many)) {
+ many = await many;
+ }
+
+ if (many.done) {
+ didClose = true;
+ sink.end();
+ return;
+ }
+
+ sink.start();
+ var wroteCount = many.value.length;
+ for (var i = 0, values = many.value, length = many.value.length; i < length; i++) {
+ sink.write(values[i]);
+ }
+
+ if (wroteCount > 0) {
+ sink.drain();
+ }
+
+ while (true) {
+ var result = await reader.read();
+ if (result.done) {
+ didClose = true;
+ return sink.end();
+ }
+
+ sink.write(result.value);
+ }
+ } catch (e) {
+ globalThis.console.error(e);
+
+ }
+ })();
}
@@ -645,7 +741,7 @@ function handleDirectStreamErrorReject(e) {
return @Promise.@reject(e);
}
-function onPullDirectStream(controller)
+function onPullArrayBufferSink(controller)
{
"use strict";
@@ -853,7 +949,7 @@ function initializeArrayBufferStream(underlyingSource, highWaterMark)
var controller = {
@underlyingSource: underlyingSource,
- @pull: @onPullDirectStream,
+ @pull: @onPullArrayBufferSink,
@controlledReadableStream: this,
@sink: sink,
close: @onCloseDirectStream,