path: root/src/js/node/stream.js
Diffstat (limited to 'src/js/node/stream.js')
-rw-r--r--  src/js/node/stream.js  427
1 file changed, 32 insertions, 395 deletions
diff --git a/src/js/node/stream.js b/src/js/node/stream.js
index d91ec2f6c..d7d984cb8 100644
--- a/src/js/node/stream.js
+++ b/src/js/node/stream.js
@@ -2,7 +2,7 @@
// "readable-stream" npm package
// just transpiled and debug logs added.
-const EE = require("node:events");
+const EE = $lazy("events");
const StringDecoder = require("node:string_decoder").StringDecoder;
var __getOwnPropNames = Object.getOwnPropertyNames;
@@ -24,275 +24,6 @@ function validateBoolean(value, name) {
$debug("node:stream loaded");
-function highWaterMarkFrom(options, isDuplex, duplexKey) {
- return options.highWaterMark != null ? options.highWaterMark : isDuplex ? options[duplexKey] : null;
-}
-function getDefaultHighWaterMark(objectMode) {
- return objectMode ? 16 : 16 * 1024;
-}
-function getHighWaterMark(state, options, duplexKey, isDuplex) {
- const hwm = highWaterMarkFrom(options, isDuplex, duplexKey);
- if (hwm != null) {
- if (!Number.isInteger(hwm) || hwm < 0) {
- const name = isDuplex ? `options.${duplexKey}` : "options.highWaterMark";
- throw new ERR_INVALID_ARG_VALUE(name, hwm);
- }
- return Math.floor(hwm);
- }
-
- // Default value
- return getDefaultHighWaterMark(state.objectMode);
-}
-
-class BufferList {
- constructor() {
- this.head = null;
- this.tail = null;
- this.length = 0;
- }
- push(v) {
- const entry = {
- data: v,
- next: null,
- };
- if (this.length > 0) this.tail.next = entry;
- else this.head = entry;
- this.tail = entry;
- ++this.length;
- }
- unshift(v) {
- const entry = {
- data: v,
- next: this.head,
- };
- if (this.length === 0) this.tail = entry;
- this.head = entry;
- ++this.length;
- }
- shift() {
- if (this.length === 0) return;
- const ret = this.head.data;
- if (this.length === 1) this.head = this.tail = null;
- else this.head = this.head.next;
- --this.length;
- return ret;
- }
- clear() {
- this.head = this.tail = null;
- this.length = 0;
- }
- join(s) {
- if (this.length === 0) return "";
- let p = this.head;
- let ret = "" + p.data;
- while ((p = p.next) !== null) ret += s + p.data;
- return ret;
- }
- concat(n) {
- if (this.length === 0) return Buffer.alloc(0);
- const ret = Buffer.allocUnsafe(n >>> 0);
- let p = this.head;
- let i = 0;
- while (p) {
- ret.set(p.data, i);
- i += p.data.length;
- p = p.next;
- }
- return ret;
- }
-
- // Consumes a specified amount of bytes or characters from the buffered data.
- consume(n, hasStrings) {
- const data = this.head.data;
- if (n < data.length) {
- // `slice` is the same for buffers and strings.
- const slice = data.slice(0, n);
- this.head.data = data.slice(n);
- return slice;
- }
- if (n === data.length) {
- // First chunk is a perfect match.
- return this.shift();
- }
- // Result spans more than one buffer.
- return hasStrings ? this._getString(n) : this._getBuffer(n);
- }
- first() {
- return this.head.data;
- }
- *[Symbol.iterator]() {
- for (let p = this.head; p; p = p.next) {
- yield p.data;
- }
- }
-
- // Consumes a specified amount of characters from the buffered data.
- _getString(n) {
- let ret = "";
- let p = this.head;
- let c = 0;
- do {
- const str = p.data;
- if (n > str.length) {
- ret += str;
- n -= str.length;
- } else {
- if (n === str.length) {
- ret += str;
- ++c;
- if (p.next) this.head = p.next;
- else this.head = this.tail = null;
- } else {
- ret += str.slice(0, n);
- this.head = p;
- p.data = str.slice(n);
- }
- break;
- }
- ++c;
- } while ((p = p.next) !== null);
- this.length -= c;
- return ret;
- }
-
- // Consumes a specified amount of bytes from the buffered data.
- _getBuffer(n) {
- const ret = Buffer.allocUnsafe(n);
- const retLen = n;
- let p = this.head;
- let c = 0;
- do {
- const buf = p.data;
- if (n > buf.length) {
- ret.set(buf, retLen - n);
- n -= buf.length;
- } else {
- if (n === buf.length) {
- ret.set(buf, retLen - n);
- ++c;
- if (p.next) this.head = p.next;
- else this.head = this.tail = null;
- } else {
- ret.set(new Uint8Array(buf.buffer, buf.byteOffset, n), retLen - n);
- this.head = p;
- p.data = buf.slice(n);
- }
- break;
- }
- ++c;
- } while ((p = p.next) !== null);
- this.length -= c;
- return ret;
- }
-
- // Make sure the linked list only shows the minimal necessary information.
- [Symbol.for("nodejs.util.inspect.custom")](_, options) {
- return inspect(this, {
- ...options,
- // Only inspect one level.
- depth: 0,
- // It should not recurse.
- customInspect: false,
- });
- }
-}
-
-function ReadableState(options, stream, isDuplex) {
- // Duplex streams are both readable and writable, but share
- // the same options object.
- // However, some cases require setting options to different
- // values for the readable and the writable sides of the duplex stream.
- // These options can be provided separately as readableXXX and writableXXX.
- if (typeof isDuplex !== "boolean") isDuplex = stream instanceof Duplex;
-
- // Object stream flag. Used to make read(n) ignore n and to
- // make all the buffer merging and length checks go away.
- this.objectMode = !!(options && options.objectMode);
- if (isDuplex) this.objectMode = this.objectMode || !!(options && options.readableObjectMode);
-
- // The point at which it stops calling _read() to fill the buffer
- // Note: 0 is a valid value, means "don't call _read preemptively ever"
- this.highWaterMark = options
- ? getHighWaterMark(this, options, "readableHighWaterMark", isDuplex)
- : getDefaultHighWaterMark(false);
-
- // A linked list is used to store data chunks instead of an array because the
- // linked list can remove elements from the beginning faster than
- // array.shift().
- this.buffer = new BufferList();
- this.length = 0;
- this.pipes = [];
- this.flowing = null;
- this.ended = false;
- this.endEmitted = false;
- this.reading = false;
-
- // Stream is still being constructed and cannot be
- // destroyed until construction finished or failed.
- // Async construction is opt in, therefore we start as
- // constructed.
- this.constructed = true;
-
- // A flag to be able to tell if the event 'readable'/'data' is emitted
- // immediately, or on a later tick. We set this to true at first, because
- // any actions that shouldn't happen until "later" should generally also
- // not happen before the first read call.
- this.sync = true;
-
- // Whenever we return null, then we set a flag to say
- // that we're awaiting a 'readable' event emission.
- this.needReadable = false;
- this.emittedReadable = false;
- this.readableListening = false;
- this.resumeScheduled = false;
- // this[kPaused] = null
-
- // True if the error was already emitted and should not be thrown again.
- this.errorEmitted = false;
-
- // Should close be emitted on destroy. Defaults to true.
- this.emitClose = !options || options.emitClose !== false;
-
- // Should .destroy() be called after 'end' (and potentially 'finish').
- this.autoDestroy = !options || options.autoDestroy !== false;
-
- // Has it been destroyed.
- this.destroyed = false;
-
- // Indicates whether the stream has errored. When true no further
- // _read calls, 'data' or 'readable' events should occur. This is needed
- // since when autoDestroy is disabled we need a way to tell whether the
- // stream has failed.
- this.errored = null;
-
- // Indicates whether the stream has finished destroying.
- this.closed = false;
-
- // True if close has been emitted or would have been emitted
- // depending on emitClose.
- this.closeEmitted = false;
-
- // Crypto is kind of old and crusty. Historically, its default string
- // encoding is 'binary' so we have to make this configurable.
- // Everything else in the universe uses 'utf8', though.
- this.defaultEncoding = (options && options.defaultEncoding) || "utf8";
-
- // Ref the piped dest which we need a drain event on it
- // type: null | Writable | Set<Writable>.
- this.awaitDrainWriters = null;
- this.multiAwaitDrain = false;
-
- // If true, a maybeReadMore has been scheduled.
- this.readingMore = false;
- this.dataEmitted = false;
- this.decoder = null;
- this.encoding = null;
- if (options && options.encoding) {
- this.decoder = new StringDecoder(options.encoding);
- this.encoding = options.encoding;
- }
-}
-
/**
* @callback validateObject
* @param {*} value
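For reference, the buffering defaults that the removed getDefaultHighWaterMark/getHighWaterMark pair encoded, as a standalone sketch (not tied to the native replacement the rest of the diff wires in):

// Sketch of the removed defaults: 16 entries in objectMode, 16 KiB of
// bytes otherwise. An explicit highWaterMark must be a non-negative
// integer; 0 is valid and means "never call _read() preemptively".
function getDefaultHighWaterMark(objectMode) {
  return objectMode ? 16 : 16 * 1024;
}
getDefaultHighWaterMark(false); // 16384 (bytes)
getDefaultHighWaterMark(true);  // 16 (buffered objects)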
@@ -311,7 +42,7 @@ const validateObject = (value, name, options = null) => {
const nullable = options?.nullable ?? false;
if (
(!nullable && value === null) ||
- (!allowArray && $isArray(value)) ||
+ (!allowArray && ArrayIsArray(value)) ||
(typeof value !== "object" && (!allowFunction || typeof value !== "function"))
) {
throw new ERR_INVALID_ARG_TYPE(name, "Object", value);
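A behavioural sketch of the guard in the hunk above (standalone approximation; the real helper throws ERR_INVALID_ARG_TYPE rather than returning a boolean, and the option names match the diff):

// Mirrors the condition shown above: reject null unless nullable,
// arrays unless allowArray, and non-objects unless allowFunction
// permits a function.
const isPlainEnough = (value, { nullable = false, allowArray = false, allowFunction = false } = {}) =>
  !((!nullable && value === null) ||
    (!allowArray && Array.isArray(value)) ||
    (typeof value !== "object" && (!allowFunction || typeof value !== "function")));

isPlainEnough({});                                // true
isPlainEnough(null, { nullable: true });          // true
isPlainEnough([]);                                // false (arrays rejected by default)
isPlainEnough(() => {}, { allowFunction: true }); // true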
@@ -330,6 +61,8 @@ function validateString(value, name) {
if (typeof value !== "string") throw new ERR_INVALID_ARG_TYPE(name, "string", value);
}
+var ArrayIsArray = Array.isArray;
+
//------------------------------------------------------------------------------
// Node error polyfills
//------------------------------------------------------------------------------
@@ -348,7 +81,7 @@ var require_primordials = __commonJS({
"use strict";
module.exports = {
ArrayIsArray(self) {
- return $isArray(self);
+ return Array.isArray(self);
},
ArrayPrototypeIncludes(self, el) {
return self.includes(el);
@@ -432,6 +165,9 @@ var require_primordials = __commonJS({
SymbolAsyncIterator: Symbol.asyncIterator,
SymbolHasInstance: Symbol.hasInstance,
SymbolIterator: Symbol.iterator,
+ TypedArrayPrototypeSet(self, buf, len) {
+ return self.set(buf, len);
+ },
Uint8Array,
};
},
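The TypedArrayPrototypeSet primordial added here just forwards to TypedArray.prototype.set; a minimal standalone sketch of what callers get from it:

// Copies a chunk into a destination buffer at a byte offset,
// mirroring TypedArrayPrototypeSet(self, buf, len) => self.set(buf, len).
const TypedArrayPrototypeSet = (self, buf, len) => self.set(buf, len);

const dest = Buffer.alloc(8);
TypedArrayPrototypeSet(dest, Buffer.from("abcd"), 0); // bytes 0..3
TypedArrayPrototypeSet(dest, Buffer.from("efgh"), 4); // bytes 4..7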
@@ -450,7 +186,21 @@ var require_util = __commonJS({
: function isBlob2(b) {
return false;
};
-
+ var AggregateError = class extends Error {
+ constructor(errors) {
+ if (!Array.isArray(errors)) {
+ throw new TypeError(`Expected input to be an Array, got ${typeof errors}`);
+ }
+ let message = "";
+ for (let i = 0; i < errors.length; i++) {
+ message += ` ${errors[i].stack}
+`;
+ }
+ super(message);
+ this.name = "AggregateError";
+ this.errors = errors;
+ }
+ };
module.exports = {
AggregateError,
once(callback) {
@@ -549,8 +299,8 @@ var require_util = __commonJS({
var require_errors = __commonJS({
"node_modules/readable-stream/lib/ours/errors.js"(exports, module) {
"use strict";
- var { format, inspect } = require_util();
- var AggregateError = globalThis.AggregateError;
+ var { format, inspect, AggregateError: CustomAggregateError } = require_util();
+ var AggregateError = globalThis.AggregateError || CustomAggregateError;
var kIsNodeError = Symbol("kIsNodeError");
var kTypes = ["string", "function", "number", "object", "Function", "Object", "boolean", "bigint", "symbol"];
var classRegExp = /^([A-Z][a-z0-9]*)+$/;
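A short usage sketch for the fallback above; it relies only on the constructor shape shown in the added polyfill, and on a native AggregateError behaving compatibly where one exists:

// Both branches accept an array of errors and expose it as .errors;
// the polyfill additionally folds each error's stack into .message.
const errors = [new Error("first failure"), new Error("second failure")];
const aggregate = new AggregateError(errors);
aggregate.name;          // "AggregateError"
aggregate.errors.length; // 2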
@@ -2510,6 +2260,7 @@ var require_readable = __commonJS({
Symbol: Symbol2,
} = require_primordials();
+ var ReadableState = $lazy("bun:stream").ReadableState;
var { Stream, prependListener } = require_legacy();
function Readable(options) {
@@ -2759,127 +2510,15 @@ var require_readable = __commonJS({
var { addAbortSignal } = require_add_abort_signal();
var eos = require_end_of_stream();
+ const { maybeReadMore: _maybeReadMore, resume, emitReadable: _emitReadable, onEofChunk } = $lazy("bun:stream");
function maybeReadMore(stream, state) {
- if (!state.readingMore && state.constructed) {
- state.readingMore = true;
- process.nextTick(maybeReadMore_, stream, state);
- }
+ process.nextTick(_maybeReadMore, stream, state);
}
- function maybeReadMore_(stream, state) {
- // Attempt to read more data if we should.
- //
- // The conditions for reading more data are (one of):
- // - Not enough data buffered (state.length < state.highWaterMark). The loop
- // is responsible for filling the buffer with enough data if such data
- // is available. If highWaterMark is 0 and we are not in the flowing mode
- // we should _not_ attempt to buffer any extra data. We'll get more data
- // when the stream consumer calls read() instead.
- // - No data in the buffer, and the stream is in flowing mode. In this mode
- // the loop below is responsible for ensuring read() is called. Failing to
- // call read here would abort the flow and there's no other mechanism for
- // continuing the flow if the stream consumer has just subscribed to the
- // 'data' event.
- //
- // In addition to the above conditions to keep reading data, the following
- // conditions prevent the data from being read:
- // - The stream has ended (state.ended).
- // - There is already a pending 'read' operation (state.reading). This is a
- // case where the stream has called the implementation defined _read()
- // method, but they are processing the call asynchronously and have _not_
- // called push() with new data. In this case we skip performing more
- // read()s. The execution ends in this method again after the _read() ends
- // up calling push() with more data.
- while (
- !state.reading &&
- !state.ended &&
- (state.length < state.highWaterMark || (state.flowing && state.length === 0))
- ) {
- const len = state.length;
- $debug("maybeReadMore read 0");
- stream.read(0);
- if (len === state.length)
- // Didn't get any data, stop spinning.
- break;
- }
- state.readingMore = false;
- }
- // Don't emit readable right away in sync mode, because this can trigger
- // another read() call => stack overflow. This way, it might trigger
- // a nextTick recursion warning, but that's not so bad.
- function emitReadable(stream) {
- const state = stream._readableState;
- $debug("emitReadable", state.needReadable, state.emittedReadable);
- state.needReadable = false;
- if (!state.emittedReadable) {
- $debug("emitReadable", state.flowing);
- state.emittedReadable = true;
- process.nextTick(emitReadable_, stream);
- }
- }
- function emitReadable_(stream) {
- const state = stream._readableState;
- $debug("emitReadable_", state.destroyed, state.length, state.ended);
- if (!state.destroyed && !state.errored && (state.length || state.ended)) {
- stream.emit("readable");
- state.emittedReadable = false;
- }
-
- // The stream needs another readable event if:
- // 1. It is not flowing, as the flow mechanism will take
- // care of it.
- // 2. It is not ended.
- // 3. It is below the highWaterMark, so we can schedule
- // another readable later.
- state.needReadable = !state.flowing && !state.ended && state.length <= state.highWaterMark;
- flow(stream);
- }
- function flow(stream) {
- const state = stream._readableState;
- $debug("flow", state.flowing);
- while (state.flowing && stream.read() !== null);
- }
- function onEofChunk(stream, state) {
- $debug("onEofChunk");
- if (state.ended) return;
- if (state.decoder) {
- const chunk = state.decoder.end();
- if (chunk && chunk.length) {
- state.buffer.push(chunk);
- state.length += state.objectMode ? 1 : chunk.length;
- }
- }
- state.ended = true;
- if (state.sync) {
- // If we are sync, wait until next tick to emit the data.
- // Otherwise we risk emitting data in the flow()
- // the readable code triggers during a read() call.
- emitReadable(stream);
- } else {
- // Emit 'readable' now to make sure it gets picked up.
- state.needReadable = false;
- state.emittedReadable = true;
- // We have to emit readable now that we are EOF. Modules
- // in the ecosystem (e.g. dicer) rely on this event being sync.
- emitReadable_(stream);
- }
- }
- function resume(stream, state) {
- if (!state.resumeScheduled) {
- state.resumeScheduled = true;
- process.nextTick(resume_, stream, state);
- }
+ // REVERT ME
+ function emitReadable(stream, state) {
+ $debug("NativeReadable - emitReadable", stream.__id);
+ _emitReadable(stream, state);
}
- function resume_(stream, state) {
- $debug("resume", state.reading);
- if (!state.reading) {
- stream.read(0);
- }
- state.resumeScheduled = false;
- stream.emit("resume");
- flow(stream);
- if (state.flowing && !state.reading) stream.read(0);
- }
-
var destroyImpl = require_destroy();
var {
aggregateTwoErrors,
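The helpers deleted in this hunk back the two consumption modes described in the removed comments; from the consumer side that behaviour looks like the following sketch, which uses only the public Node stream API and is independent of the native rewrite:

const { Readable } = require("node:stream");

// Flowing mode: a 'data' listener resumes the stream, and the internal
// flow loop keeps calling read() until the buffer drains.
const flowing = Readable.from(["a", "b", "c"]);
flowing.on("data", chunk => console.log("data:", chunk));

// Paused mode: 'readable' fires once data (or EOF) is buffered, and the
// consumer pulls with read() until it returns null.
const paused = Readable.from(["x", "y"]);
paused.on("readable", () => {
  let chunk;
  while ((chunk = paused.read()) !== null) console.log("readable:", chunk);
});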
@@ -5968,6 +5607,4 @@ exports[Symbol.for("::bunternal::")] = { _ReadableFromWeb, _ReadableFromWebForUn
exports.eos = require_end_of_stream();
exports.EventEmitter = EE;
-var Duplex = exports.Duplex;
-
export default exports;