diff options
Diffstat (limited to 'src/bun.js/streams.exports.js')
-rw-r--r-- | src/bun.js/streams.exports.js | 199 |
1 files changed, 197 insertions, 2 deletions
diff --git a/src/bun.js/streams.exports.js b/src/bun.js/streams.exports.js index 979ef32f6..576f3ea89 100644 --- a/src/bun.js/streams.exports.js +++ b/src/bun.js/streams.exports.js @@ -1,5 +1,6 @@ // "readable-stream" npm package // just transpiled +var { isPromise } = import.meta.primordials; var __create = Object.create; var __defProp = Object.defineProperty; @@ -33,6 +34,76 @@ var __copyProps = (to, from, except, desc) => { var runOnNextTick = process.nextTick; +function isReadableStream(value) { + return ( + typeof value === "object" && + value !== null && + value instanceof ReadableStream + ); +} + +function validateBoolean(value, name) { + if (typeof value !== "boolean") + throw new ERR_INVALID_ARG_TYPE(name, "boolean", value); +} + +/** + * @callback validateObject + * @param {*} value + * @param {string} name + * @param {{ + * allowArray?: boolean, + * allowFunction?: boolean, + * nullable?: boolean + * }} [options] + */ + +/** @type {validateObject} */ +const validateObject = (value, name, options = null) => { + const allowArray = options?.allowArray ?? false; + const allowFunction = options?.allowFunction ?? false; + const nullable = options?.nullable ?? false; + if ( + (!nullable && value === null) || + (!allowArray && ArrayIsArray(value)) || + (typeof value !== "object" && + (!allowFunction || typeof value !== "function")) + ) { + throw new ERR_INVALID_ARG_TYPE(name, "Object", value); + } +}; + +/** + * @callback validateString + * @param {*} value + * @param {string} name + * @returns {asserts value is string} + */ + +/** @type {validateString} */ +function validateString(value, name) { + if (typeof value !== "string") + throw new ERR_INVALID_ARG_TYPE(name, "string", value); +} + +var ArrayIsArray = Array.isArray; + +//------------------------------------------------------------------------------ +// Node error polyfills +//------------------------------------------------------------------------------ + +function ERR_INVALID_ARG_TYPE(name, type, value) { + return new Error( + `The argument '${name}' is invalid. Received '${value}' for type '${type}'` + ); +} + +function ERR_INVALID_ARG_VALUE(name, value, reason) { + return new Error( + `The value '${value}' is invalid for argument '${name}'. Reason: ${reason}` + ); +} + // node_modules/readable-stream/lib/ours/primordials.js var require_primordials = __commonJS({ "node_modules/readable-stream/lib/ours/primordials.js"(exports, module) { @@ -2509,7 +2580,9 @@ var require_readable = __commonJS({ const state = this._readableState; if (ev === "data") { state.readableListening = this.listenerCount("readable") > 0; - if (state.flowing !== false) this.resume(); + if (state.flowing !== false) { + this.resume(); + } } else if (ev === "readable") { if (!state.endEmitted && !state.readableListening) { state.readableListening = state.needReadable = true; @@ -2528,6 +2601,126 @@ var require_readable = __commonJS({ static ReadableState = ReadableState; } + + class ReadableFromWeb extends Readable { + #reader; + #closed; + + constructor(options) { + const { objectMode, highWaterMark, encoding, signal, reader } = options; + super({ + objectMode, + highWaterMark, + encoding, + signal, + }); + + this.#reader = reader; + this.#reader.closed + .then(() => { + this.#closed = true; + }) + .catch((error) => { + this.#closed = true; + destroy(this, error); + }); + } + + async _read() { + var deferredError; + try { + var done, value; + const firstResult = this.#reader.readMany(); + + if (isPromise(firstResult)) { + const result = await firstResult; + done = result.done; + value = result.value; + } else { + done = firstResult.done; + value = firstResult.value; + } + + if (done) { + this.push(null); + return; + } + + if (!value) + throw new Error( + `Invalid value from ReadableStream reader: ${value}` + ); + if (ArrayIsArray(value)) { + this.push(...value); + } else { + this.push(value); + } + } catch (e) { + deferredError = e; + } finally { + if (deferredError) throw deferredError; + } + } + + _destroy(error, callback) { + if (!this.#closed) { + this.#reader.releaseLock(); + this.#reader.cancel(error).then(done).catch(done); + return; + } + try { + callback(error); + } catch (error) { + globalThis.reportError(error); + } + } + + // NOTE(Derrick): For whatever reason this seems to be necessary to make this work + // I couldn't find out why .constructed was getting set to false + // even though construct() was getting called + _construct() { + this._readableState.constructed = true; + } + } + + /** + * @param {ReadableStream} readableStream + * @param {{ + * highWaterMark? : number, + * encoding? : string, + * objectMode? : boolean, + * signal? : AbortSignal, + * }} [options] + * @returns {Readable} + */ + function newStreamReadableFromReadableStream(readableStream, options = {}) { + if (!isReadableStream(readableStream)) { + throw new ERR_INVALID_ARG_TYPE( + "readableStream", + "ReadableStream", + readableStream + ); + } + + validateObject(options, "options"); + const { highWaterMark, encoding, objectMode = false, signal } = options; + + if (encoding !== undefined && !Buffer.isEncoding(encoding)) + throw new ERR_INVALID_ARG_VALUE(encoding, "options.encoding"); + validateBoolean(objectMode, "options.objectMode"); + + const reader = readableStream.getReader(); + const readable = new ReadableFromWeb({ + highWaterMark, + encoding, + objectMode, + signal, + reader, + }); + + return readable; + } + module.exports = Readable; var { addAbortSignal } = require_add_abort_signal(); @@ -3327,7 +3520,9 @@ var require_readable = __commonJS({ Readable.from = function (iterable, opts) { return from(Readable, iterable, opts); }; - var webStreamsAdapters; + var webStreamsAdapters = { + newStreamReadableFromReadableStream, + }; function lazyWebStreams() { if (webStreamsAdapters === void 0) webStreamsAdapters = {}; return webStreamsAdapters; |