diff options
Diffstat (limited to '')
| -rw-r--r-- | src/bun.js/streams.exports.js | 24 | ||||
| -rw-r--r-- | test/bun.js/node-stream.test.js | 28 |
2 files changed, 41 insertions, 11 deletions
diff --git a/src/bun.js/streams.exports.js b/src/bun.js/streams.exports.js index cc6325923..042dda5d9 100644 --- a/src/bun.js/streams.exports.js +++ b/src/bun.js/streams.exports.js @@ -77,7 +77,7 @@ DebugEventEmitter.prototype.on = function (event, handler) { return _EE.prototype.on.call(this, event, handler); }; DebugEventEmitter.prototype.addListener = function (event, handler) { - this.on(event, handler); + return this.on(event, handler); }; var __commonJS = (cb, mod) => @@ -3251,7 +3251,7 @@ var require_readable = __commonJS({ } } state.pipes.push(dest); - debug("pipe count=%d opts=%j", state.pipes.length, pipeOpts, this.__id); + debug("pipe count=%d opts=%j", state.pipes.length, pipeOpts, src.__id); const doEnd = (!pipeOpts || pipeOpts.end !== false) && dest !== process.stdout && @@ -3261,7 +3261,7 @@ var require_readable = __commonJS({ else src.once("end", endFn); dest.on("unpipe", onunpipe); function onunpipe(readable, unpipeInfo) { - debug("onunpipe", this.__id); + debug("onunpipe", src.__id); if (readable === src) { if (unpipeInfo && unpipeInfo.hasUnpiped === false) { unpipeInfo.hasUnpiped = true; @@ -3270,13 +3270,13 @@ var require_readable = __commonJS({ } } function onend() { - debug("onend", this.__id); + debug("onend", src.__id); dest.end(); } let ondrain; let cleanedUp = false; function cleanup() { - debug("cleanup", this__id); + debug("cleanup", src.__id); dest.removeListener("close", onclose); dest.removeListener("finish", onfinish); if (ondrain) { @@ -3298,11 +3298,15 @@ var require_readable = __commonJS({ function pause() { if (!cleanedUp) { if (state.pipes.length === 1 && state.pipes[0] === dest) { - debug("false write response, pause", 0); + debug("false write response, pause", 0, src.__id); state.awaitDrainWriters = dest; state.multiAwaitDrain = false; } else if (state.pipes.length > 1 && state.pipes.includes(dest)) { - debug("false write response, pause", state.awaitDrainWriters.size); + debug( + "false write response, pause", + state.awaitDrainWriters.size, + src.__id, + ); state.awaitDrainWriters.add(dest); } src.pause(); @@ -3314,9 +3318,9 @@ var require_readable = __commonJS({ } src.on("data", ondata); function ondata(chunk) { - debug("ondata"); + debug("ondata", src.__id); const ret = dest.write(chunk); - debug("dest.write", ret); + debug("dest.write", ret, src.__id); if (ret === false) { pause(); } @@ -3444,7 +3448,7 @@ var require_readable = __commonJS({ return this; }; Readable.prototype.pause = function () { - debug("call pause flowing=%j", this._readableState.flowing, this__id); + debug("call pause flowing=%j", this._readableState.flowing, this.__id); if (this._readableState.flowing !== false) { debug("pause", this.__id); this._readableState.flowing = false; diff --git a/test/bun.js/node-stream.test.js b/test/bun.js/node-stream.test.js index 8024ab562..549f7c180 100644 --- a/test/bun.js/node-stream.test.js +++ b/test/bun.js/node-stream.test.js @@ -1,5 +1,31 @@ import { expect, describe, it } from "bun:test"; -import { Duplex, Transform, PassThrough } from "node:stream"; +import { + Readable, + Writable, + Duplex, + Transform, + PassThrough, +} from "node:stream"; + +describe("Readable", () => { + it("should be able to be piped via .pipe", () => { + const readable = new Readable({ + _read() { + this.push("Hello World!"); + this.push(null); + }, + }); + + const writable = new Writable({ + _write(chunk, encoding, callback) { + expect(chunk.toString()).toBe("Hello World!"); + callback(); + }, + }); + + readable.pipe(writable); + }); +}); describe("Duplex", () => { it("should allow subclasses to be derived via .call() on class", () => { |
