aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/bun.js/streams.exports.js24
-rw-r--r--test/bun.js/node-stream.test.js28
2 files changed, 41 insertions, 11 deletions
diff --git a/src/bun.js/streams.exports.js b/src/bun.js/streams.exports.js
index cc6325923..042dda5d9 100644
--- a/src/bun.js/streams.exports.js
+++ b/src/bun.js/streams.exports.js
@@ -77,7 +77,7 @@ DebugEventEmitter.prototype.on = function (event, handler) {
return _EE.prototype.on.call(this, event, handler);
};
DebugEventEmitter.prototype.addListener = function (event, handler) {
- this.on(event, handler);
+ return this.on(event, handler);
};
var __commonJS = (cb, mod) =>
@@ -3251,7 +3251,7 @@ var require_readable = __commonJS({
}
}
state.pipes.push(dest);
- debug("pipe count=%d opts=%j", state.pipes.length, pipeOpts, this.__id);
+ debug("pipe count=%d opts=%j", state.pipes.length, pipeOpts, src.__id);
const doEnd =
(!pipeOpts || pipeOpts.end !== false) &&
dest !== process.stdout &&
@@ -3261,7 +3261,7 @@ var require_readable = __commonJS({
else src.once("end", endFn);
dest.on("unpipe", onunpipe);
function onunpipe(readable, unpipeInfo) {
- debug("onunpipe", this.__id);
+ debug("onunpipe", src.__id);
if (readable === src) {
if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
unpipeInfo.hasUnpiped = true;
@@ -3270,13 +3270,13 @@ var require_readable = __commonJS({
}
}
function onend() {
- debug("onend", this.__id);
+ debug("onend", src.__id);
dest.end();
}
let ondrain;
let cleanedUp = false;
function cleanup() {
- debug("cleanup", this__id);
+ debug("cleanup", src.__id);
dest.removeListener("close", onclose);
dest.removeListener("finish", onfinish);
if (ondrain) {
@@ -3298,11 +3298,15 @@ var require_readable = __commonJS({
function pause() {
if (!cleanedUp) {
if (state.pipes.length === 1 && state.pipes[0] === dest) {
- debug("false write response, pause", 0);
+ debug("false write response, pause", 0, src.__id);
state.awaitDrainWriters = dest;
state.multiAwaitDrain = false;
} else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
- debug("false write response, pause", state.awaitDrainWriters.size);
+ debug(
+ "false write response, pause",
+ state.awaitDrainWriters.size,
+ src.__id,
+ );
state.awaitDrainWriters.add(dest);
}
src.pause();
@@ -3314,9 +3318,9 @@ var require_readable = __commonJS({
}
src.on("data", ondata);
function ondata(chunk) {
- debug("ondata");
+ debug("ondata", src.__id);
const ret = dest.write(chunk);
- debug("dest.write", ret);
+ debug("dest.write", ret, src.__id);
if (ret === false) {
pause();
}
@@ -3444,7 +3448,7 @@ var require_readable = __commonJS({
return this;
};
Readable.prototype.pause = function () {
- debug("call pause flowing=%j", this._readableState.flowing, this__id);
+ debug("call pause flowing=%j", this._readableState.flowing, this.__id);
if (this._readableState.flowing !== false) {
debug("pause", this.__id);
this._readableState.flowing = false;
diff --git a/test/bun.js/node-stream.test.js b/test/bun.js/node-stream.test.js
index 8024ab562..549f7c180 100644
--- a/test/bun.js/node-stream.test.js
+++ b/test/bun.js/node-stream.test.js
@@ -1,5 +1,31 @@
import { expect, describe, it } from "bun:test";
-import { Duplex, Transform, PassThrough } from "node:stream";
+import {
+ Readable,
+ Writable,
+ Duplex,
+ Transform,
+ PassThrough,
+} from "node:stream";
+
+describe("Readable", () => {
+ it("should be able to be piped via .pipe", () => {
+ const readable = new Readable({
+ _read() {
+ this.push("Hello World!");
+ this.push(null);
+ },
+ });
+
+ const writable = new Writable({
+ _write(chunk, encoding, callback) {
+ expect(chunk.toString()).toBe("Hello World!");
+ callback();
+ },
+ });
+
+ readable.pipe(writable);
+ });
+});
describe("Duplex", () => {
it("should allow subclasses to be derived via .call() on class", () => {