diff options
author | 2022-06-03 04:44:11 -0700 | |
---|---|---|
committer | 2022-06-03 04:44:11 -0700 | |
commit | e5322eb63bf6ff8deb0f2aade42b9caae40a47cc (patch) | |
tree | 87b72fe1895f92068e908aaaddafe5ef514d8daf /src/javascript/jsc/bindings/builtins/js | |
parent | 102553dca6a090533725416cc384d36a49a9ce1d (diff) | |
download | bun-e5322eb63bf6ff8deb0f2aade42b9caae40a47cc.tar.gz bun-e5322eb63bf6ff8deb0f2aade42b9caae40a47cc.tar.zst bun-e5322eb63bf6ff8deb0f2aade42b9caae40a47cc.zip |
Move streams to it's own file
Diffstat (limited to 'src/javascript/jsc/bindings/builtins/js')
-rw-r--r-- | src/javascript/jsc/bindings/builtins/js/ReadableStream.js | 159 | ||||
-rw-r--r-- | src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js | 2 |
2 files changed, 160 insertions, 1 deletions
diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableStream.js b/src/javascript/jsc/bindings/builtins/js/ReadableStream.js index f3ab66d5c..e3f4cbdf8 100644 --- a/src/javascript/jsc/bindings/builtins/js/ReadableStream.js +++ b/src/javascript/jsc/bindings/builtins/js/ReadableStream.js @@ -89,6 +89,165 @@ function initializeReadableStream(underlyingSource, strategy) } @globalPrivate +function readableStreamToArray(stream) { + "use strict"; + + if (@getByIdDirectPrivate(stream, "state") === @streamClosed) { + return null; + } + var reader = stream.getReader(); + + var manyResult = reader.readMany(); + + var processManyResult = (0, (async function(manyResult) { + if (result.done) { + return null; + } + + var chunks = result.value; + + while (true) { + var thisResult = await reader.read(); + if (thisResult.done) { + return chunks; + } + + chunks.push(thisResult.value); + } + + return chunks; + })); + + + if (manyResult && @isPromise(manyResult)) { + return manyResult.then(processManyResult); + } + + if (manyResult && manyResult.done) { + return null; + } + + return processManyResult(manyResult); +} + + + +@globalPrivate +function consumeReadableStream(nativePtr, nativeType, inputStream) { + "use strict"; + const symbol = Symbol.for("Bun.consumeReadableStreamPrototype"); + var cached = globalThis[symbol]; + if (!cached) { + cached = globalThis[symbol] = []; + } + var Prototype = cached[nativeType]; + if (Prototype === @undefined) { + var [doRead, doError, doReadMany, doClose, onClose, deinit] = globalThis[Symbol.for("Bun.lazy")](nativeType); + + Prototype = class NativeReadableStreamSink { + constructor(reader, ptr) { + this.#ptr = ptr; + this.#reader = reader; + this.#didClose = false; + + this.handleError = this._handleError.bind(this); + this.handleClosed = this._handleClosed.bind(this); + this.processResult = this._processResult.bind(this); + + reader.closed.then(this.handleClosed, this.handleError); + } + + handleError; + handleClosed; + _handleClosed() { + if (this.#didClose) return; + this.#didClose = true; + var ptr = this.#ptr; + this.#ptr = 0; + doClose(ptr); + deinit(ptr); + } + + _handleError(error) { + if (this.#didClose) return; + this.#didClose = true; + var ptr = this.#ptr; + this.#ptr = 0; + doError(ptr, error); + deinit(ptr); + } + + #ptr; + #didClose = false; + #reader; + + _handleReadMany({value, done, size}) { + if (done) { + this.handleClosed(); + return; + } + + if (this.#didClose) return; + + + doReadMany(this.#ptr, value, done, size); + } + + + read() { + if (!this.#ptr) return @throwTypeError("ReadableStreamSink is already closed"); + + return this.processResult(this.#reader.read()); + + } + + _processResult(result) { + if (result && @isPromise(result)) { + const flags = @getPromiseInternalField(result, @promiseFieldFlags); + if (flags & @promiseStateFulfilled) { + const fulfilledValue = @getPromiseInternalField(result, @promiseFieldReactionsOrResult); + if (fulfilledValue) { + result = fulfilledValue; + } + } + } + + if (result && @isPromise(result)) { + result.then(this.processResult, this.handleError); + return null; + } + + if (result.done) { + this.handleClosed(); + return 0; + } else if (result.value) { + return result.value; + } else { + return -1; + } + } + + readMany() { + if (!this.#ptr) return @throwTypeError("ReadableStreamSink is already closed"); + return this.processResult(this.#reader.readMany()); + } + }; + + const minlength = nativeType + 1; + if (cached.length < minlength) { + cached.length = minlength; + } + @putByValDirect(cached, nativeType, Prototype); + } + + if (@isReadableStreamLocked(inputStream)) { + @throwTypeError("Cannot start reading from a locked stream"); + } + + return new Prototype(inputStream.getReader(), nativePtr); +} + +@globalPrivate function createEmptyReadableStream() { var stream = new @ReadableStream({ pull() {}, diff --git a/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js b/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js index 18e82262c..09387c9f1 100644 --- a/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js +++ b/src/javascript/jsc/bindings/builtins/js/ReadableStreamInternals.js @@ -732,7 +732,7 @@ function readableStreamDefaultControllerEnqueue(controller, chunk) // this is checked by callers @assert(@readableStreamDefaultControllerCanCloseOrEnqueue(controller)); - if (@isReadableStreamLocked(stream) && @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").isNotEmpty) { + if (@isReadableStreamLocked(stream) && @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) { @readableStreamFulfillReadRequest(stream, chunk, false); @readableStreamDefaultControllerCallPullIfNeeded(controller); return; |