diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/bun.js/streams.exports.js | 208 | 
1 files changed, 115 insertions, 93 deletions
| diff --git a/src/bun.js/streams.exports.js b/src/bun.js/streams.exports.js index 042dda5d9..9a21e5d44 100644 --- a/src/bun.js/streams.exports.js +++ b/src/bun.js/streams.exports.js @@ -16,8 +16,11 @@ globalThis.__IDS_TO_TRACK = process.env.DEBUG_TRACK_EE?.length  // By default, child_process gives  const __TRACK_EE__ = !!process.env.DEBUG_TRACK_EE; -const __DEBUG__ = -  process.env.DEBUG || process.env.DEBUG_STREAMS || __TRACK_EE__; +const __DEBUG__ = !!( +  process.env.DEBUG || +  process.env.DEBUG_STREAMS || +  __TRACK_EE__ +);  var debug = __DEBUG__    ? globalThis.__IDS_TO_TRACK @@ -2659,31 +2662,32 @@ var require_readable = __commonJS({        if (ev === "data") {          state.readableListening = this.listenerCount("readable") > 0;          if (state.flowing !== false) { -          debug("in flowing mode!", this.__id); +          __DEBUG__ && debug("in flowing mode!", this.__id);            this.resume();          } else { -          debug("in readable mode!", this.__id); +          __DEBUG__ && debug("in readable mode!", this.__id);          }        } else if (ev === "readable") { -        debug("readable listener added!", this.__id); +        __DEBUG__ && debug("readable listener added!", this.__id);          if (!state.endEmitted && !state.readableListening) {            state.readableListening = state.needReadable = true;            state.flowing = false;            state.emittedReadable = false; -          debug( -            "on readable - state.length, reading, emittedReadable", -            state.length, -            state.reading, -            state.emittedReadable, -            this.__id, -          ); +          __DEBUG__ && +            debug( +              "on readable - state.length, reading, emittedReadable", +              state.length, +              state.reading, +              state.emittedReadable, +              this.__id, +            );            if (state.length) {              emitReadable(this, state);            } else if (!state.reading) {              runOnNextTick(nReadingNextTick, this);            }          } else if (state.endEmitted) { -          debug("end already emitted...", this.__id); +          __DEBUG__ && debug("end already emitted...", this.__id);          }        }        return res; @@ -2739,7 +2743,7 @@ var require_readable = __commonJS({        }        async _read() { -        debug("ReadableFromWeb _read()", this.__id); +        __DEBUG__ && debug("ReadableFromWeb _read()", this.__id);          var stream = this.#stream,            reader = this.#reader;          if (stream) { @@ -2900,7 +2904,7 @@ var require_readable = __commonJS({      }      // REVERT ME      function emitReadable(stream, state) { -      debug("NativeReadable - emitReadable", stream.__id); +      __DEBUG__ && debug("NativeReadable - emitReadable", stream.__id);        _emitReadable(stream, state);      }      var destroyImpl = require_destroy(); @@ -2935,7 +2939,7 @@ var require_readable = __commonJS({        return readableAddChunk(this, chunk, encoding, true);      };      function readableAddChunk(stream, chunk, encoding, addToFront) { -      debug("readableAddChunk", chunk, stream.__id); +      __DEBUG__ && debug("readableAddChunk", chunk, stream.__id);        const state = stream._readableState;        let err;        if (!state.objectMode) { @@ -2998,8 +3002,8 @@ var require_readable = __commonJS({        );      }      function addChunk(stream, state, chunk, addToFront) { -      debug("adding chunk", stream.__id); -      debug("chunk", chunk.toString(), stream.__id); +      __DEBUG__ && debug("adding chunk", stream.__id); +      __DEBUG__ && debug("chunk", chunk.toString(), stream.__id);        if (          state.flowing &&          state.length === 0 && @@ -3017,7 +3021,8 @@ var require_readable = __commonJS({          state.length += state.objectMode ? 1 : chunk.length;          if (addToFront) state.buffer.unshift(chunk);          else state.buffer.push(chunk); -        debug("needReadable @ addChunk", state.needReadable, stream.__id); +        __DEBUG__ && +          debug("needReadable @ addChunk", state.needReadable, stream.__id);          if (state.needReadable) emitReadable(stream, state);        }        maybeReadMore(stream, state); @@ -3071,7 +3076,7 @@ var require_readable = __commonJS({      }      // You can override either this method, or the async _read(n) below.      Readable.prototype.read = function (n) { -      debug("read - n =", n, this.__id); +      __DEBUG__ && debug("read - n =", n, this.__id);        if (!NumberIsInteger(n)) {          n = NumberParseInt(n, 10);        } @@ -3095,12 +3100,13 @@ var require_readable = __commonJS({            : state.length > 0) ||            state.ended)        ) { -        debug( -          "read: emitReadable or endReadable", -          state.length, -          state.ended, -          this.__id, -        ); +        __DEBUG__ && +          debug( +            "read: emitReadable or endReadable", +            state.length, +            state.ended, +            this.__id, +          );          if (state.length === 0 && state.ended) endReadable(this);          else emitReadable(this, state);          return null; @@ -3110,12 +3116,13 @@ var require_readable = __commonJS({        // If we've ended, and we're now clear, then finish it up.        if (n === 0 && state.ended) { -        debug( -          "read: calling endReadable if length 0 -- length, state.ended", -          state.length, -          state.ended, -          this.__id, -        ); +        __DEBUG__ && +          debug( +            "read: calling endReadable if length 0 -- length, state.ended", +            state.length, +            state.ended, +            this.__id, +          );          if (state.length === 0) endReadable(this);          return null;        } @@ -3144,12 +3151,12 @@ var require_readable = __commonJS({        // if we need a readable event, then we need to do some reading.        let doRead = state.needReadable; -      debug("need readable", doRead, this.__id); +      __DEBUG__ && debug("need readable", doRead, this.__id);        // If we currently have less than the highWaterMark, then also read some.        if (state.length === 0 || state.length - n < state.highWaterMark) {          doRead = true; -        debug("length less than watermark", doRead, this.__id); +        __DEBUG__ && debug("length less than watermark", doRead, this.__id);        }        // However, if we've ended, then there's no point, if we're already @@ -3162,11 +3169,11 @@ var require_readable = __commonJS({          state.errored ||          !state.constructed        ) { -        debug("state.constructed?", state.constructed, this.__id); +        __DEBUG__ && debug("state.constructed?", state.constructed, this.__id);          doRead = false; -        debug("reading, ended or constructing", doRead, this.__id); +        __DEBUG__ && debug("reading, ended or constructing", doRead, this.__id);        } else if (doRead) { -        debug("do read", this.__id); +        __DEBUG__ && debug("do read", this.__id);          state.reading = true;          state.sync = true;          // If the length is currently zero, then we *need* a readable event. @@ -3176,16 +3183,16 @@ var require_readable = __commonJS({          try {            var result = this._read(state.highWaterMark);            if (isPromise(result)) { -            debug("async _read", this.__id); +            __DEBUG__ && debug("async _read", this.__id);              const peeked = Bun.peek(result); -            debug("peeked promise", peeked, this.__id); +            __DEBUG__ && debug("peeked promise", peeked, this.__id);              if (peeked !== result) {                result = peeked;              }            }            if (isPromise(result) && result?.then && isCallable(result.then)) { -            debug("async _read result.then setup", this.__id); +            __DEBUG__ && debug("async _read result.then setup", this.__id);              result.then(nop, function (err) {                errorOrDestroy(this, err);              }); @@ -3200,16 +3207,17 @@ var require_readable = __commonJS({          if (!state.reading) n = howMuchToRead(nOrig, state);        } -      debug("n @ fromList", n, this.__id); +      __DEBUG__ && debug("n @ fromList", n, this.__id);        let ret;        if (n > 0) ret = fromList(n, state);        else ret = null; -      debug("ret @ read", ret, this.__id); +      __DEBUG__ && debug("ret @ read", ret, this.__id);        if (ret === null) {          state.needReadable = state.length <= state.highWaterMark; -        debug("state.length while ret = null", state.length, this.__id); +        __DEBUG__ && +          debug("state.length while ret = null", state.length, this.__id);          n = 0;        } else {          state.length -= n; @@ -3251,7 +3259,8 @@ var require_readable = __commonJS({          }        }        state.pipes.push(dest); -      debug("pipe count=%d opts=%j", state.pipes.length, pipeOpts, src.__id); +      __DEBUG__ && +        debug("pipe count=%d opts=%j", state.pipes.length, pipeOpts, src.__id);        const doEnd =          (!pipeOpts || pipeOpts.end !== false) &&          dest !== process.stdout && @@ -3261,7 +3270,7 @@ var require_readable = __commonJS({        else src.once("end", endFn);        dest.on("unpipe", onunpipe);        function onunpipe(readable, unpipeInfo) { -        debug("onunpipe", src.__id); +        __DEBUG__ && debug("onunpipe", src.__id);          if (readable === src) {            if (unpipeInfo && unpipeInfo.hasUnpiped === false) {              unpipeInfo.hasUnpiped = true; @@ -3270,13 +3279,13 @@ var require_readable = __commonJS({          }        }        function onend() { -        debug("onend", src.__id); +        __DEBUG__ && debug("onend", src.__id);          dest.end();        }        let ondrain;        let cleanedUp = false;        function cleanup() { -        debug("cleanup", src.__id); +        __DEBUG__ && debug("cleanup", src.__id);          dest.removeListener("close", onclose);          dest.removeListener("finish", onfinish);          if (ondrain) { @@ -3298,15 +3307,16 @@ var require_readable = __commonJS({        function pause() {          if (!cleanedUp) {            if (state.pipes.length === 1 && state.pipes[0] === dest) { -            debug("false write response, pause", 0, src.__id); +            __DEBUG__ && debug("false write response, pause", 0, src.__id);              state.awaitDrainWriters = dest;              state.multiAwaitDrain = false;            } else if (state.pipes.length > 1 && state.pipes.includes(dest)) { -            debug( -              "false write response, pause", -              state.awaitDrainWriters.size, -              src.__id, -            ); +            __DEBUG__ && +              debug( +                "false write response, pause", +                state.awaitDrainWriters.size, +                src.__id, +              );              state.awaitDrainWriters.add(dest);            }            src.pause(); @@ -3318,9 +3328,9 @@ var require_readable = __commonJS({        }        src.on("data", ondata);        function ondata(chunk) { -        debug("ondata", src.__id); +        __DEBUG__ && debug("ondata", src.__id);          const ret = dest.write(chunk); -        debug("dest.write", ret, src.__id); +        __DEBUG__ && debug("dest.write", ret, src.__id);          if (ret === false) {            pause();          } @@ -3434,13 +3444,13 @@ var require_readable = __commonJS({        }      }      function nReadingNextTick(self) { -      debug("on readable nextTick, calling read(0)", self.__id); +      __DEBUG__ && debug("on readable nextTick, calling read(0)", self.__id);        self.read(0);      }      Readable.prototype.resume = function () {        const state = this._readableState;        if (!state.flowing) { -        debug("resume", this.__id); +        __DEBUG__ && debug("resume", this.__id);          state.flowing = !state.readableListening;          resume(this, state);        } @@ -3448,9 +3458,10 @@ var require_readable = __commonJS({        return this;      };      Readable.prototype.pause = function () { -      debug("call pause flowing=%j", this._readableState.flowing, this.__id); +      __DEBUG__ && +        debug("call pause flowing=%j", this._readableState.flowing, this.__id);        if (this._readableState.flowing !== false) { -        debug("pause", this.__id); +        __DEBUG__ && debug("pause", this.__id);          this._readableState.flowing = false;          this.emit("pause");        } @@ -3687,19 +3698,21 @@ var require_readable = __commonJS({      }      function endReadable(stream) {        const state = stream._readableState; -      debug("endEmitted @ endReadable", state.endEmitted, stream.__id); +      __DEBUG__ && +        debug("endEmitted @ endReadable", state.endEmitted, stream.__id);        if (!state.endEmitted) {          state.ended = true;          runOnNextTick(endReadableNT, state, stream);        }      }      function endReadableNT(state, stream) { -      debug( -        "endReadableNT -- endEmitted, state.length", -        state.endEmitted, -        state.length, -        stream.__id, -      ); +      __DEBUG__ && +        debug( +          "endReadableNT -- endEmitted, state.length", +          state.endEmitted, +          state.length, +          stream.__id, +        );        if (          !state.errored &&          !state.closeEmitted && @@ -3708,7 +3721,7 @@ var require_readable = __commonJS({        ) {          state.endEmitted = true;          stream.emit("end"); -        debug("end emitted @ endReadableNT", stream.__id); +        __DEBUG__ && debug("end emitted @ endReadableNT", stream.__id);          if (stream.writable && stream.allowHalfOpen === false) {            runOnNextTick(endWritableNT, stream);          } else if (state.autoDestroy) { @@ -4189,7 +4202,7 @@ var require_writable = __commonJS({      Writable.prototype._writev = null;      Writable.prototype.end = function (chunk, encoding, cb, native = false) {        const state = this._writableState; -      debug("end", state, this.__id); +      __DEBUG__ && debug("end", state, this.__id);        if (typeof chunk === "function") {          cb = chunk;          chunk = null; @@ -4295,8 +4308,11 @@ var require_writable = __commonJS({        }      }      function finishMaybe(stream, state, sync) { -      debug("finishMaybe -- state, sync", state, sync, stream.__id); +      __DEBUG__ && +        debug("finishMaybe -- state, sync", state, sync, stream.__id); +        if (!needFinish(state, stream.__id)) return; +        prefinish(stream, state);        if (state.pendingcb === 0) {          if (sync) { @@ -5921,21 +5937,21 @@ function createNativeStreamReadable(nativeType, Readable) {      // However, in the case of an fs.ReadStream, we can pass the number of bytes we want to read      // which may be significantly less than the actual highWaterMark      _read(maxToRead) { -      debug("NativeReadable._read", this.__id); +      __DEBUG__ && debug("NativeReadable._read", this.__id);        if (this.#pendingRead) { -        debug("pendingRead is true", this.__id); +        __DEBUG__ && debug("pendingRead is true", this.__id);          return;        }        var ptr = this.#ptr; -      debug("ptr @ NativeReadable._read", ptr, this.__id); +      __DEBUG__ && debug("ptr @ NativeReadable._read", ptr, this.__id);        if (ptr === 0) {          this.push(null);          return;        }        if (!this.#constructed) { -        debug("NativeReadable not constructed yet", this.__id); +        __DEBUG__ && debug("NativeReadable not constructed yet", this.__id);          this.#internalConstruct(ptr);        } @@ -5962,18 +5978,20 @@ function createNativeStreamReadable(nativeType, Readable) {      #internalConstruct(ptr) {        this.#constructed = true;        const result = start(ptr, this.#highWaterMark); -      debug("NativeReadable internal `start` result", result, this.__id); +      __DEBUG__ && +        debug("NativeReadable internal `start` result", result, this.__id);        if (typeof result === "number" && result > 1) {          this.#hasResized = true; -        debug("NativeReadable resized", this.__id); +        __DEBUG__ && debug("NativeReadable resized", this.__id);          this.#highWaterMark = Math.min(this.#highWaterMark, result);        }        if (drainFn) {          const drainResult = drainFn(ptr); -        debug("NativeReadable drain result", drainResult, this.__id); +        __DEBUG__ && +          debug("NativeReadable drain result", drainResult, this.__id);          if ((drainResult?.byteLength ?? 0) > 0) {            this.push(drainResult);          } @@ -5985,7 +6003,7 @@ function createNativeStreamReadable(nativeType, Readable) {      // how many bytes they want to read (ie. when reading only part of a file)      #getRemainingChunk(maxToRead = this.#highWaterMark) {        var chunk = this.#remainingChunk; -      debug("chunk @ #getRemainingChunk", chunk, this.__id); +      __DEBUG__ && debug("chunk @ #getRemainingChunk", chunk, this.__id);        if (chunk?.byteLength ?? 0 < MIN_BUFFER_SIZE) {          var size = maxToRead > MIN_BUFFER_SIZE ? maxToRead : MIN_BUFFER_SIZE;          this.#remainingChunk = chunk = new Buffer(size); @@ -5994,17 +6012,20 @@ function createNativeStreamReadable(nativeType, Readable) {      }      push(result, encoding) { -      debug( -        "NativeReadable push -- result, encoding", -        result, -        encoding, -        this.__id, -      ); +      __DEBUG__ && +        debug( +          "NativeReadable push -- result, encoding", +          result, +          encoding, +          this.__id, +        );        return super.push(...arguments);      }      #handleResult(result, view, isClosed) { -      debug("result, isClosed @ #handleResult", result, isClosed, this.__id); +      __DEBUG__ && +        debug("result, isClosed @ #handleResult", result, isClosed, this.__id); +        if (typeof result === "number") {          if (result >= this.#highWaterMark && !this.#hasResized && !isClosed) {            this.#highWaterMark *= 2; @@ -6023,18 +6044,18 @@ function createNativeStreamReadable(nativeType, Readable) {          ) {            this.#highWaterMark *= 2;            this.#hasResized = true; -          debug("Resized", this.__id); +          __DEBUG__ && debug("Resized", this.__id);          }          return handleArrayBufferViewResult(this, result, view, isClosed);        } else { -        debug("Unknown result type", result, this.__id); +        __DEBUG__ && debug("Unknown result type", result, this.__id);          throw new Error("Invalid result from pull");        }      }      #internalRead(view, ptr) { -      debug("#internalRead()", this.__id); +      __DEBUG__ && debug("#internalRead()", this.__id);        closer[0] = false;        var result = pull(ptr, view, closer);        if (isPromise(result)) { @@ -6042,14 +6063,15 @@ function createNativeStreamReadable(nativeType, Readable) {          return result.then(            (result) => {              this.#pendingRead = false; -            debug( -              "pending no longerrrrrrrr (result returned from pull)", -              this.__id, -            ); +            __DEBUG__ && +              debug( +                "pending no longerrrrrrrr (result returned from pull)", +                this.__id, +              );              this.#remainingChunk = this.#handleResult(result, view, closer[0]);            },            (reason) => { -            debug("error from pull", reason, this.__id); +            __DEBUG__ && debug("error from pull", reason, this.__id);              errorOrDestroy(this, reason);            },          ); @@ -6074,7 +6096,7 @@ function createNativeStreamReadable(nativeType, Readable) {        if (updateRef) {          updateRef(ptr, false);        } -      debug("NativeReadable destroyed", this.__id); +      __DEBUG__ && debug("NativeReadable destroyed", this.__id);        cancel(ptr, error);        callback(error);      } | 
