aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGravatar Derrick Farris <mr.dcfarris@gmail.com> 2022-12-12 20:58:28 -0600
committerGravatar GitHub <noreply@github.com> 2022-12-12 18:58:28 -0800
commit9f9db85a946dfc3c0a2075cc49e58fca5aab0ad3 (patch)
tree6a80884b585c455e40ca35788e346c69f9eaebe4 /src
parentbbc2dacd840709f2fdf932ae27c978078f348ac6 (diff)
downloadbun-9f9db85a946dfc3c0a2075cc49e58fca5aab0ad3.tar.gz
bun-9f9db85a946dfc3c0a2075cc49e58fca5aab0ad3.tar.zst
bun-9f9db85a946dfc3c0a2075cc49e58fca5aab0ad3.zip
fix(stream): Fix Readable.pipe() (#1606)
* fix(stream): fix some debug logs that were breaking .pipe * fix(stream): another debug fix * test(stream): add test for .pipe
Diffstat (limited to 'src')
-rw-r--r--src/bun.js/streams.exports.js24
1 files changed, 14 insertions, 10 deletions
diff --git a/src/bun.js/streams.exports.js b/src/bun.js/streams.exports.js
index cc6325923..042dda5d9 100644
--- a/src/bun.js/streams.exports.js
+++ b/src/bun.js/streams.exports.js
@@ -77,7 +77,7 @@ DebugEventEmitter.prototype.on = function (event, handler) {
return _EE.prototype.on.call(this, event, handler);
};
DebugEventEmitter.prototype.addListener = function (event, handler) {
- this.on(event, handler);
+ return this.on(event, handler);
};
var __commonJS = (cb, mod) =>
@@ -3251,7 +3251,7 @@ var require_readable = __commonJS({
}
}
state.pipes.push(dest);
- debug("pipe count=%d opts=%j", state.pipes.length, pipeOpts, this.__id);
+ debug("pipe count=%d opts=%j", state.pipes.length, pipeOpts, src.__id);
const doEnd =
(!pipeOpts || pipeOpts.end !== false) &&
dest !== process.stdout &&
@@ -3261,7 +3261,7 @@ var require_readable = __commonJS({
else src.once("end", endFn);
dest.on("unpipe", onunpipe);
function onunpipe(readable, unpipeInfo) {
- debug("onunpipe", this.__id);
+ debug("onunpipe", src.__id);
if (readable === src) {
if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
unpipeInfo.hasUnpiped = true;
@@ -3270,13 +3270,13 @@ var require_readable = __commonJS({
}
}
function onend() {
- debug("onend", this.__id);
+ debug("onend", src.__id);
dest.end();
}
let ondrain;
let cleanedUp = false;
function cleanup() {
- debug("cleanup", this__id);
+ debug("cleanup", src.__id);
dest.removeListener("close", onclose);
dest.removeListener("finish", onfinish);
if (ondrain) {
@@ -3298,11 +3298,15 @@ var require_readable = __commonJS({
function pause() {
if (!cleanedUp) {
if (state.pipes.length === 1 && state.pipes[0] === dest) {
- debug("false write response, pause", 0);
+ debug("false write response, pause", 0, src.__id);
state.awaitDrainWriters = dest;
state.multiAwaitDrain = false;
} else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
- debug("false write response, pause", state.awaitDrainWriters.size);
+ debug(
+ "false write response, pause",
+ state.awaitDrainWriters.size,
+ src.__id,
+ );
state.awaitDrainWriters.add(dest);
}
src.pause();
@@ -3314,9 +3318,9 @@ var require_readable = __commonJS({
}
src.on("data", ondata);
function ondata(chunk) {
- debug("ondata");
+ debug("ondata", src.__id);
const ret = dest.write(chunk);
- debug("dest.write", ret);
+ debug("dest.write", ret, src.__id);
if (ret === false) {
pause();
}
@@ -3444,7 +3448,7 @@ var require_readable = __commonJS({
return this;
};
Readable.prototype.pause = function () {
- debug("call pause flowing=%j", this._readableState.flowing, this__id);
+ debug("call pause flowing=%j", this._readableState.flowing, this.__id);
if (this._readableState.flowing !== false) {
debug("pause", this.__id);
this._readableState.flowing = false;