aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/bun.js/streams.exports.js208
1 files changed, 115 insertions, 93 deletions
diff --git a/src/bun.js/streams.exports.js b/src/bun.js/streams.exports.js
index 042dda5d9..9a21e5d44 100644
--- a/src/bun.js/streams.exports.js
+++ b/src/bun.js/streams.exports.js
@@ -16,8 +16,11 @@ globalThis.__IDS_TO_TRACK = process.env.DEBUG_TRACK_EE?.length
// By default, child_process gives
const __TRACK_EE__ = !!process.env.DEBUG_TRACK_EE;
-const __DEBUG__ =
- process.env.DEBUG || process.env.DEBUG_STREAMS || __TRACK_EE__;
+const __DEBUG__ = !!(
+ process.env.DEBUG ||
+ process.env.DEBUG_STREAMS ||
+ __TRACK_EE__
+);
var debug = __DEBUG__
? globalThis.__IDS_TO_TRACK
@@ -2659,31 +2662,32 @@ var require_readable = __commonJS({
if (ev === "data") {
state.readableListening = this.listenerCount("readable") > 0;
if (state.flowing !== false) {
- debug("in flowing mode!", this.__id);
+ __DEBUG__ && debug("in flowing mode!", this.__id);
this.resume();
} else {
- debug("in readable mode!", this.__id);
+ __DEBUG__ && debug("in readable mode!", this.__id);
}
} else if (ev === "readable") {
- debug("readable listener added!", this.__id);
+ __DEBUG__ && debug("readable listener added!", this.__id);
if (!state.endEmitted && !state.readableListening) {
state.readableListening = state.needReadable = true;
state.flowing = false;
state.emittedReadable = false;
- debug(
- "on readable - state.length, reading, emittedReadable",
- state.length,
- state.reading,
- state.emittedReadable,
- this.__id,
- );
+ __DEBUG__ &&
+ debug(
+ "on readable - state.length, reading, emittedReadable",
+ state.length,
+ state.reading,
+ state.emittedReadable,
+ this.__id,
+ );
if (state.length) {
emitReadable(this, state);
} else if (!state.reading) {
runOnNextTick(nReadingNextTick, this);
}
} else if (state.endEmitted) {
- debug("end already emitted...", this.__id);
+ __DEBUG__ && debug("end already emitted...", this.__id);
}
}
return res;
@@ -2739,7 +2743,7 @@ var require_readable = __commonJS({
}
async _read() {
- debug("ReadableFromWeb _read()", this.__id);
+ __DEBUG__ && debug("ReadableFromWeb _read()", this.__id);
var stream = this.#stream,
reader = this.#reader;
if (stream) {
@@ -2900,7 +2904,7 @@ var require_readable = __commonJS({
}
// REVERT ME
function emitReadable(stream, state) {
- debug("NativeReadable - emitReadable", stream.__id);
+ __DEBUG__ && debug("NativeReadable - emitReadable", stream.__id);
_emitReadable(stream, state);
}
var destroyImpl = require_destroy();
@@ -2935,7 +2939,7 @@ var require_readable = __commonJS({
return readableAddChunk(this, chunk, encoding, true);
};
function readableAddChunk(stream, chunk, encoding, addToFront) {
- debug("readableAddChunk", chunk, stream.__id);
+ __DEBUG__ && debug("readableAddChunk", chunk, stream.__id);
const state = stream._readableState;
let err;
if (!state.objectMode) {
@@ -2998,8 +3002,8 @@ var require_readable = __commonJS({
);
}
function addChunk(stream, state, chunk, addToFront) {
- debug("adding chunk", stream.__id);
- debug("chunk", chunk.toString(), stream.__id);
+ __DEBUG__ && debug("adding chunk", stream.__id);
+ __DEBUG__ && debug("chunk", chunk.toString(), stream.__id);
if (
state.flowing &&
state.length === 0 &&
@@ -3017,7 +3021,8 @@ var require_readable = __commonJS({
state.length += state.objectMode ? 1 : chunk.length;
if (addToFront) state.buffer.unshift(chunk);
else state.buffer.push(chunk);
- debug("needReadable @ addChunk", state.needReadable, stream.__id);
+ __DEBUG__ &&
+ debug("needReadable @ addChunk", state.needReadable, stream.__id);
if (state.needReadable) emitReadable(stream, state);
}
maybeReadMore(stream, state);
@@ -3071,7 +3076,7 @@ var require_readable = __commonJS({
}
// You can override either this method, or the async _read(n) below.
Readable.prototype.read = function (n) {
- debug("read - n =", n, this.__id);
+ __DEBUG__ && debug("read - n =", n, this.__id);
if (!NumberIsInteger(n)) {
n = NumberParseInt(n, 10);
}
@@ -3095,12 +3100,13 @@ var require_readable = __commonJS({
: state.length > 0) ||
state.ended)
) {
- debug(
- "read: emitReadable or endReadable",
- state.length,
- state.ended,
- this.__id,
- );
+ __DEBUG__ &&
+ debug(
+ "read: emitReadable or endReadable",
+ state.length,
+ state.ended,
+ this.__id,
+ );
if (state.length === 0 && state.ended) endReadable(this);
else emitReadable(this, state);
return null;
@@ -3110,12 +3116,13 @@ var require_readable = __commonJS({
// If we've ended, and we're now clear, then finish it up.
if (n === 0 && state.ended) {
- debug(
- "read: calling endReadable if length 0 -- length, state.ended",
- state.length,
- state.ended,
- this.__id,
- );
+ __DEBUG__ &&
+ debug(
+ "read: calling endReadable if length 0 -- length, state.ended",
+ state.length,
+ state.ended,
+ this.__id,
+ );
if (state.length === 0) endReadable(this);
return null;
}
@@ -3144,12 +3151,12 @@ var require_readable = __commonJS({
// if we need a readable event, then we need to do some reading.
let doRead = state.needReadable;
- debug("need readable", doRead, this.__id);
+ __DEBUG__ && debug("need readable", doRead, this.__id);
// If we currently have less than the highWaterMark, then also read some.
if (state.length === 0 || state.length - n < state.highWaterMark) {
doRead = true;
- debug("length less than watermark", doRead, this.__id);
+ __DEBUG__ && debug("length less than watermark", doRead, this.__id);
}
// However, if we've ended, then there's no point, if we're already
@@ -3162,11 +3169,11 @@ var require_readable = __commonJS({
state.errored ||
!state.constructed
) {
- debug("state.constructed?", state.constructed, this.__id);
+ __DEBUG__ && debug("state.constructed?", state.constructed, this.__id);
doRead = false;
- debug("reading, ended or constructing", doRead, this.__id);
+ __DEBUG__ && debug("reading, ended or constructing", doRead, this.__id);
} else if (doRead) {
- debug("do read", this.__id);
+ __DEBUG__ && debug("do read", this.__id);
state.reading = true;
state.sync = true;
// If the length is currently zero, then we *need* a readable event.
@@ -3176,16 +3183,16 @@ var require_readable = __commonJS({
try {
var result = this._read(state.highWaterMark);
if (isPromise(result)) {
- debug("async _read", this.__id);
+ __DEBUG__ && debug("async _read", this.__id);
const peeked = Bun.peek(result);
- debug("peeked promise", peeked, this.__id);
+ __DEBUG__ && debug("peeked promise", peeked, this.__id);
if (peeked !== result) {
result = peeked;
}
}
if (isPromise(result) && result?.then && isCallable(result.then)) {
- debug("async _read result.then setup", this.__id);
+ __DEBUG__ && debug("async _read result.then setup", this.__id);
result.then(nop, function (err) {
errorOrDestroy(this, err);
});
@@ -3200,16 +3207,17 @@ var require_readable = __commonJS({
if (!state.reading) n = howMuchToRead(nOrig, state);
}
- debug("n @ fromList", n, this.__id);
+ __DEBUG__ && debug("n @ fromList", n, this.__id);
let ret;
if (n > 0) ret = fromList(n, state);
else ret = null;
- debug("ret @ read", ret, this.__id);
+ __DEBUG__ && debug("ret @ read", ret, this.__id);
if (ret === null) {
state.needReadable = state.length <= state.highWaterMark;
- debug("state.length while ret = null", state.length, this.__id);
+ __DEBUG__ &&
+ debug("state.length while ret = null", state.length, this.__id);
n = 0;
} else {
state.length -= n;
@@ -3251,7 +3259,8 @@ var require_readable = __commonJS({
}
}
state.pipes.push(dest);
- debug("pipe count=%d opts=%j", state.pipes.length, pipeOpts, src.__id);
+ __DEBUG__ &&
+ debug("pipe count=%d opts=%j", state.pipes.length, pipeOpts, src.__id);
const doEnd =
(!pipeOpts || pipeOpts.end !== false) &&
dest !== process.stdout &&
@@ -3261,7 +3270,7 @@ var require_readable = __commonJS({
else src.once("end", endFn);
dest.on("unpipe", onunpipe);
function onunpipe(readable, unpipeInfo) {
- debug("onunpipe", src.__id);
+ __DEBUG__ && debug("onunpipe", src.__id);
if (readable === src) {
if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
unpipeInfo.hasUnpiped = true;
@@ -3270,13 +3279,13 @@ var require_readable = __commonJS({
}
}
function onend() {
- debug("onend", src.__id);
+ __DEBUG__ && debug("onend", src.__id);
dest.end();
}
let ondrain;
let cleanedUp = false;
function cleanup() {
- debug("cleanup", src.__id);
+ __DEBUG__ && debug("cleanup", src.__id);
dest.removeListener("close", onclose);
dest.removeListener("finish", onfinish);
if (ondrain) {
@@ -3298,15 +3307,16 @@ var require_readable = __commonJS({
function pause() {
if (!cleanedUp) {
if (state.pipes.length === 1 && state.pipes[0] === dest) {
- debug("false write response, pause", 0, src.__id);
+ __DEBUG__ && debug("false write response, pause", 0, src.__id);
state.awaitDrainWriters = dest;
state.multiAwaitDrain = false;
} else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
- debug(
- "false write response, pause",
- state.awaitDrainWriters.size,
- src.__id,
- );
+ __DEBUG__ &&
+ debug(
+ "false write response, pause",
+ state.awaitDrainWriters.size,
+ src.__id,
+ );
state.awaitDrainWriters.add(dest);
}
src.pause();
@@ -3318,9 +3328,9 @@ var require_readable = __commonJS({
}
src.on("data", ondata);
function ondata(chunk) {
- debug("ondata", src.__id);
+ __DEBUG__ && debug("ondata", src.__id);
const ret = dest.write(chunk);
- debug("dest.write", ret, src.__id);
+ __DEBUG__ && debug("dest.write", ret, src.__id);
if (ret === false) {
pause();
}
@@ -3434,13 +3444,13 @@ var require_readable = __commonJS({
}
}
function nReadingNextTick(self) {
- debug("on readable nextTick, calling read(0)", self.__id);
+ __DEBUG__ && debug("on readable nextTick, calling read(0)", self.__id);
self.read(0);
}
Readable.prototype.resume = function () {
const state = this._readableState;
if (!state.flowing) {
- debug("resume", this.__id);
+ __DEBUG__ && debug("resume", this.__id);
state.flowing = !state.readableListening;
resume(this, state);
}
@@ -3448,9 +3458,10 @@ var require_readable = __commonJS({
return this;
};
Readable.prototype.pause = function () {
- debug("call pause flowing=%j", this._readableState.flowing, this.__id);
+ __DEBUG__ &&
+ debug("call pause flowing=%j", this._readableState.flowing, this.__id);
if (this._readableState.flowing !== false) {
- debug("pause", this.__id);
+ __DEBUG__ && debug("pause", this.__id);
this._readableState.flowing = false;
this.emit("pause");
}
@@ -3687,19 +3698,21 @@ var require_readable = __commonJS({
}
function endReadable(stream) {
const state = stream._readableState;
- debug("endEmitted @ endReadable", state.endEmitted, stream.__id);
+ __DEBUG__ &&
+ debug("endEmitted @ endReadable", state.endEmitted, stream.__id);
if (!state.endEmitted) {
state.ended = true;
runOnNextTick(endReadableNT, state, stream);
}
}
function endReadableNT(state, stream) {
- debug(
- "endReadableNT -- endEmitted, state.length",
- state.endEmitted,
- state.length,
- stream.__id,
- );
+ __DEBUG__ &&
+ debug(
+ "endReadableNT -- endEmitted, state.length",
+ state.endEmitted,
+ state.length,
+ stream.__id,
+ );
if (
!state.errored &&
!state.closeEmitted &&
@@ -3708,7 +3721,7 @@ var require_readable = __commonJS({
) {
state.endEmitted = true;
stream.emit("end");
- debug("end emitted @ endReadableNT", stream.__id);
+ __DEBUG__ && debug("end emitted @ endReadableNT", stream.__id);
if (stream.writable && stream.allowHalfOpen === false) {
runOnNextTick(endWritableNT, stream);
} else if (state.autoDestroy) {
@@ -4189,7 +4202,7 @@ var require_writable = __commonJS({
Writable.prototype._writev = null;
Writable.prototype.end = function (chunk, encoding, cb, native = false) {
const state = this._writableState;
- debug("end", state, this.__id);
+ __DEBUG__ && debug("end", state, this.__id);
if (typeof chunk === "function") {
cb = chunk;
chunk = null;
@@ -4295,8 +4308,11 @@ var require_writable = __commonJS({
}
}
function finishMaybe(stream, state, sync) {
- debug("finishMaybe -- state, sync", state, sync, stream.__id);
+ __DEBUG__ &&
+ debug("finishMaybe -- state, sync", state, sync, stream.__id);
+
if (!needFinish(state, stream.__id)) return;
+
prefinish(stream, state);
if (state.pendingcb === 0) {
if (sync) {
@@ -5921,21 +5937,21 @@ function createNativeStreamReadable(nativeType, Readable) {
// However, in the case of an fs.ReadStream, we can pass the number of bytes we want to read
// which may be significantly less than the actual highWaterMark
_read(maxToRead) {
- debug("NativeReadable._read", this.__id);
+ __DEBUG__ && debug("NativeReadable._read", this.__id);
if (this.#pendingRead) {
- debug("pendingRead is true", this.__id);
+ __DEBUG__ && debug("pendingRead is true", this.__id);
return;
}
var ptr = this.#ptr;
- debug("ptr @ NativeReadable._read", ptr, this.__id);
+ __DEBUG__ && debug("ptr @ NativeReadable._read", ptr, this.__id);
if (ptr === 0) {
this.push(null);
return;
}
if (!this.#constructed) {
- debug("NativeReadable not constructed yet", this.__id);
+ __DEBUG__ && debug("NativeReadable not constructed yet", this.__id);
this.#internalConstruct(ptr);
}
@@ -5962,18 +5978,20 @@ function createNativeStreamReadable(nativeType, Readable) {
#internalConstruct(ptr) {
this.#constructed = true;
const result = start(ptr, this.#highWaterMark);
- debug("NativeReadable internal `start` result", result, this.__id);
+ __DEBUG__ &&
+ debug("NativeReadable internal `start` result", result, this.__id);
if (typeof result === "number" && result > 1) {
this.#hasResized = true;
- debug("NativeReadable resized", this.__id);
+ __DEBUG__ && debug("NativeReadable resized", this.__id);
this.#highWaterMark = Math.min(this.#highWaterMark, result);
}
if (drainFn) {
const drainResult = drainFn(ptr);
- debug("NativeReadable drain result", drainResult, this.__id);
+ __DEBUG__ &&
+ debug("NativeReadable drain result", drainResult, this.__id);
if ((drainResult?.byteLength ?? 0) > 0) {
this.push(drainResult);
}
@@ -5985,7 +6003,7 @@ function createNativeStreamReadable(nativeType, Readable) {
// how many bytes they want to read (ie. when reading only part of a file)
#getRemainingChunk(maxToRead = this.#highWaterMark) {
var chunk = this.#remainingChunk;
- debug("chunk @ #getRemainingChunk", chunk, this.__id);
+ __DEBUG__ && debug("chunk @ #getRemainingChunk", chunk, this.__id);
if (chunk?.byteLength ?? 0 < MIN_BUFFER_SIZE) {
var size = maxToRead > MIN_BUFFER_SIZE ? maxToRead : MIN_BUFFER_SIZE;
this.#remainingChunk = chunk = new Buffer(size);
@@ -5994,17 +6012,20 @@ function createNativeStreamReadable(nativeType, Readable) {
}
push(result, encoding) {
- debug(
- "NativeReadable push -- result, encoding",
- result,
- encoding,
- this.__id,
- );
+ __DEBUG__ &&
+ debug(
+ "NativeReadable push -- result, encoding",
+ result,
+ encoding,
+ this.__id,
+ );
return super.push(...arguments);
}
#handleResult(result, view, isClosed) {
- debug("result, isClosed @ #handleResult", result, isClosed, this.__id);
+ __DEBUG__ &&
+ debug("result, isClosed @ #handleResult", result, isClosed, this.__id);
+
if (typeof result === "number") {
if (result >= this.#highWaterMark && !this.#hasResized && !isClosed) {
this.#highWaterMark *= 2;
@@ -6023,18 +6044,18 @@ function createNativeStreamReadable(nativeType, Readable) {
) {
this.#highWaterMark *= 2;
this.#hasResized = true;
- debug("Resized", this.__id);
+ __DEBUG__ && debug("Resized", this.__id);
}
return handleArrayBufferViewResult(this, result, view, isClosed);
} else {
- debug("Unknown result type", result, this.__id);
+ __DEBUG__ && debug("Unknown result type", result, this.__id);
throw new Error("Invalid result from pull");
}
}
#internalRead(view, ptr) {
- debug("#internalRead()", this.__id);
+ __DEBUG__ && debug("#internalRead()", this.__id);
closer[0] = false;
var result = pull(ptr, view, closer);
if (isPromise(result)) {
@@ -6042,14 +6063,15 @@ function createNativeStreamReadable(nativeType, Readable) {
return result.then(
(result) => {
this.#pendingRead = false;
- debug(
- "pending no longerrrrrrrr (result returned from pull)",
- this.__id,
- );
+ __DEBUG__ &&
+ debug(
+ "pending no longerrrrrrrr (result returned from pull)",
+ this.__id,
+ );
this.#remainingChunk = this.#handleResult(result, view, closer[0]);
},
(reason) => {
- debug("error from pull", reason, this.__id);
+ __DEBUG__ && debug("error from pull", reason, this.__id);
errorOrDestroy(this, reason);
},
);
@@ -6074,7 +6096,7 @@ function createNativeStreamReadable(nativeType, Readable) {
if (updateRef) {
updateRef(ptr, false);
}
- debug("NativeReadable destroyed", this.__id);
+ __DEBUG__ && debug("NativeReadable destroyed", this.__id);
cancel(ptr, error);
callback(error);
}