diff options
| author | 2022-12-12 20:58:28 -0600 | |
|---|---|---|
| committer | 2022-12-12 18:58:28 -0800 | |
| commit | 9f9db85a946dfc3c0a2075cc49e58fca5aab0ad3 (patch) | |
| tree | 6a80884b585c455e40ca35788e346c69f9eaebe4 /src | |
| parent | bbc2dacd840709f2fdf932ae27c978078f348ac6 (diff) | |
| download | bun-9f9db85a946dfc3c0a2075cc49e58fca5aab0ad3.tar.gz bun-9f9db85a946dfc3c0a2075cc49e58fca5aab0ad3.tar.zst bun-9f9db85a946dfc3c0a2075cc49e58fca5aab0ad3.zip | |
fix(stream): Fix Readable.pipe() (#1606)
* fix(stream): fix some debug logs that were breaking .pipe
* fix(stream): another debug fix
* test(stream): add test for .pipe
Diffstat (limited to 'src')
| -rw-r--r-- | src/bun.js/streams.exports.js | 24 |
1 files changed, 14 insertions, 10 deletions
diff --git a/src/bun.js/streams.exports.js b/src/bun.js/streams.exports.js index cc6325923..042dda5d9 100644 --- a/src/bun.js/streams.exports.js +++ b/src/bun.js/streams.exports.js @@ -77,7 +77,7 @@ DebugEventEmitter.prototype.on = function (event, handler) { return _EE.prototype.on.call(this, event, handler); }; DebugEventEmitter.prototype.addListener = function (event, handler) { - this.on(event, handler); + return this.on(event, handler); }; var __commonJS = (cb, mod) => @@ -3251,7 +3251,7 @@ var require_readable = __commonJS({ } } state.pipes.push(dest); - debug("pipe count=%d opts=%j", state.pipes.length, pipeOpts, this.__id); + debug("pipe count=%d opts=%j", state.pipes.length, pipeOpts, src.__id); const doEnd = (!pipeOpts || pipeOpts.end !== false) && dest !== process.stdout && @@ -3261,7 +3261,7 @@ var require_readable = __commonJS({ else src.once("end", endFn); dest.on("unpipe", onunpipe); function onunpipe(readable, unpipeInfo) { - debug("onunpipe", this.__id); + debug("onunpipe", src.__id); if (readable === src) { if (unpipeInfo && unpipeInfo.hasUnpiped === false) { unpipeInfo.hasUnpiped = true; @@ -3270,13 +3270,13 @@ var require_readable = __commonJS({ } } function onend() { - debug("onend", this.__id); + debug("onend", src.__id); dest.end(); } let ondrain; let cleanedUp = false; function cleanup() { - debug("cleanup", this__id); + debug("cleanup", src.__id); dest.removeListener("close", onclose); dest.removeListener("finish", onfinish); if (ondrain) { @@ -3298,11 +3298,15 @@ var require_readable = __commonJS({ function pause() { if (!cleanedUp) { if (state.pipes.length === 1 && state.pipes[0] === dest) { - debug("false write response, pause", 0); + debug("false write response, pause", 0, src.__id); state.awaitDrainWriters = dest; state.multiAwaitDrain = false; } else if (state.pipes.length > 1 && state.pipes.includes(dest)) { - debug("false write response, pause", state.awaitDrainWriters.size); + debug( + "false write response, pause", + state.awaitDrainWriters.size, + src.__id, + ); state.awaitDrainWriters.add(dest); } src.pause(); @@ -3314,9 +3318,9 @@ var require_readable = __commonJS({ } src.on("data", ondata); function ondata(chunk) { - debug("ondata"); + debug("ondata", src.__id); const ret = dest.write(chunk); - debug("dest.write", ret); + debug("dest.write", ret, src.__id); if (ret === false) { pause(); } @@ -3444,7 +3448,7 @@ var require_readable = __commonJS({ return this; }; Readable.prototype.pause = function () { - debug("call pause flowing=%j", this._readableState.flowing, this__id); + debug("call pause flowing=%j", this._readableState.flowing, this.__id); if (this._readableState.flowing !== false) { debug("pause", this.__id); this._readableState.flowing = false; |
