aboutsummaryrefslogtreecommitdiff
path: root/src/javascript/jsc/bindings/builtins/js
diff options
context:
space:
mode:
Diffstat (limited to 'src/javascript/jsc/bindings/builtins/js')
-rw-r--r--src/javascript/jsc/bindings/builtins/js/ReadableStream.js159
-rw-r--r--src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js2
2 files changed, 160 insertions, 1 deletions
diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableStream.js b/src/javascript/jsc/bindings/builtins/js/ReadableStream.js
index f3ab66d5c..e3f4cbdf8 100644
--- a/src/javascript/jsc/bindings/builtins/js/ReadableStream.js
+++ b/src/javascript/jsc/bindings/builtins/js/ReadableStream.js
@@ -89,6 +89,165 @@ function initializeReadableStream(underlyingSource, strategy)
}
@globalPrivate
+function readableStreamToArray(stream) {
+ "use strict";
+
+ if (@getByIdDirectPrivate(stream, "state") === @streamClosed) {
+ return null;
+ }
+ var reader = stream.getReader();
+
+ var manyResult = reader.readMany();
+
+ var processManyResult = (0, (async function(manyResult) {
+ if (result.done) {
+ return null;
+ }
+
+ var chunks = result.value;
+
+ while (true) {
+ var thisResult = await reader.read();
+ if (thisResult.done) {
+ return chunks;
+ }
+
+ chunks.push(thisResult.value);
+ }
+
+ return chunks;
+ }));
+
+
+ if (manyResult && @isPromise(manyResult)) {
+ return manyResult.then(processManyResult);
+ }
+
+ if (manyResult && manyResult.done) {
+ return null;
+ }
+
+ return processManyResult(manyResult);
+}
+
+
+
+@globalPrivate
+function consumeReadableStream(nativePtr, nativeType, inputStream) {
+ "use strict";
+ const symbol = Symbol.for("Bun.consumeReadableStreamPrototype");
+ var cached = globalThis[symbol];
+ if (!cached) {
+ cached = globalThis[symbol] = [];
+ }
+ var Prototype = cached[nativeType];
+ if (Prototype === @undefined) {
+ var [doRead, doError, doReadMany, doClose, onClose, deinit] = globalThis[Symbol.for("Bun.lazy")](nativeType);
+
+ Prototype = class NativeReadableStreamSink {
+ constructor(reader, ptr) {
+ this.#ptr = ptr;
+ this.#reader = reader;
+ this.#didClose = false;
+
+ this.handleError = this._handleError.bind(this);
+ this.handleClosed = this._handleClosed.bind(this);
+ this.processResult = this._processResult.bind(this);
+
+ reader.closed.then(this.handleClosed, this.handleError);
+ }
+
+ handleError;
+ handleClosed;
+ _handleClosed() {
+ if (this.#didClose) return;
+ this.#didClose = true;
+ var ptr = this.#ptr;
+ this.#ptr = 0;
+ doClose(ptr);
+ deinit(ptr);
+ }
+
+ _handleError(error) {
+ if (this.#didClose) return;
+ this.#didClose = true;
+ var ptr = this.#ptr;
+ this.#ptr = 0;
+ doError(ptr, error);
+ deinit(ptr);
+ }
+
+ #ptr;
+ #didClose = false;
+ #reader;
+
+ _handleReadMany({value, done, size}) {
+ if (done) {
+ this.handleClosed();
+ return;
+ }
+
+ if (this.#didClose) return;
+
+
+ doReadMany(this.#ptr, value, done, size);
+ }
+
+
+ read() {
+ if (!this.#ptr) return @throwTypeError("ReadableStreamSink is already closed");
+
+ return this.processResult(this.#reader.read());
+
+ }
+
+ _processResult(result) {
+ if (result && @isPromise(result)) {
+ const flags = @getPromiseInternalField(result, @promiseFieldFlags);
+ if (flags & @promiseStateFulfilled) {
+ const fulfilledValue = @getPromiseInternalField(result, @promiseFieldReactionsOrResult);
+ if (fulfilledValue) {
+ result = fulfilledValue;
+ }
+ }
+ }
+
+ if (result && @isPromise(result)) {
+ result.then(this.processResult, this.handleError);
+ return null;
+ }
+
+ if (result.done) {
+ this.handleClosed();
+ return 0;
+ } else if (result.value) {
+ return result.value;
+ } else {
+ return -1;
+ }
+ }
+
+ readMany() {
+ if (!this.#ptr) return @throwTypeError("ReadableStreamSink is already closed");
+ return this.processResult(this.#reader.readMany());
+ }
+ };
+
+ const minlength = nativeType + 1;
+ if (cached.length < minlength) {
+ cached.length = minlength;
+ }
+ @putByValDirect(cached, nativeType, Prototype);
+ }
+
+ if (@isReadableStreamLocked(inputStream)) {
+ @throwTypeError("Cannot start reading from a locked stream");
+ }
+
+ return new Prototype(inputStream.getReader(), nativePtr);
+}
+
+@globalPrivate
function createEmptyReadableStream() {
var stream = new @ReadableStream({
pull() {},
diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js b/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js
index 18e82262c..09387c9f1 100644
--- a/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js
+++ b/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js
@@ -732,7 +732,7 @@ function readableStreamDefaultControllerEnqueue(controller, chunk)
// this is checked by callers
@assert(@readableStreamDefaultControllerCanCloseOrEnqueue(controller));
- if (@isReadableStreamLocked(stream) && @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").isNotEmpty) {
+ if (@isReadableStreamLocked(stream) && @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) {
@readableStreamFulfillReadRequest(stream, chunk, false);
@readableStreamDefaultControllerCallPullIfNeeded(controller);
return;