diff options
Diffstat (limited to 'src/js/node/stream.js')
-rw-r--r-- | src/js/node/stream.js | 427 |
1 files changed, 32 insertions, 395 deletions
diff --git a/src/js/node/stream.js b/src/js/node/stream.js index d91ec2f6c..d7d984cb8 100644 --- a/src/js/node/stream.js +++ b/src/js/node/stream.js @@ -2,7 +2,7 @@ // "readable-stream" npm package // just transpiled and debug logs added. -const EE = require("node:events"); +const EE = $lazy("events"); const StringDecoder = require("node:string_decoder").StringDecoder; var __getOwnPropNames = Object.getOwnPropertyNames; @@ -24,275 +24,6 @@ function validateBoolean(value, name) { $debug("node:stream loaded"); -function highWaterMarkFrom(options, isDuplex, duplexKey) { - return options.highWaterMark != null ? options.highWaterMark : isDuplex ? options[duplexKey] : null; -} -function getDefaultHighWaterMark(objectMode) { - return objectMode ? 16 : 16 * 1024; -} -function getHighWaterMark(state, options, duplexKey, isDuplex) { - const hwm = highWaterMarkFrom(options, isDuplex, duplexKey); - if (hwm != null) { - if (!Number.isInteger(hwm) || hwm < 0) { - const name = isDuplex ? `options.${duplexKey}` : "options.highWaterMark"; - throw new ERR_INVALID_ARG_VALUE(name, hwm); - } - return Math.floor(hwm); - } - - // Default value - return getDefaultHighWaterMark(state.objectMode); -} - -class BufferList { - constructor() { - this.head = null; - this.tail = null; - this.length = 0; - } - push(v) { - const entry = { - data: v, - next: null, - }; - if (this.length > 0) this.tail.next = entry; - else this.head = entry; - this.tail = entry; - ++this.length; - } - unshift(v) { - const entry = { - data: v, - next: this.head, - }; - if (this.length === 0) this.tail = entry; - this.head = entry; - ++this.length; - } - shift() { - if (this.length === 0) return; - const ret = this.head.data; - if (this.length === 1) this.head = this.tail = null; - else this.head = this.head.next; - --this.length; - return ret; - } - clear() { - this.head = this.tail = null; - this.length = 0; - } - join(s) { - if (this.length === 0) return ""; - let p = this.head; - let ret = "" + p.data; - while ((p = p.next) !== null) ret += s + p.data; - return ret; - } - concat(n) { - if (this.length === 0) return Buffer.alloc(0); - const ret = Buffer.allocUnsafe(n >>> 0); - let p = this.head; - let i = 0; - while (p) { - ret.set(p.data, i); - i += p.data.length; - p = p.next; - } - return ret; - } - - // Consumes a specified amount of bytes or characters from the buffered data. - consume(n, hasStrings) { - const data = this.head.data; - if (n < data.length) { - // `slice` is the same for buffers and strings. - const slice = data.slice(0, n); - this.head.data = data.slice(n); - return slice; - } - if (n === data.length) { - // First chunk is a perfect match. - return this.shift(); - } - // Result spans more than one buffer. - return hasStrings ? this._getString(n) : this._getBuffer(n); - } - first() { - return this.head.data; - } - *[Symbol.iterator]() { - for (let p = this.head; p; p = p.next) { - yield p.data; - } - } - - // Consumes a specified amount of characters from the buffered data. - _getString(n) { - let ret = ""; - let p = this.head; - let c = 0; - do { - const str = p.data; - if (n > str.length) { - ret += str; - n -= str.length; - } else { - if (n === str.length) { - ret += str; - ++c; - if (p.next) this.head = p.next; - else this.head = this.tail = null; - } else { - ret += str.slice(0, n); - this.head = p; - p.data = str.slice(n); - } - break; - } - ++c; - } while ((p = p.next) !== null); - this.length -= c; - return ret; - } - - // Consumes a specified amount of bytes from the buffered data. - _getBuffer(n) { - const ret = Buffer.allocUnsafe(n); - const retLen = n; - let p = this.head; - let c = 0; - do { - const buf = p.data; - if (n > buf.length) { - ret.set(buf, retLen - n); - n -= buf.length; - } else { - if (n === buf.length) { - ret.set(buf, retLen - n); - ++c; - if (p.next) this.head = p.next; - else this.head = this.tail = null; - } else { - ret.set(new Uint8Array(buf.buffer, buf.byteOffset, n), retLen - n); - this.head = p; - p.data = buf.slice(n); - } - break; - } - ++c; - } while ((p = p.next) !== null); - this.length -= c; - return ret; - } - - // Make sure the linked list only shows the minimal necessary information. - [Symbol.for("nodejs.util.inspect.custom")](_, options) { - return inspect(this, { - ...options, - // Only inspect one level. - depth: 0, - // It should not recurse. - customInspect: false, - }); - } -} - -function ReadableState(options, stream, isDuplex) { - // Duplex streams are both readable and writable, but share - // the same options object. - // However, some cases require setting options to different - // values for the readable and the writable sides of the duplex stream. - // These options can be provided separately as readableXXX and writableXXX. - if (typeof isDuplex !== "boolean") isDuplex = stream instanceof Duplex; - - // Object stream flag. Used to make read(n) ignore n and to - // make all the buffer merging and length checks go away. - this.objectMode = !!(options && options.objectMode); - if (isDuplex) this.objectMode = this.objectMode || !!(options && options.readableObjectMode); - - // The point at which it stops calling _read() to fill the buffer - // Note: 0 is a valid value, means "don't call _read preemptively ever" - this.highWaterMark = options - ? getHighWaterMark(this, options, "readableHighWaterMark", isDuplex) - : getDefaultHighWaterMark(false); - - // A linked list is used to store data chunks instead of an array because the - // linked list can remove elements from the beginning faster than - // array.shift(). - this.buffer = new BufferList(); - this.length = 0; - this.pipes = []; - this.flowing = null; - this.ended = false; - this.endEmitted = false; - this.reading = false; - - // Stream is still being constructed and cannot be - // destroyed until construction finished or failed. - // Async construction is opt in, therefore we start as - // constructed. - this.constructed = true; - - // A flag to be able to tell if the event 'readable'/'data' is emitted - // immediately, or on a later tick. We set this to true at first, because - // any actions that shouldn't happen until "later" should generally also - // not happen before the first read call. - this.sync = true; - - // Whenever we return null, then we set a flag to say - // that we're awaiting a 'readable' event emission. - this.needReadable = false; - this.emittedReadable = false; - this.readableListening = false; - this.resumeScheduled = false; - // this[kPaused] = null - - // True if the error was already emitted and should not be thrown again. - this.errorEmitted = false; - - // Should close be emitted on destroy. Defaults to true. - this.emitClose = !options || options.emitClose !== false; - - // Should .destroy() be called after 'end' (and potentially 'finish'). - this.autoDestroy = !options || options.autoDestroy !== false; - - // Has it been destroyed. - this.destroyed = false; - - // Indicates whether the stream has errored. When true no further - // _read calls, 'data' or 'readable' events should occur. This is needed - // since when autoDestroy is disabled we need a way to tell whether the - // stream has failed. - this.errored = null; - - // Indicates whether the stream has finished destroying. - this.closed = false; - - // True if close has been emitted or would have been emitted - // depending on emitClose. - this.closeEmitted = false; - - // Crypto is kind of old and crusty. Historically, its default string - // encoding is 'binary' so we have to make this configurable. - // Everything else in the universe uses 'utf8', though. - this.defaultEncoding = (options && options.defaultEncoding) || "utf8"; - - // Ref the piped dest which we need a drain event on it - // type: null | Writable | Set<Writable>. - this.awaitDrainWriters = null; - this.multiAwaitDrain = false; - - // If true, a maybeReadMore has been scheduled. - this.readingMore = false; - this.dataEmitted = false; - this.decoder = null; - this.encoding = null; - if (options && options.encoding) { - this.decoder = new StringDecoder(options.encoding); - this.encoding = options.encoding; - } -} - /** * @callback validateObject * @param {*} value @@ -311,7 +42,7 @@ const validateObject = (value, name, options = null) => { const nullable = options?.nullable ?? false; if ( (!nullable && value === null) || - (!allowArray && $isArray(value)) || + (!allowArray && ArrayIsArray(value)) || (typeof value !== "object" && (!allowFunction || typeof value !== "function")) ) { throw new ERR_INVALID_ARG_TYPE(name, "Object", value); @@ -330,6 +61,8 @@ function validateString(value, name) { if (typeof value !== "string") throw new ERR_INVALID_ARG_TYPE(name, "string", value); } +var ArrayIsArray = Array.isArray; + //------------------------------------------------------------------------------ // Node error polyfills //------------------------------------------------------------------------------ @@ -348,7 +81,7 @@ var require_primordials = __commonJS({ "use strict"; module.exports = { ArrayIsArray(self) { - return $isArray(self); + return Array.isArray(self); }, ArrayPrototypeIncludes(self, el) { return self.includes(el); @@ -432,6 +165,9 @@ var require_primordials = __commonJS({ SymbolAsyncIterator: Symbol.asyncIterator, SymbolHasInstance: Symbol.hasInstance, SymbolIterator: Symbol.iterator, + TypedArrayPrototypeSet(self, buf, len) { + return self.set(buf, len); + }, Uint8Array, }; }, @@ -450,7 +186,21 @@ var require_util = __commonJS({ : function isBlob2(b) { return false; }; - + var AggregateError = class extends Error { + constructor(errors) { + if (!Array.isArray(errors)) { + throw new TypeError(`Expected input to be an Array, got ${typeof errors}`); + } + let message = ""; + for (let i = 0; i < errors.length; i++) { + message += ` ${errors[i].stack} +`; + } + super(message); + this.name = "AggregateError"; + this.errors = errors; + } + }; module.exports = { AggregateError, once(callback) { @@ -549,8 +299,8 @@ var require_util = __commonJS({ var require_errors = __commonJS({ "node_modules/readable-stream/lib/ours/errors.js"(exports, module) { "use strict"; - var { format, inspect } = require_util(); - var AggregateError = globalThis.AggregateError; + var { format, inspect, AggregateError: CustomAggregateError } = require_util(); + var AggregateError = globalThis.AggregateError || CustomAggregateError; var kIsNodeError = Symbol("kIsNodeError"); var kTypes = ["string", "function", "number", "object", "Function", "Object", "boolean", "bigint", "symbol"]; var classRegExp = /^([A-Z][a-z0-9]*)+$/; @@ -2510,6 +2260,7 @@ var require_readable = __commonJS({ Symbol: Symbol2, } = require_primordials(); + var ReadableState = $lazy("bun:stream").ReadableState; var { Stream, prependListener } = require_legacy(); function Readable(options) { @@ -2759,127 +2510,15 @@ var require_readable = __commonJS({ var { addAbortSignal } = require_add_abort_signal(); var eos = require_end_of_stream(); + const { maybeReadMore: _maybeReadMore, resume, emitReadable: _emitReadable, onEofChunk } = $lazy("bun:stream"); function maybeReadMore(stream, state) { - if (!state.readingMore && state.constructed) { - state.readingMore = true; - process.nextTick(maybeReadMore_, stream, state); - } + process.nextTick(_maybeReadMore, stream, state); } - function maybeReadMore_(stream, state) { - // Attempt to read more data if we should. - // - // The conditions for reading more data are (one of): - // - Not enough data buffered (state.length < state.highWaterMark). The loop - // is responsible for filling the buffer with enough data if such data - // is available. If highWaterMark is 0 and we are not in the flowing mode - // we should _not_ attempt to buffer any extra data. We'll get more data - // when the stream consumer calls read() instead. - // - No data in the buffer, and the stream is in flowing mode. In this mode - // the loop below is responsible for ensuring read() is called. Failing to - // call read here would abort the flow and there's no other mechanism for - // continuing the flow if the stream consumer has just subscribed to the - // 'data' event. - // - // In addition to the above conditions to keep reading data, the following - // conditions prevent the data from being read: - // - The stream has ended (state.ended). - // - There is already a pending 'read' operation (state.reading). This is a - // case where the stream has called the implementation defined _read() - // method, but they are processing the call asynchronously and have _not_ - // called push() with new data. In this case we skip performing more - // read()s. The execution ends in this method again after the _read() ends - // up calling push() with more data. - while ( - !state.reading && - !state.ended && - (state.length < state.highWaterMark || (state.flowing && state.length === 0)) - ) { - const len = state.length; - $debug("maybeReadMore read 0"); - stream.read(0); - if (len === state.length) - // Didn't get any data, stop spinning. - break; - } - state.readingMore = false; - } - // Don't emit readable right away in sync mode, because this can trigger - // another read() call => stack overflow. This way, it might trigger - // a nextTick recursion warning, but that's not so bad. - function emitReadable(stream) { - const state = stream._readableState; - $debug("emitReadable", state.needReadable, state.emittedReadable); - state.needReadable = false; - if (!state.emittedReadable) { - $debug("emitReadable", state.flowing); - state.emittedReadable = true; - process.nextTick(emitReadable_, stream); - } - } - function emitReadable_(stream) { - const state = stream._readableState; - $debug("emitReadable_", state.destroyed, state.length, state.ended); - if (!state.destroyed && !state.errored && (state.length || state.ended)) { - stream.emit("readable"); - state.emittedReadable = false; - } - - // The stream needs another readable event if: - // 1. It is not flowing, as the flow mechanism will take - // care of it. - // 2. It is not ended. - // 3. It is below the highWaterMark, so we can schedule - // another readable later. - state.needReadable = !state.flowing && !state.ended && state.length <= state.highWaterMark; - flow(stream); - } - function flow(stream) { - const state = stream._readableState; - $debug("flow", state.flowing); - while (state.flowing && stream.read() !== null); - } - function onEofChunk(stream, state) { - $debug("onEofChunk"); - if (state.ended) return; - if (state.decoder) { - const chunk = state.decoder.end(); - if (chunk && chunk.length) { - state.buffer.push(chunk); - state.length += state.objectMode ? 1 : chunk.length; - } - } - state.ended = true; - if (state.sync) { - // If we are sync, wait until next tick to emit the data. - // Otherwise we risk emitting data in the flow() - // the readable code triggers during a read() call. - emitReadable(stream); - } else { - // Emit 'readable' now to make sure it gets picked up. - state.needReadable = false; - state.emittedReadable = true; - // We have to emit readable now that we are EOF. Modules - // in the ecosystem (e.g. dicer) rely on this event being sync. - emitReadable_(stream); - } - } - function resume(stream, state) { - if (!state.resumeScheduled) { - state.resumeScheduled = true; - process.nextTick(resume_, stream, state); - } + // REVERT ME + function emitReadable(stream, state) { + $debug("NativeReadable - emitReadable", stream.__id); + _emitReadable(stream, state); } - function resume_(stream, state) { - $debug("resume", state.reading); - if (!state.reading) { - stream.read(0); - } - state.resumeScheduled = false; - stream.emit("resume"); - flow(stream); - if (state.flowing && !state.reading) stream.read(0); - } - var destroyImpl = require_destroy(); var { aggregateTwoErrors, @@ -5968,6 +5607,4 @@ exports[Symbol.for("::bunternal::")] = { _ReadableFromWeb, _ReadableFromWebForUn exports.eos = require_end_of_stream(); exports.EventEmitter = EE; -var Duplex = exports.Duplex; - export default exports; |