aboutsummaryrefslogtreecommitdiff
path: root/src/js/builtins/ProcessObjectInternals.ts
diff options
context:
space:
mode:
Diffstat (limited to 'src/js/builtins/ProcessObjectInternals.ts')
-rw-r--r--src/js/builtins/ProcessObjectInternals.ts740
1 files changed, 114 insertions, 626 deletions
diff --git a/src/js/builtins/ProcessObjectInternals.ts b/src/js/builtins/ProcessObjectInternals.ts
index 242b310fd..548a2d984 100644
--- a/src/js/builtins/ProcessObjectInternals.ts
+++ b/src/js/builtins/ProcessObjectInternals.ts
@@ -44,664 +44,152 @@ export function binding(bindingName) {
);
}
-export function getStdioWriteStream(fd_, getWindowSize) {
- var EventEmitter = require("node:events");
+export function getStdioWriteStream(fd) {
+ const tty = require("node:tty");
- function createStdioWriteStream(fd_) {
- var { Duplex, eos, destroy } = require("node:stream");
- var StdioWriteStream = class StdioWriteStream extends Duplex {
- #writeStream;
- #readStream;
+ const stream = tty.WriteStream(fd);
- #readable = true;
- #writable = true;
- #fdPath;
+ process.on("SIGWINCH", () => {
+ stream._refreshSize();
+ });
- #onClose;
- #onDrain;
- #onFinish;
- #onReadable;
- #isTTY;
+ if (fd === 1) {
+ stream.destroySoon = stream.destroy;
+ stream._destroy = function (err, cb) {
+ cb(err);
+ this._undestroy();
- get isTTY() {
- return (this.#isTTY ??= require("node:tty").isatty(fd_));
- }
-
- get fd() {
- return fd_;
- }
-
- get writable() {
- return this.#writable;
- }
-
- get readable() {
- return this.#readable;
- }
-
- constructor(fd) {
- super({ readable: true, writable: true });
- this.#fdPath = `/dev/fd/${fd}`;
- }
-
- #onFinished(err) {
- const cb = this.#onClose;
- this.#onClose = null;
-
- if (cb) {
- cb(err);
- } else if (err) {
- this.destroy(err);
- } else if (!this.#readable && !this.#writable) {
- this.destroy();
- }
- }
-
- _destroy(err, callback) {
- if (!err && this.#onClose !== null) {
- var AbortError = class AbortError extends Error {
- code: string;
- name: string;
- constructor(message = "The operation was aborted", options = void 0) {
- if (options !== void 0 && typeof options !== "object") {
- throw new Error(`Invalid AbortError options:\n\n${JSON.stringify(options, null, 2)}`);
- }
- super(message, options);
- this.code = "ABORT_ERR";
- this.name = "AbortError";
- }
- };
- err = new AbortError();
- }
-
- this.#onDrain = null;
- this.#onFinish = null;
- if (this.#onClose === null) {
- callback(err);
- } else {
- this.#onClose = callback;
- if (this.#writeStream) destroy(this.#writeStream, err);
- if (this.#readStream) destroy(this.#readStream, err);
- }
- }
-
- _write(chunk, encoding, callback) {
- if (!this.#writeStream) {
- var { createWriteStream } = require("node:fs");
- var stream = (this.#writeStream = createWriteStream(this.#fdPath));
-
- stream.on("finish", () => {
- if (this.#onFinish) {
- const cb = this.#onFinish;
- this.#onFinish = null;
- cb();
- }
- });
-
- stream.on("drain", () => {
- if (this.#onDrain) {
- const cb = this.#onDrain;
- this.#onDrain = null;
- cb();
- }
- });
-
- eos(stream, err => {
- this.#writable = false;
- if (err) {
- destroy(stream, err);
- }
- this.#onFinished(err);
- });
- }
-
- if (this.#writeStream.write(chunk, encoding)) {
- callback();
- } else {
- this.#onDrain = callback;
- }
- }
-
- _final(callback) {
- this.#writeStream && this.#writeStream.end();
- this.#onFinish = callback;
- }
-
- #loadReadStream() {
- var { createReadStream } = require("node:fs");
-
- var readStream = (this.#readStream = createReadStream(this.#fdPath));
-
- readStream.on("readable", () => {
- if (this.#onReadable) {
- const cb = this.#onReadable;
- this.#onReadable = null;
- cb();
- } else {
- this.read();
- }
- });
-
- readStream.on("end", () => {
- this.push(null);
+ if (!this._writableState.emitClose) {
+ process.nextTick(() => {
+ this.emit("close");
});
-
- eos(readStream, err => {
- this.#readable = false;
- if (err) {
- destroy(readStream, err);
- }
- this.#onFinished(err);
- });
- return readStream;
- }
-
- _read() {
- var stream = this.#readStream;
- if (!stream) {
- stream = this.#loadReadStream();
- }
-
- while (true) {
- const buf = stream.read();
- if (buf === null || !this.push(buf)) {
- return;
- }
- }
}
};
- return new StdioWriteStream(fd_);
- }
-
- function isFastEncoding(encoding) {
- if (!encoding) return true;
-
- var normalied = encoding.toLowerCase();
- return normalied === "utf8" || normalied === "utf-8" || normalied === "buffer" || normalied === "binary";
- }
-
- var readline;
- var windowSizeArray = [0, 0];
-
- var FastStdioWriteStreamInternal = class StdioWriteStream extends EventEmitter {
- #fd;
- #innerStream;
- #writer;
- #isTTY;
-
- bytesWritten = 0;
-
- setDefaultEncoding(encoding) {
- if (this.#innerStream || !isFastEncoding(encoding)) {
- this.#ensureInnerStream();
- return this.#innerStream.setDefaultEncoding(encoding);
- }
- }
-
- #createWriter() {
- switch (this.#fd) {
- case 1: {
- var writer = Bun.stdout.writer({ highWaterMark: 0 });
- writer.unref();
- return writer;
- }
-
- case 2: {
- var writer = Bun.stderr.writer({ highWaterMark: 0 });
- writer.unref();
- return writer;
- }
- default: {
- throw new Error("Unsupported writer");
- }
- }
- }
-
- #getWriter() {
- return (this.#writer ??= this.#createWriter());
- }
-
- constructor(fd_) {
- super();
- this.#fd = fd_;
- }
-
- get fd() {
- return this.#fd;
- }
-
- ref() {
- this.#getWriter().ref();
- }
-
- unref() {
- this.#getWriter().unref();
- }
-
- on(event, listener) {
- if (event === "close" || event === "finish") {
- this.#ensureInnerStream();
- return this.#innerStream.on(event, listener);
- }
-
- if (event === "drain") {
- return super.on("drain", listener);
- }
-
- if (event === "error") {
- return super.on("error", listener);
- }
-
- return super.on(event, listener);
- }
-
- get _writableState() {
- this.#ensureInnerStream();
- return this.#innerStream._writableState;
- }
-
- get _readableState() {
- this.#ensureInnerStream();
- return this.#innerStream._readableState;
- }
-
- get writable() {
- this.#ensureInnerStream();
- return this.#innerStream.writable;
- }
-
- get readable() {
- this.#ensureInnerStream();
- return this.#innerStream.readable;
- }
-
- pipe(destination) {
- this.#ensureInnerStream();
- return this.#innerStream.pipe(destination);
- }
-
- unpipe(destination) {
- this.#ensureInnerStream();
- return this.#innerStream.unpipe(destination);
- }
-
- #ensureInnerStream() {
- if (this.#innerStream) return;
- this.#innerStream = createStdioWriteStream(this.#fd);
- const events = this.eventNames();
- for (const event of events) {
- this.#innerStream.on(event, (...args) => {
- this.emit(event, ...args);
+ } else if (fd === 2) {
+ stream.destroySoon = stream.destroy;
+ stream._destroy = function (err, cb) {
+ cb(err);
+ this._undestroy();
+
+ if (!this._writableState.emitClose) {
+ process.nextTick(() => {
+ this.emit("close");
});
}
- }
-
- #write1(chunk) {
- var writer = this.#getWriter();
- const writeResult = writer.write(chunk);
- this.bytesWritten += writeResult;
- const flushResult = writer.flush(false);
- return !!(writeResult || flushResult);
- }
-
- #writeWithEncoding(chunk, encoding) {
- if (!isFastEncoding(encoding)) {
- this.#ensureInnerStream();
- return this.#innerStream.write(chunk, encoding);
- }
-
- return this.#write1(chunk);
- }
-
- #performCallback(cb, err?: any) {
- if (err) {
- this.emit("error", err);
- }
-
- try {
- cb(err ? err : null);
- } catch (err2) {
- this.emit("error", err2);
- }
- }
-
- #writeWithCallbackAndEncoding(chunk, encoding, callback) {
- if (!isFastEncoding(encoding)) {
- this.#ensureInnerStream();
- return this.#innerStream.write(chunk, encoding, callback);
- }
-
- var writer = this.#getWriter();
- const writeResult = writer.write(chunk);
- const flushResult = writer.flush(true);
- if (flushResult?.then) {
- flushResult.then(
- () => {
- this.#performCallback(callback);
- this.emit("drain");
- },
- err => this.#performCallback(callback, err),
- );
- return false;
- }
-
- queueMicrotask(() => {
- this.#performCallback(callback);
- });
-
- return !!(writeResult || flushResult);
- }
-
- get isTTY() {
- return false;
- }
-
- write(chunk, encoding, callback) {
- const result = this._write(chunk, encoding, callback);
-
- if (result) {
- this.emit("drain");
- }
-
- return result;
- }
-
- get hasColors() {
- return Bun.tty[this.#fd].hasColors;
- }
-
- _write(chunk, encoding, callback) {
- var inner = this.#innerStream;
- if (inner) {
- return inner.write(chunk, encoding, callback);
- }
-
- switch (arguments.length) {
- case 0: {
- var error = new Error("Invalid arguments");
- error.code = "ERR_INVALID_ARG_TYPE";
- throw error;
- }
- case 1: {
- return this.#write1(chunk);
- }
- case 2: {
- if (typeof encoding === "function") {
- return this.#writeWithCallbackAndEncoding(chunk, "", encoding);
- } else if (typeof encoding === "string") {
- return this.#writeWithEncoding(chunk, encoding);
- }
- }
- default: {
- if (
- (typeof encoding !== "undefined" && typeof encoding !== "string") ||
- (typeof callback !== "undefined" && typeof callback !== "function")
- ) {
- var error = new Error("Invalid arguments");
- error.code = "ERR_INVALID_ARG_TYPE";
- throw error;
- }
-
- if (typeof callback === "undefined") {
- return this.#writeWithEncoding(chunk, encoding);
- }
-
- return this.#writeWithCallbackAndEncoding(chunk, encoding, callback);
- }
- }
- }
-
- destroy() {
- return this;
- }
-
- end() {
- return this;
- }
- };
- if (getWindowSize(fd_, windowSizeArray)) {
- var WriteStream = class WriteStream extends FastStdioWriteStreamInternal {
- get isTTY() {
- return true;
- }
-
- cursorTo(x, y, callback) {
- return (readline ??= require("node:readline")).cursorTo(this, x, y, callback);
- }
-
- moveCursor(dx, dy, callback) {
- return (readline ??= require("node:readline")).moveCursor(this, dx, dy, callback);
- }
-
- clearLine(dir, callback) {
- return (readline ??= require("node:readline")).clearLine(this, dir, callback);
- }
-
- clearScreenDown(callback) {
- return (readline ??= require("node:readline")).clearScreenDown(this, callback);
- }
-
- getWindowSize() {
- if (getWindowSize(fd_, windowSizeArray) === true) {
- return [windowSizeArray[0], windowSizeArray[1]];
- }
- }
-
- get columns() {
- if (getWindowSize(fd_, windowSizeArray) === true) {
- return windowSizeArray[0];
- }
- }
-
- get rows() {
- if (getWindowSize(fd_, windowSizeArray) === true) {
- return windowSizeArray[1];
- }
- }
};
-
- return new WriteStream(fd_);
}
- return new FastStdioWriteStreamInternal(fd_);
-}
-
-export function getStdinStream(fd_) {
- var { Duplex, eos, destroy } = require("node:stream");
+ stream._type = "tty";
+ stream._isStdio = true;
+ stream.fd = fd;
- var StdinStream = class StdinStream extends Duplex {
- #reader;
- // TODO: investigate https://github.com/oven-sh/bun/issues/1607
-
- #readRef;
- #writeStream;
-
- #readable = true;
- #unrefOnRead = false;
- #writable = true;
+ return stream;
+}
- #onFinish;
- #onClose;
- #onDrain;
+export function getStdinStream(fd) {
+ var { destroy } = require("node:stream");
+
+ var reader: ReadableStreamDefaultReader | undefined;
+ var readerRef;
+ var unrefOnRead = false;
+ function ref() {
+ reader ??= Bun.stdin.stream().getReader();
+ // TODO: remove this. likely we are dereferencing the stream
+ // when there is still more data to be read.
+ readerRef ??= setInterval(() => {}, 1 << 30);
+ }
- get isTTY() {
- return require("node:tty").isatty(fd_);
+ function unref() {
+ if (readerRef) {
+ clearInterval(readerRef);
+ readerRef = undefined;
}
+ }
- get fd() {
- return fd_;
- }
+ const tty = require("node:tty");
+
+ const stream = new tty.ReadStream(fd);
+
+ const originalOn = stream.on;
+ stream.on = function (event, listener) {
+ // Streams don't generally required to present any data when only
+ // `readable` events are present, i.e. `readableFlowing === false`
+ //
+ // However, Node.js has a this quirk whereby `process.stdin.read()`
+ // blocks under TTY mode, thus looping `.read()` in this particular
+ // case would not result in truncation.
+ //
+ // Therefore the following hack is only specific to `process.stdin`
+ // and does not apply to the underlying Stream implementation.
+ if (event === "readable") {
+ ref();
+ unrefOnRead = true;
+ }
+ return originalOn.call(this, event, listener);
+ };
- constructor() {
- super({ readable: true, writable: true });
- }
+ stream.fd = fd;
- #onFinished(err?) {
- const cb = this.#onClose;
- this.#onClose = null;
+ const originalPause = stream.pause;
+ stream.pause = function () {
+ unref();
+ return originalPause.call(this);
+ };
- if (cb) {
- cb(err);
- } else if (err) {
- this.destroy(err);
- } else if (!this.#readable && !this.#writable) {
- this.destroy();
- }
- }
+ const originalResume = stream.resume;
+ stream.resume = function () {
+ ref();
+ return originalResume.call(this);
+ };
- _destroy(err, callback) {
- if (!err && this.#onClose !== null) {
- var AbortError = class AbortError extends Error {
- constructor(message = "The operation was aborted", options = void 0) {
- if (options !== void 0 && typeof options !== "object") {
- throw new Error(`Invalid AbortError options:\n\n${JSON.stringify(options, null, 2)}`);
- }
- super(message, options);
- this.code = "ABORT_ERR";
- this.name = "AbortError";
- }
- };
- err = new AbortError();
- }
+ async function internalRead(stream) {
+ try {
+ var done: any, value: any;
+ const read = reader?.readMany();
- if (this.#onClose === null) {
- callback(err);
+ if ($isPromise(read)) {
+ ({ done, value } = await read);
} else {
- this.#onClose = callback;
- if (this.#writeStream) destroy(this.#writeStream, err);
- }
- }
-
- setRawMode(mode) {}
-
- on(name, callback) {
- // Streams don't generally required to present any data when only
- // `readable` events are present, i.e. `readableFlowing === false`
- //
- // However, Node.js has a this quirk whereby `process.stdin.read()`
- // blocks under TTY mode, thus looping `.read()` in this particular
- // case would not result in truncation.
- //
- // Therefore the following hack is only specific to `process.stdin`
- // and does not apply to the underlying Stream implementation.
- if (name === "readable") {
- this.ref();
- this.#unrefOnRead = true;
- }
- return super.on(name, callback);
- }
-
- pause() {
- this.unref();
- return super.pause();
- }
-
- resume() {
- this.ref();
- return super.resume();
- }
-
- ref() {
- this.#reader ??= Bun.stdin.stream().getReader();
- this.#readRef ??= setInterval(() => {}, 1 << 30);
- }
-
- unref() {
- if (this.#readRef) {
- clearInterval(this.#readRef);
- this.#readRef = null;
+ // @ts-expect-error
+ ({ done, value } = read);
}
- }
-
- async #readInternal() {
- try {
- var done, value;
- const read = this.#reader.readMany();
-
- // read same-tick if possible
- if (!read?.then) {
- ({ done, value } = read);
- } else {
- ({ done, value } = await read);
- }
-
- if (!done) {
- this.push(value[0]);
-
- // shouldn't actually happen, but just in case
- const length = value.length;
- for (let i = 1; i < length; i++) {
- this.push(value[i]);
- }
- } else {
- this.push(null);
- this.pause();
- this.#readable = false;
- this.#onFinished();
- }
- } catch (err) {
- this.#readable = false;
- this.#onFinished(err);
- }
- }
-
- _read(size) {
- if (this.#unrefOnRead) {
- this.unref();
- this.#unrefOnRead = false;
- }
- this.#readInternal();
- }
- #constructWriteStream() {
- var { createWriteStream } = require("node:fs");
- var writeStream = (this.#writeStream = createWriteStream("/dev/fd/0"));
+ if (!done) {
+ stream.push(value[0]);
- writeStream.on("finish", () => {
- if (this.#onFinish) {
- const cb = this.#onFinish;
- this.#onFinish = null;
- cb();
+ // shouldn't actually happen, but just in case
+ const length = value.length;
+ for (let i = 1; i < length; i++) {
+ stream.push(value[i]);
}
- });
-
- writeStream.on("drain", () => {
- if (this.#onDrain) {
- const cb = this.#onDrain;
- this.#onDrain = null;
- cb();
- }
- });
-
- eos(writeStream, err => {
- this.#writable = false;
- if (err) {
- destroy(writeStream, err);
- }
- this.#onFinished(err);
- });
-
- return writeStream;
- }
-
- _write(chunk, encoding, callback) {
- var writeStream = this.#writeStream;
- if (!writeStream) {
- writeStream = this.#constructWriteStream();
- }
-
- if (writeStream.write(chunk, encoding)) {
- callback();
} else {
- this.#onDrain = callback;
+ stream.push(null);
+ stream.pause();
}
+ } catch (err) {
+ stream.destroy(err);
}
+ }
- _final(callback) {
- this.#writeStream.end();
- this.#onFinish = (...args) => callback(...args);
+ stream._read = function (size) {
+ if (unrefOnRead) {
+ unref();
+ unrefOnRead = false;
}
+ internalRead(this);
};
- return new StdinStream();
+ stream.on("pause", () => {
+ process.nextTick(() => {
+ destroy(stream);
+ });
+ });
+
+ stream.on("close", () => {
+ process.nextTick(() => {
+ reader?.cancel();
+ });
+ });
+
+ return stream;
}