aboutsummaryrefslogtreecommitdiff
path: root/src/js/node/stream.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/js/node/stream.js')
-rw-r--r--src/js/node/stream.js427
1 files changed, 395 insertions, 32 deletions
diff --git a/src/js/node/stream.js b/src/js/node/stream.js
index d7d984cb8..ec131b58f 100644
--- a/src/js/node/stream.js
+++ b/src/js/node/stream.js
@@ -2,7 +2,7 @@
// "readable-stream" npm package
// just transpiled and debug logs added.
-const EE = $lazy("events");
+const EE = require("node:events");
const StringDecoder = require("node:string_decoder").StringDecoder;
var __getOwnPropNames = Object.getOwnPropertyNames;
@@ -24,6 +24,275 @@ function validateBoolean(value, name) {
$debug("node:stream loaded");
+function highWaterMarkFrom(options, isDuplex, duplexKey) {
+ return options.highWaterMark != null ? options.highWaterMark : isDuplex ? options[duplexKey] : null
+}
+function getDefaultHighWaterMark(objectMode) {
+ return objectMode ? 16 : 16 * 1024
+}
+function getHighWaterMark(state, options, duplexKey, isDuplex) {
+ const hwm = highWaterMarkFrom(options, isDuplex, duplexKey)
+ if (hwm != null) {
+ if (!Number.isInteger(hwm) || hwm < 0) {
+ const name = isDuplex ? `options.${duplexKey}` : 'options.highWaterMark'
+ throw new ERR_INVALID_ARG_VALUE(name, hwm)
+ }
+ return Math.floor(hwm)
+ }
+
+ // Default value
+ return getDefaultHighWaterMark(state.objectMode)
+}
+
+class BufferList {
+ constructor() {
+ this.head = null
+ this.tail = null
+ this.length = 0
+ }
+ push(v) {
+ const entry = {
+ data: v,
+ next: null
+ }
+ if (this.length > 0) this.tail.next = entry
+ else this.head = entry
+ this.tail = entry
+ ++this.length
+ }
+ unshift(v) {
+ const entry = {
+ data: v,
+ next: this.head
+ }
+ if (this.length === 0) this.tail = entry
+ this.head = entry
+ ++this.length
+ }
+ shift() {
+ if (this.length === 0) return
+ const ret = this.head.data
+ if (this.length === 1) this.head = this.tail = null
+ else this.head = this.head.next
+ --this.length
+ return ret
+ }
+ clear() {
+ this.head = this.tail = null
+ this.length = 0
+ }
+ join(s) {
+ if (this.length === 0) return ''
+ let p = this.head
+ let ret = '' + p.data
+ while ((p = p.next) !== null) ret += s + p.data
+ return ret
+ }
+ concat(n) {
+ if (this.length === 0) return Buffer.alloc(0)
+ const ret = Buffer.allocUnsafe(n >>> 0)
+ let p = this.head
+ let i = 0
+ while (p) {
+ ret.set(p.data, i)
+ i += p.data.length
+ p = p.next
+ }
+ return ret
+ }
+
+ // Consumes a specified amount of bytes or characters from the buffered data.
+ consume(n, hasStrings) {
+ const data = this.head.data
+ if (n < data.length) {
+ // `slice` is the same for buffers and strings.
+ const slice = data.slice(0, n)
+ this.head.data = data.slice(n)
+ return slice
+ }
+ if (n === data.length) {
+ // First chunk is a perfect match.
+ return this.shift()
+ }
+ // Result spans more than one buffer.
+ return hasStrings ? this._getString(n) : this._getBuffer(n)
+ }
+ first() {
+ return this.head.data
+ }
+ *[Symbol.iterator]() {
+ for (let p = this.head; p; p = p.next) {
+ yield p.data
+ }
+ }
+
+ // Consumes a specified amount of characters from the buffered data.
+ _getString(n) {
+ let ret = ''
+ let p = this.head
+ let c = 0
+ do {
+ const str = p.data
+ if (n > str.length) {
+ ret += str
+ n -= str.length
+ } else {
+ if (n === str.length) {
+ ret += str
+ ++c
+ if (p.next) this.head = p.next
+ else this.head = this.tail = null
+ } else {
+ ret += str.slice(0, n)
+ this.head = p
+ p.data = str.slice(n)
+ }
+ break
+ }
+ ++c
+ } while ((p = p.next) !== null)
+ this.length -= c
+ return ret
+ }
+
+ // Consumes a specified amount of bytes from the buffered data.
+ _getBuffer(n) {
+ const ret = Buffer.allocUnsafe(n)
+ const retLen = n
+ let p = this.head
+ let c = 0
+ do {
+ const buf = p.data
+ if (n > buf.length) {
+ ret.set(buf, retLen - n)
+ n -= buf.length
+ } else {
+ if (n === buf.length) {
+ ret.set(buf, retLen - n)
+ ++c
+ if (p.next) this.head = p.next
+ else this.head = this.tail = null
+ } else {
+ ret.set(new Uint8Array(buf.buffer, buf.byteOffset, n), retLen - n)
+ this.head = p
+ p.data = buf.slice(n)
+ }
+ break
+ }
+ ++c
+ } while ((p = p.next) !== null)
+ this.length -= c
+ return ret
+ }
+
+ // Make sure the linked list only shows the minimal necessary information.
+ [Symbol.for('nodejs.util.inspect.custom')](_, options) {
+ return inspect(this, {
+ ...options,
+ // Only inspect one level.
+ depth: 0,
+ // It should not recurse.
+ customInspect: false
+ })
+ }
+}
+
+function ReadableState(options, stream, isDuplex) {
+ // Duplex streams are both readable and writable, but share
+ // the same options object.
+ // However, some cases require setting options to different
+ // values for the readable and the writable sides of the duplex stream.
+ // These options can be provided separately as readableXXX and writableXXX.
+ if (typeof isDuplex !== 'boolean') isDuplex = stream instanceof Duplex
+
+ // Object stream flag. Used to make read(n) ignore n and to
+ // make all the buffer merging and length checks go away.
+ this.objectMode = !!(options && options.objectMode)
+ if (isDuplex) this.objectMode = this.objectMode || !!(options && options.readableObjectMode)
+
+ // The point at which it stops calling _read() to fill the buffer
+ // Note: 0 is a valid value, means "don't call _read preemptively ever"
+ this.highWaterMark = options
+ ? getHighWaterMark(this, options, 'readableHighWaterMark', isDuplex)
+ : getDefaultHighWaterMark(false)
+
+ // A linked list is used to store data chunks instead of an array because the
+ // linked list can remove elements from the beginning faster than
+ // array.shift().
+ this.buffer = new BufferList()
+ this.length = 0
+ this.pipes = []
+ this.flowing = null
+ this.ended = false
+ this.endEmitted = false
+ this.reading = false
+
+ // Stream is still being constructed and cannot be
+ // destroyed until construction finished or failed.
+ // Async construction is opt in, therefore we start as
+ // constructed.
+ this.constructed = true
+
+ // A flag to be able to tell if the event 'readable'/'data' is emitted
+ // immediately, or on a later tick. We set this to true at first, because
+ // any actions that shouldn't happen until "later" should generally also
+ // not happen before the first read call.
+ this.sync = true
+
+ // Whenever we return null, then we set a flag to say
+ // that we're awaiting a 'readable' event emission.
+ this.needReadable = false
+ this.emittedReadable = false
+ this.readableListening = false
+ this.resumeScheduled = false
+ // this[kPaused] = null
+
+ // True if the error was already emitted and should not be thrown again.
+ this.errorEmitted = false
+
+ // Should close be emitted on destroy. Defaults to true.
+ this.emitClose = !options || options.emitClose !== false
+
+ // Should .destroy() be called after 'end' (and potentially 'finish').
+ this.autoDestroy = !options || options.autoDestroy !== false
+
+ // Has it been destroyed.
+ this.destroyed = false
+
+ // Indicates whether the stream has errored. When true no further
+ // _read calls, 'data' or 'readable' events should occur. This is needed
+ // since when autoDestroy is disabled we need a way to tell whether the
+ // stream has failed.
+ this.errored = null
+
+ // Indicates whether the stream has finished destroying.
+ this.closed = false
+
+ // True if close has been emitted or would have been emitted
+ // depending on emitClose.
+ this.closeEmitted = false
+
+ // Crypto is kind of old and crusty. Historically, its default string
+ // encoding is 'binary' so we have to make this configurable.
+ // Everything else in the universe uses 'utf8', though.
+ this.defaultEncoding = (options && options.defaultEncoding) || 'utf8'
+
+ // Ref the piped dest which we need a drain event on it
+ // type: null | Writable | Set<Writable>.
+ this.awaitDrainWriters = null
+ this.multiAwaitDrain = false
+
+ // If true, a maybeReadMore has been scheduled.
+ this.readingMore = false
+ this.dataEmitted = false
+ this.decoder = null
+ this.encoding = null
+ if (options && options.encoding) {
+ this.decoder = new StringDecoder(options.encoding)
+ this.encoding = options.encoding
+ }
+}
+
/**
* @callback validateObject
* @param {*} value
@@ -42,7 +311,7 @@ const validateObject = (value, name, options = null) => {
const nullable = options?.nullable ?? false;
if (
(!nullable && value === null) ||
- (!allowArray && ArrayIsArray(value)) ||
+ (!allowArray && $isArray(value)) ||
(typeof value !== "object" && (!allowFunction || typeof value !== "function"))
) {
throw new ERR_INVALID_ARG_TYPE(name, "Object", value);
@@ -61,8 +330,6 @@ function validateString(value, name) {
if (typeof value !== "string") throw new ERR_INVALID_ARG_TYPE(name, "string", value);
}
-var ArrayIsArray = Array.isArray;
-
//------------------------------------------------------------------------------
// Node error polyfills
//------------------------------------------------------------------------------
@@ -81,7 +348,7 @@ var require_primordials = __commonJS({
"use strict";
module.exports = {
ArrayIsArray(self) {
- return Array.isArray(self);
+ return $isArray(self);
},
ArrayPrototypeIncludes(self, el) {
return self.includes(el);
@@ -165,9 +432,6 @@ var require_primordials = __commonJS({
SymbolAsyncIterator: Symbol.asyncIterator,
SymbolHasInstance: Symbol.hasInstance,
SymbolIterator: Symbol.iterator,
- TypedArrayPrototypeSet(self, buf, len) {
- return self.set(buf, len);
- },
Uint8Array,
};
},
@@ -186,21 +450,7 @@ var require_util = __commonJS({
: function isBlob2(b) {
return false;
};
- var AggregateError = class extends Error {
- constructor(errors) {
- if (!Array.isArray(errors)) {
- throw new TypeError(`Expected input to be an Array, got ${typeof errors}`);
- }
- let message = "";
- for (let i = 0; i < errors.length; i++) {
- message += ` ${errors[i].stack}
-`;
- }
- super(message);
- this.name = "AggregateError";
- this.errors = errors;
- }
- };
+
module.exports = {
AggregateError,
once(callback) {
@@ -299,8 +549,8 @@ var require_util = __commonJS({
var require_errors = __commonJS({
"node_modules/readable-stream/lib/ours/errors.js"(exports, module) {
"use strict";
- var { format, inspect, AggregateError: CustomAggregateError } = require_util();
- var AggregateError = globalThis.AggregateError || CustomAggregateError;
+ var { format, inspect } = require_util();
+ var AggregateError = globalThis.AggregateError;
var kIsNodeError = Symbol("kIsNodeError");
var kTypes = ["string", "function", "number", "object", "Function", "Object", "boolean", "bigint", "symbol"];
var classRegExp = /^([A-Z][a-z0-9]*)+$/;
@@ -2260,7 +2510,6 @@ var require_readable = __commonJS({
Symbol: Symbol2,
} = require_primordials();
- var ReadableState = $lazy("bun:stream").ReadableState;
var { Stream, prependListener } = require_legacy();
function Readable(options) {
@@ -2510,15 +2759,127 @@ var require_readable = __commonJS({
var { addAbortSignal } = require_add_abort_signal();
var eos = require_end_of_stream();
- const { maybeReadMore: _maybeReadMore, resume, emitReadable: _emitReadable, onEofChunk } = $lazy("bun:stream");
function maybeReadMore(stream, state) {
- process.nextTick(_maybeReadMore, stream, state);
+ if (!state.readingMore && state.constructed) {
+ state.readingMore = true
+ process.nextTick(maybeReadMore_, stream, state)
+ }
+ }
+ function maybeReadMore_(stream, state) {
+ // Attempt to read more data if we should.
+ //
+ // The conditions for reading more data are (one of):
+ // - Not enough data buffered (state.length < state.highWaterMark). The loop
+ // is responsible for filling the buffer with enough data if such data
+ // is available. If highWaterMark is 0 and we are not in the flowing mode
+ // we should _not_ attempt to buffer any extra data. We'll get more data
+ // when the stream consumer calls read() instead.
+ // - No data in the buffer, and the stream is in flowing mode. In this mode
+ // the loop below is responsible for ensuring read() is called. Failing to
+ // call read here would abort the flow and there's no other mechanism for
+ // continuing the flow if the stream consumer has just subscribed to the
+ // 'data' event.
+ //
+ // In addition to the above conditions to keep reading data, the following
+ // conditions prevent the data from being read:
+ // - The stream has ended (state.ended).
+ // - There is already a pending 'read' operation (state.reading). This is a
+ // case where the stream has called the implementation defined _read()
+ // method, but they are processing the call asynchronously and have _not_
+ // called push() with new data. In this case we skip performing more
+ // read()s. The execution ends in this method again after the _read() ends
+ // up calling push() with more data.
+ while (
+ !state.reading &&
+ !state.ended &&
+ (state.length < state.highWaterMark || (state.flowing && state.length === 0))
+ ) {
+ const len = state.length
+ $debug('maybeReadMore read 0')
+ stream.read(0)
+ if (len === state.length)
+ // Didn't get any data, stop spinning.
+ break
+ }
+ state.readingMore = false
+ }
+ // Don't emit readable right away in sync mode, because this can trigger
+ // another read() call => stack overflow. This way, it might trigger
+ // a nextTick recursion warning, but that's not so bad.
+ function emitReadable(stream) {
+ const state = stream._readableState;
+ $debug('emitReadable', state.needReadable, state.emittedReadable)
+ state.needReadable = false
+ if (!state.emittedReadable) {
+ $debug('emitReadable', state.flowing)
+ state.emittedReadable = true
+ process.nextTick(emitReadable_, stream)
+ }
+ }
+ function emitReadable_(stream) {
+ const state = stream._readableState
+ $debug('emitReadable_', state.destroyed, state.length, state.ended)
+ if (!state.destroyed && !state.errored && (state.length || state.ended)) {
+ stream.emit('readable')
+ state.emittedReadable = false
+ }
+
+ // The stream needs another readable event if:
+ // 1. It is not flowing, as the flow mechanism will take
+ // care of it.
+ // 2. It is not ended.
+ // 3. It is below the highWaterMark, so we can schedule
+ // another readable later.
+ state.needReadable = !state.flowing && !state.ended && state.length <= state.highWaterMark
+ flow(stream)
+ }
+ function flow(stream) {
+ const state = stream._readableState
+ $debug('flow', state.flowing)
+ while (state.flowing && stream.read() !== null);
+ }
+ function onEofChunk(stream, state) {
+ $debug('onEofChunk')
+ if (state.ended) return
+ if (state.decoder) {
+ const chunk = state.decoder.end()
+ if (chunk && chunk.length) {
+ state.buffer.push(chunk)
+ state.length += state.objectMode ? 1 : chunk.length
+ }
+ }
+ state.ended = true
+ if (state.sync) {
+ // If we are sync, wait until next tick to emit the data.
+ // Otherwise we risk emitting data in the flow()
+ // the readable code triggers during a read() call.
+ emitReadable(stream)
+ } else {
+ // Emit 'readable' now to make sure it gets picked up.
+ state.needReadable = false
+ state.emittedReadable = true
+ // We have to emit readable now that we are EOF. Modules
+ // in the ecosystem (e.g. dicer) rely on this event being sync.
+ emitReadable_(stream)
+ }
}
- // REVERT ME
- function emitReadable(stream, state) {
- $debug("NativeReadable - emitReadable", stream.__id);
- _emitReadable(stream, state);
+ function resume(stream, state) {
+ if (!state.resumeScheduled) {
+ state.resumeScheduled = true
+ process.nextTick(resume_, stream, state)
+ }
}
+ function resume_(stream, state) {
+ $debug('resume', state.reading)
+ if (!state.reading) {
+ stream.read(0)
+ }
+ state.resumeScheduled = false
+ stream.emit('resume')
+ flow(stream)
+ if (state.flowing && !state.reading) stream.read(0)
+ }
+
var destroyImpl = require_destroy();
var {
aggregateTwoErrors,
@@ -5607,4 +5968,6 @@ exports[Symbol.for("::bunternal::")] = { _ReadableFromWeb, _ReadableFromWebForUn
exports.eos = require_end_of_stream();
exports.EventEmitter = EE;
+var Duplex = exports.Duplex;
+
export default exports;