aboutsummaryrefslogtreecommitdiff
path: root/src/js/builtins/ProcessObjectInternals.ts
diff options
context:
space:
mode:
Diffstat (limited to 'src/js/builtins/ProcessObjectInternals.ts')
-rw-r--r--src/js/builtins/ProcessObjectInternals.ts28
1 files changed, 17 insertions, 11 deletions
diff --git a/src/js/builtins/ProcessObjectInternals.ts b/src/js/builtins/ProcessObjectInternals.ts
index 548a2d984..e6c04c90f 100644
--- a/src/js/builtins/ProcessObjectInternals.ts
+++ b/src/js/builtins/ProcessObjectInternals.ts
@@ -87,11 +87,8 @@ export function getStdioWriteStream(fd) {
}
export function getStdinStream(fd) {
- var { destroy } = require("node:stream");
-
var reader: ReadableStreamDefaultReader | undefined;
var readerRef;
- var unrefOnRead = false;
function ref() {
reader ??= Bun.stdin.stream().getReader();
// TODO: remove this. likely we are dereferencing the stream
@@ -104,6 +101,10 @@ export function getStdinStream(fd) {
clearInterval(readerRef);
readerRef = undefined;
}
+ if (reader) {
+ reader.cancel();
+ reader = undefined;
+ }
}
const tty = require("node:tty");
@@ -123,7 +124,6 @@ export function getStdinStream(fd) {
// and does not apply to the underlying Stream implementation.
if (event === "readable") {
ref();
- unrefOnRead = true;
}
return originalOn.call(this, event, listener);
};
@@ -163,7 +163,7 @@ export function getStdinStream(fd) {
stream.push(value[i]);
}
} else {
- stream.push(null);
+ stream.emit("end");
stream.pause();
}
} catch (err) {
@@ -172,22 +172,28 @@ export function getStdinStream(fd) {
}
stream._read = function (size) {
- if (unrefOnRead) {
- unref();
- unrefOnRead = false;
- }
internalRead(this);
};
+ stream.on("resume", () => {
+ ref();
+ stream._undestroy();
+ });
+
+ stream._readableState.reading = false;
+
stream.on("pause", () => {
process.nextTick(() => {
- destroy(stream);
+ if (!stream.readableFlowing) {
+ stream._readableState.reading = false;
+ }
});
});
stream.on("close", () => {
process.nextTick(() => {
- reader?.cancel();
+ stream.destroy();
+ unref();
});
});