diff options
author | 2023-08-21 23:39:41 -0700 | |
---|---|---|
committer | 2023-08-21 23:39:41 -0700 | |
commit | 44e4d5852a1c31e1d124828fa6417d3d1884cdb2 (patch) | |
tree | 0ca2110b909f466d4afda454357257c4c5e933f1 /src/js/builtins/ProcessObjectInternals.ts | |
parent | 3a45f2c71bb17fbad0168fa76b32ae0c8ee67935 (diff) | |
download | bun-44e4d5852a1c31e1d124828fa6417d3d1884cdb2.tar.gz bun-44e4d5852a1c31e1d124828fa6417d3d1884cdb2.tar.zst bun-44e4d5852a1c31e1d124828fa6417d3d1884cdb2.zip |
fix stdin stream unref and resuming (#4250)
* fix stream unref and resuming stream
* fix `child-process-stdio` test
Diffstat (limited to 'src/js/builtins/ProcessObjectInternals.ts')
-rw-r--r-- | src/js/builtins/ProcessObjectInternals.ts | 28 |
1 files changed, 17 insertions, 11 deletions
diff --git a/src/js/builtins/ProcessObjectInternals.ts b/src/js/builtins/ProcessObjectInternals.ts index 548a2d984..e6c04c90f 100644 --- a/src/js/builtins/ProcessObjectInternals.ts +++ b/src/js/builtins/ProcessObjectInternals.ts @@ -87,11 +87,8 @@ export function getStdioWriteStream(fd) { } export function getStdinStream(fd) { - var { destroy } = require("node:stream"); - var reader: ReadableStreamDefaultReader | undefined; var readerRef; - var unrefOnRead = false; function ref() { reader ??= Bun.stdin.stream().getReader(); // TODO: remove this. likely we are dereferencing the stream @@ -104,6 +101,10 @@ export function getStdinStream(fd) { clearInterval(readerRef); readerRef = undefined; } + if (reader) { + reader.cancel(); + reader = undefined; + } } const tty = require("node:tty"); @@ -123,7 +124,6 @@ export function getStdinStream(fd) { // and does not apply to the underlying Stream implementation. if (event === "readable") { ref(); - unrefOnRead = true; } return originalOn.call(this, event, listener); }; @@ -163,7 +163,7 @@ export function getStdinStream(fd) { stream.push(value[i]); } } else { - stream.push(null); + stream.emit("end"); stream.pause(); } } catch (err) { @@ -172,22 +172,28 @@ export function getStdinStream(fd) { } stream._read = function (size) { - if (unrefOnRead) { - unref(); - unrefOnRead = false; - } internalRead(this); }; + stream.on("resume", () => { + ref(); + stream._undestroy(); + }); + + stream._readableState.reading = false; + stream.on("pause", () => { process.nextTick(() => { - destroy(stream); + if (!stream.readableFlowing) { + stream._readableState.reading = false; + } }); }); stream.on("close", () => { process.nextTick(() => { - reader?.cancel(); + stream.destroy(); + unref(); }); }); |