diff options
Diffstat (limited to 'src/bun.js/builtins/js/ReadableStream.js')
-rw-r--r-- | src/bun.js/builtins/js/ReadableStream.js | 506 |
1 files changed, 506 insertions, 0 deletions
diff --git a/src/bun.js/builtins/js/ReadableStream.js b/src/bun.js/builtins/js/ReadableStream.js new file mode 100644 index 000000000..db7cf85a8 --- /dev/null +++ b/src/bun.js/builtins/js/ReadableStream.js @@ -0,0 +1,506 @@ +/* + * Copyright (C) 2015 Canon Inc. + * Copyright (C) 2015 Igalia. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY + * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +function initializeReadableStream(underlyingSource, strategy) +{ + "use strict"; + + if (underlyingSource === @undefined) + underlyingSource = { @bunNativeType: 0, @bunNativePtr: 0, @lazy: false }; + if (strategy === @undefined) + strategy = { }; + + if (!@isObject(underlyingSource)) + @throwTypeError("ReadableStream constructor takes an object as first argument"); + + if (strategy !== @undefined && !@isObject(strategy)) + @throwTypeError("ReadableStream constructor takes an object as second argument, if any"); + + @putByIdDirectPrivate(this, "state", @streamReadable); + + @putByIdDirectPrivate(this, "reader", @undefined); + + @putByIdDirectPrivate(this, "storedError", @undefined); + + @putByIdDirectPrivate(this, "disturbed", false); + + // Initialized with null value to enable distinction with undefined case. + @putByIdDirectPrivate(this, "readableStreamController", null); + @putByIdDirectPrivate(this, "bunNativeType", @getByIdDirectPrivate(underlyingSource, "bunNativeType") ?? 0); + @putByIdDirectPrivate(this, "bunNativePtr", @getByIdDirectPrivate(underlyingSource, "bunNativePtr") ?? 0); + + const isDirect = underlyingSource.type === "direct"; + // direct streams are always lazy + const isUnderlyingSourceLazy = !!underlyingSource.@lazy; + const isLazy = isDirect || isUnderlyingSourceLazy; + + // // FIXME: We should introduce https://streams.spec.whatwg.org/#create-readable-stream. + // // For now, we emulate this with underlyingSource with private properties. + if (@getByIdDirectPrivate(underlyingSource, "pull") !== @undefined && !isLazy) { + const size = @getByIdDirectPrivate(strategy, "size"); + const highWaterMark = @getByIdDirectPrivate(strategy, "highWaterMark"); + @setupReadableStreamDefaultController(this, underlyingSource, size, highWaterMark !== @undefined ? highWaterMark : 1, @getByIdDirectPrivate(underlyingSource, "start"), @getByIdDirectPrivate(underlyingSource, "pull"), @getByIdDirectPrivate(underlyingSource, "cancel")); + + return this; + } + if (isDirect) { + @putByIdDirectPrivate(this, "start", () => @createReadableStreamController(this, underlyingSource, strategy)); + } else if (isLazy) { + const autoAllocateChunkSize = underlyingSource.autoAllocateChunkSize; + + + @putByIdDirectPrivate(this, "start", () => { + const instance = @lazyLoadStream(this, autoAllocateChunkSize); + if (instance) { + @createReadableStreamController(this, instance, strategy); + } + }); + } else { + @putByIdDirectPrivate(this, "start", @undefined); + @createReadableStreamController(this, underlyingSource, strategy); + } + + + return this; +} + + +@globalPrivate +function readableStreamToArray(stream) { + "use strict"; + + if (!stream || @getByIdDirectPrivate(stream, "state") === @streamClosed) { + return null; + } + var reader = stream.getReader(); + var manyResult = reader.readMany(); + + async function processManyResult(result) { + + if (result.done) { + return null; + } + + var chunks = result.value || []; + + while (true) { + var thisResult = await reader.read(); + + if (thisResult.done) { + return chunks; + } + + chunks.push(thisResult.value); + } + + return chunks; + }; + + + if (manyResult && @isPromise(manyResult)) { + return manyResult.@then(processManyResult); + } + + if (manyResult && manyResult.done) { + return null; + } + + return processManyResult(manyResult); +} + +@globalPrivate +function readableStreamToText(stream) { + "use strict"; + + // TODO: optimize this to skip the extra ArrayBuffer + return globalThis.Bun.readableStreamToArrayBuffer(stream).@then(function(arrayBuffer) { + return new globalThis.TextDecoder().decode(arrayBuffer); + }); +} + +@globalPrivate +function readableStreamToJSON(stream) { + "use strict"; + + // TODO: optimize this to skip the extra ArrayBuffer + return globalThis.Bun.readableStreamToArrayBuffer(stream).@then(function(arrayBuffer) { + return globalThis.JSON.parse(new globalThis.TextDecoder().decode(arrayBuffer)); + }); +} + +@globalPrivate +function readableStreamToBlob(stream) { + "use strict"; + + + const array = @readableStreamToArray(stream); + if (array === null) { + return new globalThis.Blob(); + } + + return array.@then(function(chunks) { + if (chunks === null || chunks.length === 0) { + return new globalThis.Blob(); + } + + return new globalThis.Blob(chunks); + }); +} + +@globalPrivate +function readableStreamToArrayPublic(stream) { + "use strict"; + + if (@getByIdDirectPrivate(stream, "state") === @streamClosed) { + return []; + } + var reader = stream.getReader(); + + var manyResult = reader.readMany(); + + var processManyResult = (0, (async function(result) { + if (result.done) { + return []; + } + + var chunks = result.value || []; + + while (true) { + var thisResult = await reader.read(); + if (thisResult.done) { + return chunks; + } + + chunks.push(thisResult.value); + } + + return chunks; + })); + + + if (manyResult && @isPromise(manyResult)) { + return manyResult.then(processManyResult); + } + + if (manyResult && manyResult.done) { + return []; + } + + return processManyResult(manyResult); +} + + + +@globalPrivate +function consumeReadableStream(nativePtr, nativeType, inputStream) { + "use strict"; + const symbol = globalThis.Symbol.for("Bun.consumeReadableStreamPrototype"); + var cached = globalThis[symbol]; + if (!cached) { + cached = globalThis[symbol] = []; + } + var Prototype = cached[nativeType]; + if (Prototype === @undefined) { + var [doRead, doError, doReadMany, doClose, onClose, deinit] = globalThis[globalThis.Symbol.for("Bun.lazy")](nativeType); + + Prototype = class NativeReadableStreamSink { + constructor(reader, ptr) { + this.#ptr = ptr; + this.#reader = reader; + this.#didClose = false; + + this.handleError = this._handleError.bind(this); + this.handleClosed = this._handleClosed.bind(this); + this.processResult = this._processResult.bind(this); + + reader.closed.then(this.handleClosed, this.handleError); + } + + handleError; + handleClosed; + _handleClosed() { + if (this.#didClose) return; + this.#didClose = true; + var ptr = this.#ptr; + this.#ptr = 0; + doClose(ptr); + deinit(ptr); + } + + _handleError(error) { + if (this.#didClose) return; + this.#didClose = true; + var ptr = this.#ptr; + this.#ptr = 0; + doError(ptr, error); + deinit(ptr); + } + + #ptr; + #didClose = false; + #reader; + + _handleReadMany({value, done, size}) { + if (done) { + this.handleClosed(); + return; + } + + if (this.#didClose) return; + + + doReadMany(this.#ptr, value, done, size); + } + + + read() { + if (!this.#ptr) return @throwTypeError("ReadableStreamSink is already closed"); + + return this.processResult(this.#reader.read()); + + } + + _processResult(result) { + if (result && @isPromise(result)) { + const flags = @getPromiseInternalField(result, @promiseFieldFlags); + if (flags & @promiseStateFulfilled) { + const fulfilledValue = @getPromiseInternalField(result, @promiseFieldReactionsOrResult); + if (fulfilledValue) { + result = fulfilledValue; + } + } + } + + if (result && @isPromise(result)) { + result.then(this.processResult, this.handleError); + return null; + } + + if (result.done) { + this.handleClosed(); + return 0; + } else if (result.value) { + return result.value; + } else { + return -1; + } + } + + readMany() { + if (!this.#ptr) return @throwTypeError("ReadableStreamSink is already closed"); + return this.processResult(this.#reader.readMany()); + } + + + }; + + const minlength = nativeType + 1; + if (cached.length < minlength) { + cached.length = minlength; + } + @putByValDirect(cached, nativeType, Prototype); + } + + if (@isReadableStreamLocked(inputStream)) { + @throwTypeError("Cannot start reading from a locked stream"); + } + + return new Prototype(inputStream.getReader(), nativePtr); +} + +@globalPrivate +function createEmptyReadableStream() { + "use strict"; + + var stream = new @ReadableStream({ + pull() {}, + }); + @readableStreamClose(stream); + return stream; +} + +@globalPrivate +function createNativeReadableStream(nativePtr, nativeType, autoAllocateChunkSize) { + "use strict"; + return new @ReadableStream({ + @lazy: true, + @bunNativeType: nativeType, + @bunNativePtr: nativePtr, + autoAllocateChunkSize: autoAllocateChunkSize, + }); +} + +function cancel(reason) +{ + "use strict"; + + if (!@isReadableStream(this)) + return @Promise.@reject(@makeThisTypeError("ReadableStream", "cancel")); + + if (@isReadableStreamLocked(this)) + return @Promise.@reject(@makeTypeError("ReadableStream is locked")); + + return @readableStreamCancel(this, reason); +} + +function getReader(options) +{ + "use strict"; + + if (!@isReadableStream(this)) + throw @makeThisTypeError("ReadableStream", "getReader"); + + const mode = @toDictionary(options, { }, "ReadableStream.getReader takes an object as first argument").mode; + if (mode === @undefined) { + var start_ = @getByIdDirectPrivate(this, "start"); + if (start_) { + @putByIdDirectPrivate(this, "start", @undefined); + start_(); + } + + return new @ReadableStreamDefaultReader(this); + } + // String conversion is required by spec, hence double equals. + if (mode == 'byob') { + return new @ReadableStreamBYOBReader(this); + } + + + @throwTypeError("Invalid mode is specified"); +} + +function pipeThrough(streams, options) +{ + "use strict"; + + const transforms = streams; + + const readable = transforms["readable"]; + if (!@isReadableStream(readable)) + throw @makeTypeError("readable should be ReadableStream"); + + const writable = transforms["writable"]; + const internalWritable = @getInternalWritableStream(writable); + if (!@isWritableStream(internalWritable)) + throw @makeTypeError("writable should be WritableStream"); + + let preventClose = false; + let preventAbort = false; + let preventCancel = false; + let signal; + if (!@isUndefinedOrNull(options)) { + if (!@isObject(options)) + throw @makeTypeError("options must be an object"); + + preventAbort = !!options["preventAbort"]; + preventCancel = !!options["preventCancel"]; + preventClose = !!options["preventClose"]; + + signal = options["signal"]; + if (signal !== @undefined && !@isAbortSignal(signal)) + throw @makeTypeError("options.signal must be AbortSignal"); + } + + if (!@isReadableStream(this)) + throw @makeThisTypeError("ReadableStream", "pipeThrough"); + + if (@isReadableStreamLocked(this)) + throw @makeTypeError("ReadableStream is locked"); + + if (@isWritableStreamLocked(internalWritable)) + throw @makeTypeError("WritableStream is locked"); + + @readableStreamPipeToWritableStream(this, internalWritable, preventClose, preventAbort, preventCancel, signal); + + return readable; +} + +function pipeTo(destination) +{ + "use strict"; + + // FIXME: https://bugs.webkit.org/show_bug.cgi?id=159869. + // Built-in generator should be able to parse function signature to compute the function length correctly. + let options = arguments[1]; + + let preventClose = false; + let preventAbort = false; + let preventCancel = false; + let signal; + if (!@isUndefinedOrNull(options)) { + if (!@isObject(options)) + return @Promise.@reject(@makeTypeError("options must be an object")); + + try { + preventAbort = !!options["preventAbort"]; + preventCancel = !!options["preventCancel"]; + preventClose = !!options["preventClose"]; + + signal = options["signal"]; + } catch(e) { + return @Promise.@reject(e); + } + + if (signal !== @undefined && !@isAbortSignal(signal)) + return @Promise.@reject(@makeTypeError("options.signal must be AbortSignal")); + } + + const internalDestination = @getInternalWritableStream(destination); + if (!@isWritableStream(internalDestination)) + return @Promise.@reject(@makeTypeError("ReadableStream pipeTo requires a WritableStream")); + + if (!@isReadableStream(this)) + return @Promise.@reject(@makeThisTypeError("ReadableStream", "pipeTo")); + + if (@isReadableStreamLocked(this)) + return @Promise.@reject(@makeTypeError("ReadableStream is locked")); + + if (@isWritableStreamLocked(internalDestination)) + return @Promise.@reject(@makeTypeError("WritableStream is locked")); + + return @readableStreamPipeToWritableStream(this, internalDestination, preventClose, preventAbort, preventCancel, signal); +} + +function tee() +{ + "use strict"; + + if (!@isReadableStream(this)) + throw @makeThisTypeError("ReadableStream", "tee"); + + return @readableStreamTee(this, false); +} + +@getter +function locked() +{ + "use strict"; + + if (!@isReadableStream(this)) + throw @makeGetterTypeError("ReadableStream", "locked"); + + return @isReadableStreamLocked(this); +} |