diff options
Diffstat (limited to 'src/js/node/stream.js')
-rw-r--r-- | src/js/node/stream.js | 427 |
1 files changed, 395 insertions, 32 deletions
diff --git a/src/js/node/stream.js b/src/js/node/stream.js index d7d984cb8..ec131b58f 100644 --- a/src/js/node/stream.js +++ b/src/js/node/stream.js @@ -2,7 +2,7 @@ // "readable-stream" npm package // just transpiled and debug logs added. -const EE = $lazy("events"); +const EE = require("node:events"); const StringDecoder = require("node:string_decoder").StringDecoder; var __getOwnPropNames = Object.getOwnPropertyNames; @@ -24,6 +24,275 @@ function validateBoolean(value, name) { $debug("node:stream loaded"); +function highWaterMarkFrom(options, isDuplex, duplexKey) { + return options.highWaterMark != null ? options.highWaterMark : isDuplex ? options[duplexKey] : null +} +function getDefaultHighWaterMark(objectMode) { + return objectMode ? 16 : 16 * 1024 +} +function getHighWaterMark(state, options, duplexKey, isDuplex) { + const hwm = highWaterMarkFrom(options, isDuplex, duplexKey) + if (hwm != null) { + if (!Number.isInteger(hwm) || hwm < 0) { + const name = isDuplex ? `options.${duplexKey}` : 'options.highWaterMark' + throw new ERR_INVALID_ARG_VALUE(name, hwm) + } + return Math.floor(hwm) + } + + // Default value + return getDefaultHighWaterMark(state.objectMode) +} + +class BufferList { + constructor() { + this.head = null + this.tail = null + this.length = 0 + } + push(v) { + const entry = { + data: v, + next: null + } + if (this.length > 0) this.tail.next = entry + else this.head = entry + this.tail = entry + ++this.length + } + unshift(v) { + const entry = { + data: v, + next: this.head + } + if (this.length === 0) this.tail = entry + this.head = entry + ++this.length + } + shift() { + if (this.length === 0) return + const ret = this.head.data + if (this.length === 1) this.head = this.tail = null + else this.head = this.head.next + --this.length + return ret + } + clear() { + this.head = this.tail = null + this.length = 0 + } + join(s) { + if (this.length === 0) return '' + let p = this.head + let ret = '' + p.data + while ((p = p.next) !== null) ret += s + p.data + return ret + } + concat(n) { + if (this.length === 0) return Buffer.alloc(0) + const ret = Buffer.allocUnsafe(n >>> 0) + let p = this.head + let i = 0 + while (p) { + ret.set(p.data, i) + i += p.data.length + p = p.next + } + return ret + } + + // Consumes a specified amount of bytes or characters from the buffered data. + consume(n, hasStrings) { + const data = this.head.data + if (n < data.length) { + // `slice` is the same for buffers and strings. + const slice = data.slice(0, n) + this.head.data = data.slice(n) + return slice + } + if (n === data.length) { + // First chunk is a perfect match. + return this.shift() + } + // Result spans more than one buffer. + return hasStrings ? this._getString(n) : this._getBuffer(n) + } + first() { + return this.head.data + } + *[Symbol.iterator]() { + for (let p = this.head; p; p = p.next) { + yield p.data + } + } + + // Consumes a specified amount of characters from the buffered data. + _getString(n) { + let ret = '' + let p = this.head + let c = 0 + do { + const str = p.data + if (n > str.length) { + ret += str + n -= str.length + } else { + if (n === str.length) { + ret += str + ++c + if (p.next) this.head = p.next + else this.head = this.tail = null + } else { + ret += str.slice(0, n) + this.head = p + p.data = str.slice(n) + } + break + } + ++c + } while ((p = p.next) !== null) + this.length -= c + return ret + } + + // Consumes a specified amount of bytes from the buffered data. + _getBuffer(n) { + const ret = Buffer.allocUnsafe(n) + const retLen = n + let p = this.head + let c = 0 + do { + const buf = p.data + if (n > buf.length) { + ret.set(buf, retLen - n) + n -= buf.length + } else { + if (n === buf.length) { + ret.set(buf, retLen - n) + ++c + if (p.next) this.head = p.next + else this.head = this.tail = null + } else { + ret.set(new Uint8Array(buf.buffer, buf.byteOffset, n), retLen - n) + this.head = p + p.data = buf.slice(n) + } + break + } + ++c + } while ((p = p.next) !== null) + this.length -= c + return ret + } + + // Make sure the linked list only shows the minimal necessary information. + [Symbol.for('nodejs.util.inspect.custom')](_, options) { + return inspect(this, { + ...options, + // Only inspect one level. + depth: 0, + // It should not recurse. + customInspect: false + }) + } +} + +function ReadableState(options, stream, isDuplex) { + // Duplex streams are both readable and writable, but share + // the same options object. + // However, some cases require setting options to different + // values for the readable and the writable sides of the duplex stream. + // These options can be provided separately as readableXXX and writableXXX. + if (typeof isDuplex !== 'boolean') isDuplex = stream instanceof Duplex + + // Object stream flag. Used to make read(n) ignore n and to + // make all the buffer merging and length checks go away. + this.objectMode = !!(options && options.objectMode) + if (isDuplex) this.objectMode = this.objectMode || !!(options && options.readableObjectMode) + + // The point at which it stops calling _read() to fill the buffer + // Note: 0 is a valid value, means "don't call _read preemptively ever" + this.highWaterMark = options + ? getHighWaterMark(this, options, 'readableHighWaterMark', isDuplex) + : getDefaultHighWaterMark(false) + + // A linked list is used to store data chunks instead of an array because the + // linked list can remove elements from the beginning faster than + // array.shift(). + this.buffer = new BufferList() + this.length = 0 + this.pipes = [] + this.flowing = null + this.ended = false + this.endEmitted = false + this.reading = false + + // Stream is still being constructed and cannot be + // destroyed until construction finished or failed. + // Async construction is opt in, therefore we start as + // constructed. + this.constructed = true + + // A flag to be able to tell if the event 'readable'/'data' is emitted + // immediately, or on a later tick. We set this to true at first, because + // any actions that shouldn't happen until "later" should generally also + // not happen before the first read call. + this.sync = true + + // Whenever we return null, then we set a flag to say + // that we're awaiting a 'readable' event emission. + this.needReadable = false + this.emittedReadable = false + this.readableListening = false + this.resumeScheduled = false + // this[kPaused] = null + + // True if the error was already emitted and should not be thrown again. + this.errorEmitted = false + + // Should close be emitted on destroy. Defaults to true. + this.emitClose = !options || options.emitClose !== false + + // Should .destroy() be called after 'end' (and potentially 'finish'). + this.autoDestroy = !options || options.autoDestroy !== false + + // Has it been destroyed. + this.destroyed = false + + // Indicates whether the stream has errored. When true no further + // _read calls, 'data' or 'readable' events should occur. This is needed + // since when autoDestroy is disabled we need a way to tell whether the + // stream has failed. + this.errored = null + + // Indicates whether the stream has finished destroying. + this.closed = false + + // True if close has been emitted or would have been emitted + // depending on emitClose. + this.closeEmitted = false + + // Crypto is kind of old and crusty. Historically, its default string + // encoding is 'binary' so we have to make this configurable. + // Everything else in the universe uses 'utf8', though. + this.defaultEncoding = (options && options.defaultEncoding) || 'utf8' + + // Ref the piped dest which we need a drain event on it + // type: null | Writable | Set<Writable>. + this.awaitDrainWriters = null + this.multiAwaitDrain = false + + // If true, a maybeReadMore has been scheduled. + this.readingMore = false + this.dataEmitted = false + this.decoder = null + this.encoding = null + if (options && options.encoding) { + this.decoder = new StringDecoder(options.encoding) + this.encoding = options.encoding + } +} + /** * @callback validateObject * @param {*} value @@ -42,7 +311,7 @@ const validateObject = (value, name, options = null) => { const nullable = options?.nullable ?? false; if ( (!nullable && value === null) || - (!allowArray && ArrayIsArray(value)) || + (!allowArray && $isArray(value)) || (typeof value !== "object" && (!allowFunction || typeof value !== "function")) ) { throw new ERR_INVALID_ARG_TYPE(name, "Object", value); @@ -61,8 +330,6 @@ function validateString(value, name) { if (typeof value !== "string") throw new ERR_INVALID_ARG_TYPE(name, "string", value); } -var ArrayIsArray = Array.isArray; - //------------------------------------------------------------------------------ // Node error polyfills //------------------------------------------------------------------------------ @@ -81,7 +348,7 @@ var require_primordials = __commonJS({ "use strict"; module.exports = { ArrayIsArray(self) { - return Array.isArray(self); + return $isArray(self); }, ArrayPrototypeIncludes(self, el) { return self.includes(el); @@ -165,9 +432,6 @@ var require_primordials = __commonJS({ SymbolAsyncIterator: Symbol.asyncIterator, SymbolHasInstance: Symbol.hasInstance, SymbolIterator: Symbol.iterator, - TypedArrayPrototypeSet(self, buf, len) { - return self.set(buf, len); - }, Uint8Array, }; }, @@ -186,21 +450,7 @@ var require_util = __commonJS({ : function isBlob2(b) { return false; }; - var AggregateError = class extends Error { - constructor(errors) { - if (!Array.isArray(errors)) { - throw new TypeError(`Expected input to be an Array, got ${typeof errors}`); - } - let message = ""; - for (let i = 0; i < errors.length; i++) { - message += ` ${errors[i].stack} -`; - } - super(message); - this.name = "AggregateError"; - this.errors = errors; - } - }; + module.exports = { AggregateError, once(callback) { @@ -299,8 +549,8 @@ var require_util = __commonJS({ var require_errors = __commonJS({ "node_modules/readable-stream/lib/ours/errors.js"(exports, module) { "use strict"; - var { format, inspect, AggregateError: CustomAggregateError } = require_util(); - var AggregateError = globalThis.AggregateError || CustomAggregateError; + var { format, inspect } = require_util(); + var AggregateError = globalThis.AggregateError; var kIsNodeError = Symbol("kIsNodeError"); var kTypes = ["string", "function", "number", "object", "Function", "Object", "boolean", "bigint", "symbol"]; var classRegExp = /^([A-Z][a-z0-9]*)+$/; @@ -2260,7 +2510,6 @@ var require_readable = __commonJS({ Symbol: Symbol2, } = require_primordials(); - var ReadableState = $lazy("bun:stream").ReadableState; var { Stream, prependListener } = require_legacy(); function Readable(options) { @@ -2510,15 +2759,127 @@ var require_readable = __commonJS({ var { addAbortSignal } = require_add_abort_signal(); var eos = require_end_of_stream(); - const { maybeReadMore: _maybeReadMore, resume, emitReadable: _emitReadable, onEofChunk } = $lazy("bun:stream"); function maybeReadMore(stream, state) { - process.nextTick(_maybeReadMore, stream, state); + if (!state.readingMore && state.constructed) { + state.readingMore = true + process.nextTick(maybeReadMore_, stream, state) + } + } + function maybeReadMore_(stream, state) { + // Attempt to read more data if we should. + // + // The conditions for reading more data are (one of): + // - Not enough data buffered (state.length < state.highWaterMark). The loop + // is responsible for filling the buffer with enough data if such data + // is available. If highWaterMark is 0 and we are not in the flowing mode + // we should _not_ attempt to buffer any extra data. We'll get more data + // when the stream consumer calls read() instead. + // - No data in the buffer, and the stream is in flowing mode. In this mode + // the loop below is responsible for ensuring read() is called. Failing to + // call read here would abort the flow and there's no other mechanism for + // continuing the flow if the stream consumer has just subscribed to the + // 'data' event. + // + // In addition to the above conditions to keep reading data, the following + // conditions prevent the data from being read: + // - The stream has ended (state.ended). + // - There is already a pending 'read' operation (state.reading). This is a + // case where the stream has called the implementation defined _read() + // method, but they are processing the call asynchronously and have _not_ + // called push() with new data. In this case we skip performing more + // read()s. The execution ends in this method again after the _read() ends + // up calling push() with more data. + while ( + !state.reading && + !state.ended && + (state.length < state.highWaterMark || (state.flowing && state.length === 0)) + ) { + const len = state.length + $debug('maybeReadMore read 0') + stream.read(0) + if (len === state.length) + // Didn't get any data, stop spinning. + break + } + state.readingMore = false + } + // Don't emit readable right away in sync mode, because this can trigger + // another read() call => stack overflow. This way, it might trigger + // a nextTick recursion warning, but that's not so bad. + function emitReadable(stream) { + const state = stream._readableState; + $debug('emitReadable', state.needReadable, state.emittedReadable) + state.needReadable = false + if (!state.emittedReadable) { + $debug('emitReadable', state.flowing) + state.emittedReadable = true + process.nextTick(emitReadable_, stream) + } + } + function emitReadable_(stream) { + const state = stream._readableState + $debug('emitReadable_', state.destroyed, state.length, state.ended) + if (!state.destroyed && !state.errored && (state.length || state.ended)) { + stream.emit('readable') + state.emittedReadable = false + } + + // The stream needs another readable event if: + // 1. It is not flowing, as the flow mechanism will take + // care of it. + // 2. It is not ended. + // 3. It is below the highWaterMark, so we can schedule + // another readable later. + state.needReadable = !state.flowing && !state.ended && state.length <= state.highWaterMark + flow(stream) + } + function flow(stream) { + const state = stream._readableState + $debug('flow', state.flowing) + while (state.flowing && stream.read() !== null); + } + function onEofChunk(stream, state) { + $debug('onEofChunk') + if (state.ended) return + if (state.decoder) { + const chunk = state.decoder.end() + if (chunk && chunk.length) { + state.buffer.push(chunk) + state.length += state.objectMode ? 1 : chunk.length + } + } + state.ended = true + if (state.sync) { + // If we are sync, wait until next tick to emit the data. + // Otherwise we risk emitting data in the flow() + // the readable code triggers during a read() call. + emitReadable(stream) + } else { + // Emit 'readable' now to make sure it gets picked up. + state.needReadable = false + state.emittedReadable = true + // We have to emit readable now that we are EOF. Modules + // in the ecosystem (e.g. dicer) rely on this event being sync. + emitReadable_(stream) + } } - // REVERT ME - function emitReadable(stream, state) { - $debug("NativeReadable - emitReadable", stream.__id); - _emitReadable(stream, state); + function resume(stream, state) { + if (!state.resumeScheduled) { + state.resumeScheduled = true + process.nextTick(resume_, stream, state) + } } + function resume_(stream, state) { + $debug('resume', state.reading) + if (!state.reading) { + stream.read(0) + } + state.resumeScheduled = false + stream.emit('resume') + flow(stream) + if (state.flowing && !state.reading) stream.read(0) + } + var destroyImpl = require_destroy(); var { aggregateTwoErrors, @@ -5607,4 +5968,6 @@ exports[Symbol.for("::bunternal::")] = { _ReadableFromWeb, _ReadableFromWebForUn exports.eos = require_end_of_stream(); exports.EventEmitter = EE; +var Duplex = exports.Duplex; + export default exports; |