diff options
Diffstat (limited to 'src/bun.js')
-rw-r--r-- | src/bun.js/streams.exports.js | 208 |
1 files changed, 115 insertions, 93 deletions
diff --git a/src/bun.js/streams.exports.js b/src/bun.js/streams.exports.js index 042dda5d9..9a21e5d44 100644 --- a/src/bun.js/streams.exports.js +++ b/src/bun.js/streams.exports.js @@ -16,8 +16,11 @@ globalThis.__IDS_TO_TRACK = process.env.DEBUG_TRACK_EE?.length // By default, child_process gives const __TRACK_EE__ = !!process.env.DEBUG_TRACK_EE; -const __DEBUG__ = - process.env.DEBUG || process.env.DEBUG_STREAMS || __TRACK_EE__; +const __DEBUG__ = !!( + process.env.DEBUG || + process.env.DEBUG_STREAMS || + __TRACK_EE__ +); var debug = __DEBUG__ ? globalThis.__IDS_TO_TRACK @@ -2659,31 +2662,32 @@ var require_readable = __commonJS({ if (ev === "data") { state.readableListening = this.listenerCount("readable") > 0; if (state.flowing !== false) { - debug("in flowing mode!", this.__id); + __DEBUG__ && debug("in flowing mode!", this.__id); this.resume(); } else { - debug("in readable mode!", this.__id); + __DEBUG__ && debug("in readable mode!", this.__id); } } else if (ev === "readable") { - debug("readable listener added!", this.__id); + __DEBUG__ && debug("readable listener added!", this.__id); if (!state.endEmitted && !state.readableListening) { state.readableListening = state.needReadable = true; state.flowing = false; state.emittedReadable = false; - debug( - "on readable - state.length, reading, emittedReadable", - state.length, - state.reading, - state.emittedReadable, - this.__id, - ); + __DEBUG__ && + debug( + "on readable - state.length, reading, emittedReadable", + state.length, + state.reading, + state.emittedReadable, + this.__id, + ); if (state.length) { emitReadable(this, state); } else if (!state.reading) { runOnNextTick(nReadingNextTick, this); } } else if (state.endEmitted) { - debug("end already emitted...", this.__id); + __DEBUG__ && debug("end already emitted...", this.__id); } } return res; @@ -2739,7 +2743,7 @@ var require_readable = __commonJS({ } async _read() { - debug("ReadableFromWeb _read()", this.__id); + __DEBUG__ && debug("ReadableFromWeb _read()", this.__id); var stream = this.#stream, reader = this.#reader; if (stream) { @@ -2900,7 +2904,7 @@ var require_readable = __commonJS({ } // REVERT ME function emitReadable(stream, state) { - debug("NativeReadable - emitReadable", stream.__id); + __DEBUG__ && debug("NativeReadable - emitReadable", stream.__id); _emitReadable(stream, state); } var destroyImpl = require_destroy(); @@ -2935,7 +2939,7 @@ var require_readable = __commonJS({ return readableAddChunk(this, chunk, encoding, true); }; function readableAddChunk(stream, chunk, encoding, addToFront) { - debug("readableAddChunk", chunk, stream.__id); + __DEBUG__ && debug("readableAddChunk", chunk, stream.__id); const state = stream._readableState; let err; if (!state.objectMode) { @@ -2998,8 +3002,8 @@ var require_readable = __commonJS({ ); } function addChunk(stream, state, chunk, addToFront) { - debug("adding chunk", stream.__id); - debug("chunk", chunk.toString(), stream.__id); + __DEBUG__ && debug("adding chunk", stream.__id); + __DEBUG__ && debug("chunk", chunk.toString(), stream.__id); if ( state.flowing && state.length === 0 && @@ -3017,7 +3021,8 @@ var require_readable = __commonJS({ state.length += state.objectMode ? 1 : chunk.length; if (addToFront) state.buffer.unshift(chunk); else state.buffer.push(chunk); - debug("needReadable @ addChunk", state.needReadable, stream.__id); + __DEBUG__ && + debug("needReadable @ addChunk", state.needReadable, stream.__id); if (state.needReadable) emitReadable(stream, state); } maybeReadMore(stream, state); @@ -3071,7 +3076,7 @@ var require_readable = __commonJS({ } // You can override either this method, or the async _read(n) below. Readable.prototype.read = function (n) { - debug("read - n =", n, this.__id); + __DEBUG__ && debug("read - n =", n, this.__id); if (!NumberIsInteger(n)) { n = NumberParseInt(n, 10); } @@ -3095,12 +3100,13 @@ var require_readable = __commonJS({ : state.length > 0) || state.ended) ) { - debug( - "read: emitReadable or endReadable", - state.length, - state.ended, - this.__id, - ); + __DEBUG__ && + debug( + "read: emitReadable or endReadable", + state.length, + state.ended, + this.__id, + ); if (state.length === 0 && state.ended) endReadable(this); else emitReadable(this, state); return null; @@ -3110,12 +3116,13 @@ var require_readable = __commonJS({ // If we've ended, and we're now clear, then finish it up. if (n === 0 && state.ended) { - debug( - "read: calling endReadable if length 0 -- length, state.ended", - state.length, - state.ended, - this.__id, - ); + __DEBUG__ && + debug( + "read: calling endReadable if length 0 -- length, state.ended", + state.length, + state.ended, + this.__id, + ); if (state.length === 0) endReadable(this); return null; } @@ -3144,12 +3151,12 @@ var require_readable = __commonJS({ // if we need a readable event, then we need to do some reading. let doRead = state.needReadable; - debug("need readable", doRead, this.__id); + __DEBUG__ && debug("need readable", doRead, this.__id); // If we currently have less than the highWaterMark, then also read some. if (state.length === 0 || state.length - n < state.highWaterMark) { doRead = true; - debug("length less than watermark", doRead, this.__id); + __DEBUG__ && debug("length less than watermark", doRead, this.__id); } // However, if we've ended, then there's no point, if we're already @@ -3162,11 +3169,11 @@ var require_readable = __commonJS({ state.errored || !state.constructed ) { - debug("state.constructed?", state.constructed, this.__id); + __DEBUG__ && debug("state.constructed?", state.constructed, this.__id); doRead = false; - debug("reading, ended or constructing", doRead, this.__id); + __DEBUG__ && debug("reading, ended or constructing", doRead, this.__id); } else if (doRead) { - debug("do read", this.__id); + __DEBUG__ && debug("do read", this.__id); state.reading = true; state.sync = true; // If the length is currently zero, then we *need* a readable event. @@ -3176,16 +3183,16 @@ var require_readable = __commonJS({ try { var result = this._read(state.highWaterMark); if (isPromise(result)) { - debug("async _read", this.__id); + __DEBUG__ && debug("async _read", this.__id); const peeked = Bun.peek(result); - debug("peeked promise", peeked, this.__id); + __DEBUG__ && debug("peeked promise", peeked, this.__id); if (peeked !== result) { result = peeked; } } if (isPromise(result) && result?.then && isCallable(result.then)) { - debug("async _read result.then setup", this.__id); + __DEBUG__ && debug("async _read result.then setup", this.__id); result.then(nop, function (err) { errorOrDestroy(this, err); }); @@ -3200,16 +3207,17 @@ var require_readable = __commonJS({ if (!state.reading) n = howMuchToRead(nOrig, state); } - debug("n @ fromList", n, this.__id); + __DEBUG__ && debug("n @ fromList", n, this.__id); let ret; if (n > 0) ret = fromList(n, state); else ret = null; - debug("ret @ read", ret, this.__id); + __DEBUG__ && debug("ret @ read", ret, this.__id); if (ret === null) { state.needReadable = state.length <= state.highWaterMark; - debug("state.length while ret = null", state.length, this.__id); + __DEBUG__ && + debug("state.length while ret = null", state.length, this.__id); n = 0; } else { state.length -= n; @@ -3251,7 +3259,8 @@ var require_readable = __commonJS({ } } state.pipes.push(dest); - debug("pipe count=%d opts=%j", state.pipes.length, pipeOpts, src.__id); + __DEBUG__ && + debug("pipe count=%d opts=%j", state.pipes.length, pipeOpts, src.__id); const doEnd = (!pipeOpts || pipeOpts.end !== false) && dest !== process.stdout && @@ -3261,7 +3270,7 @@ var require_readable = __commonJS({ else src.once("end", endFn); dest.on("unpipe", onunpipe); function onunpipe(readable, unpipeInfo) { - debug("onunpipe", src.__id); + __DEBUG__ && debug("onunpipe", src.__id); if (readable === src) { if (unpipeInfo && unpipeInfo.hasUnpiped === false) { unpipeInfo.hasUnpiped = true; @@ -3270,13 +3279,13 @@ var require_readable = __commonJS({ } } function onend() { - debug("onend", src.__id); + __DEBUG__ && debug("onend", src.__id); dest.end(); } let ondrain; let cleanedUp = false; function cleanup() { - debug("cleanup", src.__id); + __DEBUG__ && debug("cleanup", src.__id); dest.removeListener("close", onclose); dest.removeListener("finish", onfinish); if (ondrain) { @@ -3298,15 +3307,16 @@ var require_readable = __commonJS({ function pause() { if (!cleanedUp) { if (state.pipes.length === 1 && state.pipes[0] === dest) { - debug("false write response, pause", 0, src.__id); + __DEBUG__ && debug("false write response, pause", 0, src.__id); state.awaitDrainWriters = dest; state.multiAwaitDrain = false; } else if (state.pipes.length > 1 && state.pipes.includes(dest)) { - debug( - "false write response, pause", - state.awaitDrainWriters.size, - src.__id, - ); + __DEBUG__ && + debug( + "false write response, pause", + state.awaitDrainWriters.size, + src.__id, + ); state.awaitDrainWriters.add(dest); } src.pause(); @@ -3318,9 +3328,9 @@ var require_readable = __commonJS({ } src.on("data", ondata); function ondata(chunk) { - debug("ondata", src.__id); + __DEBUG__ && debug("ondata", src.__id); const ret = dest.write(chunk); - debug("dest.write", ret, src.__id); + __DEBUG__ && debug("dest.write", ret, src.__id); if (ret === false) { pause(); } @@ -3434,13 +3444,13 @@ var require_readable = __commonJS({ } } function nReadingNextTick(self) { - debug("on readable nextTick, calling read(0)", self.__id); + __DEBUG__ && debug("on readable nextTick, calling read(0)", self.__id); self.read(0); } Readable.prototype.resume = function () { const state = this._readableState; if (!state.flowing) { - debug("resume", this.__id); + __DEBUG__ && debug("resume", this.__id); state.flowing = !state.readableListening; resume(this, state); } @@ -3448,9 +3458,10 @@ var require_readable = __commonJS({ return this; }; Readable.prototype.pause = function () { - debug("call pause flowing=%j", this._readableState.flowing, this.__id); + __DEBUG__ && + debug("call pause flowing=%j", this._readableState.flowing, this.__id); if (this._readableState.flowing !== false) { - debug("pause", this.__id); + __DEBUG__ && debug("pause", this.__id); this._readableState.flowing = false; this.emit("pause"); } @@ -3687,19 +3698,21 @@ var require_readable = __commonJS({ } function endReadable(stream) { const state = stream._readableState; - debug("endEmitted @ endReadable", state.endEmitted, stream.__id); + __DEBUG__ && + debug("endEmitted @ endReadable", state.endEmitted, stream.__id); if (!state.endEmitted) { state.ended = true; runOnNextTick(endReadableNT, state, stream); } } function endReadableNT(state, stream) { - debug( - "endReadableNT -- endEmitted, state.length", - state.endEmitted, - state.length, - stream.__id, - ); + __DEBUG__ && + debug( + "endReadableNT -- endEmitted, state.length", + state.endEmitted, + state.length, + stream.__id, + ); if ( !state.errored && !state.closeEmitted && @@ -3708,7 +3721,7 @@ var require_readable = __commonJS({ ) { state.endEmitted = true; stream.emit("end"); - debug("end emitted @ endReadableNT", stream.__id); + __DEBUG__ && debug("end emitted @ endReadableNT", stream.__id); if (stream.writable && stream.allowHalfOpen === false) { runOnNextTick(endWritableNT, stream); } else if (state.autoDestroy) { @@ -4189,7 +4202,7 @@ var require_writable = __commonJS({ Writable.prototype._writev = null; Writable.prototype.end = function (chunk, encoding, cb, native = false) { const state = this._writableState; - debug("end", state, this.__id); + __DEBUG__ && debug("end", state, this.__id); if (typeof chunk === "function") { cb = chunk; chunk = null; @@ -4295,8 +4308,11 @@ var require_writable = __commonJS({ } } function finishMaybe(stream, state, sync) { - debug("finishMaybe -- state, sync", state, sync, stream.__id); + __DEBUG__ && + debug("finishMaybe -- state, sync", state, sync, stream.__id); + if (!needFinish(state, stream.__id)) return; + prefinish(stream, state); if (state.pendingcb === 0) { if (sync) { @@ -5921,21 +5937,21 @@ function createNativeStreamReadable(nativeType, Readable) { // However, in the case of an fs.ReadStream, we can pass the number of bytes we want to read // which may be significantly less than the actual highWaterMark _read(maxToRead) { - debug("NativeReadable._read", this.__id); + __DEBUG__ && debug("NativeReadable._read", this.__id); if (this.#pendingRead) { - debug("pendingRead is true", this.__id); + __DEBUG__ && debug("pendingRead is true", this.__id); return; } var ptr = this.#ptr; - debug("ptr @ NativeReadable._read", ptr, this.__id); + __DEBUG__ && debug("ptr @ NativeReadable._read", ptr, this.__id); if (ptr === 0) { this.push(null); return; } if (!this.#constructed) { - debug("NativeReadable not constructed yet", this.__id); + __DEBUG__ && debug("NativeReadable not constructed yet", this.__id); this.#internalConstruct(ptr); } @@ -5962,18 +5978,20 @@ function createNativeStreamReadable(nativeType, Readable) { #internalConstruct(ptr) { this.#constructed = true; const result = start(ptr, this.#highWaterMark); - debug("NativeReadable internal `start` result", result, this.__id); + __DEBUG__ && + debug("NativeReadable internal `start` result", result, this.__id); if (typeof result === "number" && result > 1) { this.#hasResized = true; - debug("NativeReadable resized", this.__id); + __DEBUG__ && debug("NativeReadable resized", this.__id); this.#highWaterMark = Math.min(this.#highWaterMark, result); } if (drainFn) { const drainResult = drainFn(ptr); - debug("NativeReadable drain result", drainResult, this.__id); + __DEBUG__ && + debug("NativeReadable drain result", drainResult, this.__id); if ((drainResult?.byteLength ?? 0) > 0) { this.push(drainResult); } @@ -5985,7 +6003,7 @@ function createNativeStreamReadable(nativeType, Readable) { // how many bytes they want to read (ie. when reading only part of a file) #getRemainingChunk(maxToRead = this.#highWaterMark) { var chunk = this.#remainingChunk; - debug("chunk @ #getRemainingChunk", chunk, this.__id); + __DEBUG__ && debug("chunk @ #getRemainingChunk", chunk, this.__id); if (chunk?.byteLength ?? 0 < MIN_BUFFER_SIZE) { var size = maxToRead > MIN_BUFFER_SIZE ? maxToRead : MIN_BUFFER_SIZE; this.#remainingChunk = chunk = new Buffer(size); @@ -5994,17 +6012,20 @@ function createNativeStreamReadable(nativeType, Readable) { } push(result, encoding) { - debug( - "NativeReadable push -- result, encoding", - result, - encoding, - this.__id, - ); + __DEBUG__ && + debug( + "NativeReadable push -- result, encoding", + result, + encoding, + this.__id, + ); return super.push(...arguments); } #handleResult(result, view, isClosed) { - debug("result, isClosed @ #handleResult", result, isClosed, this.__id); + __DEBUG__ && + debug("result, isClosed @ #handleResult", result, isClosed, this.__id); + if (typeof result === "number") { if (result >= this.#highWaterMark && !this.#hasResized && !isClosed) { this.#highWaterMark *= 2; @@ -6023,18 +6044,18 @@ function createNativeStreamReadable(nativeType, Readable) { ) { this.#highWaterMark *= 2; this.#hasResized = true; - debug("Resized", this.__id); + __DEBUG__ && debug("Resized", this.__id); } return handleArrayBufferViewResult(this, result, view, isClosed); } else { - debug("Unknown result type", result, this.__id); + __DEBUG__ && debug("Unknown result type", result, this.__id); throw new Error("Invalid result from pull"); } } #internalRead(view, ptr) { - debug("#internalRead()", this.__id); + __DEBUG__ && debug("#internalRead()", this.__id); closer[0] = false; var result = pull(ptr, view, closer); if (isPromise(result)) { @@ -6042,14 +6063,15 @@ function createNativeStreamReadable(nativeType, Readable) { return result.then( (result) => { this.#pendingRead = false; - debug( - "pending no longerrrrrrrr (result returned from pull)", - this.__id, - ); + __DEBUG__ && + debug( + "pending no longerrrrrrrr (result returned from pull)", + this.__id, + ); this.#remainingChunk = this.#handleResult(result, view, closer[0]); }, (reason) => { - debug("error from pull", reason, this.__id); + __DEBUG__ && debug("error from pull", reason, this.__id); errorOrDestroy(this, reason); }, ); @@ -6074,7 +6096,7 @@ function createNativeStreamReadable(nativeType, Readable) { if (updateRef) { updateRef(ptr, false); } - debug("NativeReadable destroyed", this.__id); + __DEBUG__ && debug("NativeReadable destroyed", this.__id); cancel(ptr, error); callback(error); } |