aboutsummaryrefslogtreecommitdiff
path: root/src/js/node/stream.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/js/node/stream.js')
-rw-r--r--src/js/node/stream.js324
1 files changed, 162 insertions, 162 deletions
diff --git a/src/js/node/stream.js b/src/js/node/stream.js
index ec131b58f..d91ec2f6c 100644
--- a/src/js/node/stream.js
+++ b/src/js/node/stream.js
@@ -25,175 +25,175 @@ function validateBoolean(value, name) {
$debug("node:stream loaded");
function highWaterMarkFrom(options, isDuplex, duplexKey) {
- return options.highWaterMark != null ? options.highWaterMark : isDuplex ? options[duplexKey] : null
+ return options.highWaterMark != null ? options.highWaterMark : isDuplex ? options[duplexKey] : null;
}
function getDefaultHighWaterMark(objectMode) {
- return objectMode ? 16 : 16 * 1024
+ return objectMode ? 16 : 16 * 1024;
}
function getHighWaterMark(state, options, duplexKey, isDuplex) {
- const hwm = highWaterMarkFrom(options, isDuplex, duplexKey)
+ const hwm = highWaterMarkFrom(options, isDuplex, duplexKey);
if (hwm != null) {
if (!Number.isInteger(hwm) || hwm < 0) {
- const name = isDuplex ? `options.${duplexKey}` : 'options.highWaterMark'
- throw new ERR_INVALID_ARG_VALUE(name, hwm)
+ const name = isDuplex ? `options.${duplexKey}` : "options.highWaterMark";
+ throw new ERR_INVALID_ARG_VALUE(name, hwm);
}
- return Math.floor(hwm)
+ return Math.floor(hwm);
}
// Default value
- return getDefaultHighWaterMark(state.objectMode)
+ return getDefaultHighWaterMark(state.objectMode);
}
class BufferList {
constructor() {
- this.head = null
- this.tail = null
- this.length = 0
+ this.head = null;
+ this.tail = null;
+ this.length = 0;
}
push(v) {
const entry = {
data: v,
- next: null
- }
- if (this.length > 0) this.tail.next = entry
- else this.head = entry
- this.tail = entry
- ++this.length
+ next: null,
+ };
+ if (this.length > 0) this.tail.next = entry;
+ else this.head = entry;
+ this.tail = entry;
+ ++this.length;
}
unshift(v) {
const entry = {
data: v,
- next: this.head
- }
- if (this.length === 0) this.tail = entry
- this.head = entry
- ++this.length
+ next: this.head,
+ };
+ if (this.length === 0) this.tail = entry;
+ this.head = entry;
+ ++this.length;
}
shift() {
- if (this.length === 0) return
- const ret = this.head.data
- if (this.length === 1) this.head = this.tail = null
- else this.head = this.head.next
- --this.length
- return ret
+ if (this.length === 0) return;
+ const ret = this.head.data;
+ if (this.length === 1) this.head = this.tail = null;
+ else this.head = this.head.next;
+ --this.length;
+ return ret;
}
clear() {
- this.head = this.tail = null
- this.length = 0
+ this.head = this.tail = null;
+ this.length = 0;
}
join(s) {
- if (this.length === 0) return ''
- let p = this.head
- let ret = '' + p.data
- while ((p = p.next) !== null) ret += s + p.data
- return ret
+ if (this.length === 0) return "";
+ let p = this.head;
+ let ret = "" + p.data;
+ while ((p = p.next) !== null) ret += s + p.data;
+ return ret;
}
concat(n) {
- if (this.length === 0) return Buffer.alloc(0)
- const ret = Buffer.allocUnsafe(n >>> 0)
- let p = this.head
- let i = 0
+ if (this.length === 0) return Buffer.alloc(0);
+ const ret = Buffer.allocUnsafe(n >>> 0);
+ let p = this.head;
+ let i = 0;
while (p) {
- ret.set(p.data, i)
- i += p.data.length
- p = p.next
+ ret.set(p.data, i);
+ i += p.data.length;
+ p = p.next;
}
- return ret
+ return ret;
}
// Consumes a specified amount of bytes or characters from the buffered data.
consume(n, hasStrings) {
- const data = this.head.data
+ const data = this.head.data;
if (n < data.length) {
// `slice` is the same for buffers and strings.
- const slice = data.slice(0, n)
- this.head.data = data.slice(n)
- return slice
+ const slice = data.slice(0, n);
+ this.head.data = data.slice(n);
+ return slice;
}
if (n === data.length) {
// First chunk is a perfect match.
- return this.shift()
+ return this.shift();
}
// Result spans more than one buffer.
- return hasStrings ? this._getString(n) : this._getBuffer(n)
+ return hasStrings ? this._getString(n) : this._getBuffer(n);
}
first() {
- return this.head.data
+ return this.head.data;
}
*[Symbol.iterator]() {
for (let p = this.head; p; p = p.next) {
- yield p.data
+ yield p.data;
}
}
// Consumes a specified amount of characters from the buffered data.
_getString(n) {
- let ret = ''
- let p = this.head
- let c = 0
+ let ret = "";
+ let p = this.head;
+ let c = 0;
do {
- const str = p.data
+ const str = p.data;
if (n > str.length) {
- ret += str
- n -= str.length
+ ret += str;
+ n -= str.length;
} else {
if (n === str.length) {
- ret += str
- ++c
- if (p.next) this.head = p.next
- else this.head = this.tail = null
+ ret += str;
+ ++c;
+ if (p.next) this.head = p.next;
+ else this.head = this.tail = null;
} else {
- ret += str.slice(0, n)
- this.head = p
- p.data = str.slice(n)
+ ret += str.slice(0, n);
+ this.head = p;
+ p.data = str.slice(n);
}
- break
+ break;
}
- ++c
- } while ((p = p.next) !== null)
- this.length -= c
- return ret
+ ++c;
+ } while ((p = p.next) !== null);
+ this.length -= c;
+ return ret;
}
// Consumes a specified amount of bytes from the buffered data.
_getBuffer(n) {
- const ret = Buffer.allocUnsafe(n)
- const retLen = n
- let p = this.head
- let c = 0
+ const ret = Buffer.allocUnsafe(n);
+ const retLen = n;
+ let p = this.head;
+ let c = 0;
do {
- const buf = p.data
+ const buf = p.data;
if (n > buf.length) {
- ret.set(buf, retLen - n)
- n -= buf.length
+ ret.set(buf, retLen - n);
+ n -= buf.length;
} else {
if (n === buf.length) {
- ret.set(buf, retLen - n)
- ++c
- if (p.next) this.head = p.next
- else this.head = this.tail = null
+ ret.set(buf, retLen - n);
+ ++c;
+ if (p.next) this.head = p.next;
+ else this.head = this.tail = null;
} else {
- ret.set(new Uint8Array(buf.buffer, buf.byteOffset, n), retLen - n)
- this.head = p
- p.data = buf.slice(n)
+ ret.set(new Uint8Array(buf.buffer, buf.byteOffset, n), retLen - n);
+ this.head = p;
+ p.data = buf.slice(n);
}
- break
+ break;
}
- ++c
- } while ((p = p.next) !== null)
- this.length -= c
- return ret
+ ++c;
+ } while ((p = p.next) !== null);
+ this.length -= c;
+ return ret;
}
// Make sure the linked list only shows the minimal necessary information.
- [Symbol.for('nodejs.util.inspect.custom')](_, options) {
+ [Symbol.for("nodejs.util.inspect.custom")](_, options) {
return inspect(this, {
...options,
// Only inspect one level.
depth: 0,
// It should not recurse.
- customInspect: false
- })
+ customInspect: false,
+ });
}
}
@@ -203,93 +203,93 @@ function ReadableState(options, stream, isDuplex) {
// However, some cases require setting options to different
// values for the readable and the writable sides of the duplex stream.
// These options can be provided separately as readableXXX and writableXXX.
- if (typeof isDuplex !== 'boolean') isDuplex = stream instanceof Duplex
+ if (typeof isDuplex !== "boolean") isDuplex = stream instanceof Duplex;
// Object stream flag. Used to make read(n) ignore n and to
// make all the buffer merging and length checks go away.
- this.objectMode = !!(options && options.objectMode)
- if (isDuplex) this.objectMode = this.objectMode || !!(options && options.readableObjectMode)
+ this.objectMode = !!(options && options.objectMode);
+ if (isDuplex) this.objectMode = this.objectMode || !!(options && options.readableObjectMode);
// The point at which it stops calling _read() to fill the buffer
// Note: 0 is a valid value, means "don't call _read preemptively ever"
this.highWaterMark = options
- ? getHighWaterMark(this, options, 'readableHighWaterMark', isDuplex)
- : getDefaultHighWaterMark(false)
+ ? getHighWaterMark(this, options, "readableHighWaterMark", isDuplex)
+ : getDefaultHighWaterMark(false);
// A linked list is used to store data chunks instead of an array because the
// linked list can remove elements from the beginning faster than
// array.shift().
- this.buffer = new BufferList()
- this.length = 0
- this.pipes = []
- this.flowing = null
- this.ended = false
- this.endEmitted = false
- this.reading = false
+ this.buffer = new BufferList();
+ this.length = 0;
+ this.pipes = [];
+ this.flowing = null;
+ this.ended = false;
+ this.endEmitted = false;
+ this.reading = false;
// Stream is still being constructed and cannot be
// destroyed until construction finished or failed.
// Async construction is opt in, therefore we start as
// constructed.
- this.constructed = true
+ this.constructed = true;
// A flag to be able to tell if the event 'readable'/'data' is emitted
// immediately, or on a later tick. We set this to true at first, because
// any actions that shouldn't happen until "later" should generally also
// not happen before the first read call.
- this.sync = true
+ this.sync = true;
// Whenever we return null, then we set a flag to say
// that we're awaiting a 'readable' event emission.
- this.needReadable = false
- this.emittedReadable = false
- this.readableListening = false
- this.resumeScheduled = false
+ this.needReadable = false;
+ this.emittedReadable = false;
+ this.readableListening = false;
+ this.resumeScheduled = false;
// this[kPaused] = null
// True if the error was already emitted and should not be thrown again.
- this.errorEmitted = false
+ this.errorEmitted = false;
// Should close be emitted on destroy. Defaults to true.
- this.emitClose = !options || options.emitClose !== false
+ this.emitClose = !options || options.emitClose !== false;
// Should .destroy() be called after 'end' (and potentially 'finish').
- this.autoDestroy = !options || options.autoDestroy !== false
+ this.autoDestroy = !options || options.autoDestroy !== false;
// Has it been destroyed.
- this.destroyed = false
+ this.destroyed = false;
// Indicates whether the stream has errored. When true no further
// _read calls, 'data' or 'readable' events should occur. This is needed
// since when autoDestroy is disabled we need a way to tell whether the
// stream has failed.
- this.errored = null
+ this.errored = null;
// Indicates whether the stream has finished destroying.
- this.closed = false
+ this.closed = false;
// True if close has been emitted or would have been emitted
// depending on emitClose.
- this.closeEmitted = false
+ this.closeEmitted = false;
// Crypto is kind of old and crusty. Historically, its default string
// encoding is 'binary' so we have to make this configurable.
// Everything else in the universe uses 'utf8', though.
- this.defaultEncoding = (options && options.defaultEncoding) || 'utf8'
+ this.defaultEncoding = (options && options.defaultEncoding) || "utf8";
// Ref the piped dest which we need a drain event on it
// type: null | Writable | Set<Writable>.
- this.awaitDrainWriters = null
- this.multiAwaitDrain = false
+ this.awaitDrainWriters = null;
+ this.multiAwaitDrain = false;
// If true, a maybeReadMore has been scheduled.
- this.readingMore = false
- this.dataEmitted = false
- this.decoder = null
- this.encoding = null
+ this.readingMore = false;
+ this.dataEmitted = false;
+ this.decoder = null;
+ this.encoding = null;
if (options && options.encoding) {
- this.decoder = new StringDecoder(options.encoding)
- this.encoding = options.encoding
+ this.decoder = new StringDecoder(options.encoding);
+ this.encoding = options.encoding;
}
}
@@ -2761,8 +2761,8 @@ var require_readable = __commonJS({
var eos = require_end_of_stream();
function maybeReadMore(stream, state) {
if (!state.readingMore && state.constructed) {
- state.readingMore = true
- process.nextTick(maybeReadMore_, stream, state)
+ state.readingMore = true;
+ process.nextTick(maybeReadMore_, stream, state);
}
}
function maybeReadMore_(stream, state) {
@@ -2794,34 +2794,34 @@ var require_readable = __commonJS({
!state.ended &&
(state.length < state.highWaterMark || (state.flowing && state.length === 0))
) {
- const len = state.length
- $debug('maybeReadMore read 0')
- stream.read(0)
+ const len = state.length;
+ $debug("maybeReadMore read 0");
+ stream.read(0);
if (len === state.length)
// Didn't get any data, stop spinning.
- break
+ break;
}
- state.readingMore = false
+ state.readingMore = false;
}
// Don't emit readable right away in sync mode, because this can trigger
// another read() call => stack overflow. This way, it might trigger
// a nextTick recursion warning, but that's not so bad.
function emitReadable(stream) {
const state = stream._readableState;
- $debug('emitReadable', state.needReadable, state.emittedReadable)
- state.needReadable = false
+ $debug("emitReadable", state.needReadable, state.emittedReadable);
+ state.needReadable = false;
if (!state.emittedReadable) {
- $debug('emitReadable', state.flowing)
- state.emittedReadable = true
- process.nextTick(emitReadable_, stream)
+ $debug("emitReadable", state.flowing);
+ state.emittedReadable = true;
+ process.nextTick(emitReadable_, stream);
}
}
function emitReadable_(stream) {
- const state = stream._readableState
- $debug('emitReadable_', state.destroyed, state.length, state.ended)
+ const state = stream._readableState;
+ $debug("emitReadable_", state.destroyed, state.length, state.ended);
if (!state.destroyed && !state.errored && (state.length || state.ended)) {
- stream.emit('readable')
- state.emittedReadable = false
+ stream.emit("readable");
+ state.emittedReadable = false;
}
// The stream needs another readable event if:
@@ -2830,54 +2830,54 @@ var require_readable = __commonJS({
// 2. It is not ended.
// 3. It is below the highWaterMark, so we can schedule
// another readable later.
- state.needReadable = !state.flowing && !state.ended && state.length <= state.highWaterMark
- flow(stream)
+ state.needReadable = !state.flowing && !state.ended && state.length <= state.highWaterMark;
+ flow(stream);
}
function flow(stream) {
- const state = stream._readableState
- $debug('flow', state.flowing)
+ const state = stream._readableState;
+ $debug("flow", state.flowing);
while (state.flowing && stream.read() !== null);
}
function onEofChunk(stream, state) {
- $debug('onEofChunk')
- if (state.ended) return
+ $debug("onEofChunk");
+ if (state.ended) return;
if (state.decoder) {
- const chunk = state.decoder.end()
+ const chunk = state.decoder.end();
if (chunk && chunk.length) {
- state.buffer.push(chunk)
- state.length += state.objectMode ? 1 : chunk.length
+ state.buffer.push(chunk);
+ state.length += state.objectMode ? 1 : chunk.length;
}
}
- state.ended = true
+ state.ended = true;
if (state.sync) {
// If we are sync, wait until next tick to emit the data.
// Otherwise we risk emitting data in the flow()
// the readable code triggers during a read() call.
- emitReadable(stream)
+ emitReadable(stream);
} else {
// Emit 'readable' now to make sure it gets picked up.
- state.needReadable = false
- state.emittedReadable = true
+ state.needReadable = false;
+ state.emittedReadable = true;
// We have to emit readable now that we are EOF. Modules
// in the ecosystem (e.g. dicer) rely on this event being sync.
- emitReadable_(stream)
+ emitReadable_(stream);
}
}
function resume(stream, state) {
if (!state.resumeScheduled) {
- state.resumeScheduled = true
- process.nextTick(resume_, stream, state)
+ state.resumeScheduled = true;
+ process.nextTick(resume_, stream, state);
}
}
function resume_(stream, state) {
- $debug('resume', state.reading)
+ $debug("resume", state.reading);
if (!state.reading) {
- stream.read(0)
+ stream.read(0);
}
- state.resumeScheduled = false
- stream.emit('resume')
- flow(stream)
- if (state.flowing && !state.reading) stream.read(0)
+ state.resumeScheduled = false;
+ stream.emit("resume");
+ flow(stream);
+ if (state.flowing && !state.reading) stream.read(0);
}
var destroyImpl = require_destroy();