diff options
Diffstat (limited to 'src/js/builtins/ProcessObjectInternals.ts')
-rw-r--r-- | src/js/builtins/ProcessObjectInternals.ts | 740 |
1 files changed, 114 insertions, 626 deletions
diff --git a/src/js/builtins/ProcessObjectInternals.ts b/src/js/builtins/ProcessObjectInternals.ts index 242b310fd..548a2d984 100644 --- a/src/js/builtins/ProcessObjectInternals.ts +++ b/src/js/builtins/ProcessObjectInternals.ts @@ -44,664 +44,152 @@ export function binding(bindingName) { ); } -export function getStdioWriteStream(fd_, getWindowSize) { - var EventEmitter = require("node:events"); +export function getStdioWriteStream(fd) { + const tty = require("node:tty"); - function createStdioWriteStream(fd_) { - var { Duplex, eos, destroy } = require("node:stream"); - var StdioWriteStream = class StdioWriteStream extends Duplex { - #writeStream; - #readStream; + const stream = tty.WriteStream(fd); - #readable = true; - #writable = true; - #fdPath; + process.on("SIGWINCH", () => { + stream._refreshSize(); + }); - #onClose; - #onDrain; - #onFinish; - #onReadable; - #isTTY; + if (fd === 1) { + stream.destroySoon = stream.destroy; + stream._destroy = function (err, cb) { + cb(err); + this._undestroy(); - get isTTY() { - return (this.#isTTY ??= require("node:tty").isatty(fd_)); - } - - get fd() { - return fd_; - } - - get writable() { - return this.#writable; - } - - get readable() { - return this.#readable; - } - - constructor(fd) { - super({ readable: true, writable: true }); - this.#fdPath = `/dev/fd/${fd}`; - } - - #onFinished(err) { - const cb = this.#onClose; - this.#onClose = null; - - if (cb) { - cb(err); - } else if (err) { - this.destroy(err); - } else if (!this.#readable && !this.#writable) { - this.destroy(); - } - } - - _destroy(err, callback) { - if (!err && this.#onClose !== null) { - var AbortError = class AbortError extends Error { - code: string; - name: string; - constructor(message = "The operation was aborted", options = void 0) { - if (options !== void 0 && typeof options !== "object") { - throw new Error(`Invalid AbortError options:\n\n${JSON.stringify(options, null, 2)}`); - } - super(message, options); - this.code = "ABORT_ERR"; - this.name = "AbortError"; - } - }; - err = new AbortError(); - } - - this.#onDrain = null; - this.#onFinish = null; - if (this.#onClose === null) { - callback(err); - } else { - this.#onClose = callback; - if (this.#writeStream) destroy(this.#writeStream, err); - if (this.#readStream) destroy(this.#readStream, err); - } - } - - _write(chunk, encoding, callback) { - if (!this.#writeStream) { - var { createWriteStream } = require("node:fs"); - var stream = (this.#writeStream = createWriteStream(this.#fdPath)); - - stream.on("finish", () => { - if (this.#onFinish) { - const cb = this.#onFinish; - this.#onFinish = null; - cb(); - } - }); - - stream.on("drain", () => { - if (this.#onDrain) { - const cb = this.#onDrain; - this.#onDrain = null; - cb(); - } - }); - - eos(stream, err => { - this.#writable = false; - if (err) { - destroy(stream, err); - } - this.#onFinished(err); - }); - } - - if (this.#writeStream.write(chunk, encoding)) { - callback(); - } else { - this.#onDrain = callback; - } - } - - _final(callback) { - this.#writeStream && this.#writeStream.end(); - this.#onFinish = callback; - } - - #loadReadStream() { - var { createReadStream } = require("node:fs"); - - var readStream = (this.#readStream = createReadStream(this.#fdPath)); - - readStream.on("readable", () => { - if (this.#onReadable) { - const cb = this.#onReadable; - this.#onReadable = null; - cb(); - } else { - this.read(); - } - }); - - readStream.on("end", () => { - this.push(null); + if (!this._writableState.emitClose) { + process.nextTick(() => { + this.emit("close"); }); - - eos(readStream, err => { - this.#readable = false; - if (err) { - destroy(readStream, err); - } - this.#onFinished(err); - }); - return readStream; - } - - _read() { - var stream = this.#readStream; - if (!stream) { - stream = this.#loadReadStream(); - } - - while (true) { - const buf = stream.read(); - if (buf === null || !this.push(buf)) { - return; - } - } } }; - return new StdioWriteStream(fd_); - } - - function isFastEncoding(encoding) { - if (!encoding) return true; - - var normalied = encoding.toLowerCase(); - return normalied === "utf8" || normalied === "utf-8" || normalied === "buffer" || normalied === "binary"; - } - - var readline; - var windowSizeArray = [0, 0]; - - var FastStdioWriteStreamInternal = class StdioWriteStream extends EventEmitter { - #fd; - #innerStream; - #writer; - #isTTY; - - bytesWritten = 0; - - setDefaultEncoding(encoding) { - if (this.#innerStream || !isFastEncoding(encoding)) { - this.#ensureInnerStream(); - return this.#innerStream.setDefaultEncoding(encoding); - } - } - - #createWriter() { - switch (this.#fd) { - case 1: { - var writer = Bun.stdout.writer({ highWaterMark: 0 }); - writer.unref(); - return writer; - } - - case 2: { - var writer = Bun.stderr.writer({ highWaterMark: 0 }); - writer.unref(); - return writer; - } - default: { - throw new Error("Unsupported writer"); - } - } - } - - #getWriter() { - return (this.#writer ??= this.#createWriter()); - } - - constructor(fd_) { - super(); - this.#fd = fd_; - } - - get fd() { - return this.#fd; - } - - ref() { - this.#getWriter().ref(); - } - - unref() { - this.#getWriter().unref(); - } - - on(event, listener) { - if (event === "close" || event === "finish") { - this.#ensureInnerStream(); - return this.#innerStream.on(event, listener); - } - - if (event === "drain") { - return super.on("drain", listener); - } - - if (event === "error") { - return super.on("error", listener); - } - - return super.on(event, listener); - } - - get _writableState() { - this.#ensureInnerStream(); - return this.#innerStream._writableState; - } - - get _readableState() { - this.#ensureInnerStream(); - return this.#innerStream._readableState; - } - - get writable() { - this.#ensureInnerStream(); - return this.#innerStream.writable; - } - - get readable() { - this.#ensureInnerStream(); - return this.#innerStream.readable; - } - - pipe(destination) { - this.#ensureInnerStream(); - return this.#innerStream.pipe(destination); - } - - unpipe(destination) { - this.#ensureInnerStream(); - return this.#innerStream.unpipe(destination); - } - - #ensureInnerStream() { - if (this.#innerStream) return; - this.#innerStream = createStdioWriteStream(this.#fd); - const events = this.eventNames(); - for (const event of events) { - this.#innerStream.on(event, (...args) => { - this.emit(event, ...args); + } else if (fd === 2) { + stream.destroySoon = stream.destroy; + stream._destroy = function (err, cb) { + cb(err); + this._undestroy(); + + if (!this._writableState.emitClose) { + process.nextTick(() => { + this.emit("close"); }); } - } - - #write1(chunk) { - var writer = this.#getWriter(); - const writeResult = writer.write(chunk); - this.bytesWritten += writeResult; - const flushResult = writer.flush(false); - return !!(writeResult || flushResult); - } - - #writeWithEncoding(chunk, encoding) { - if (!isFastEncoding(encoding)) { - this.#ensureInnerStream(); - return this.#innerStream.write(chunk, encoding); - } - - return this.#write1(chunk); - } - - #performCallback(cb, err?: any) { - if (err) { - this.emit("error", err); - } - - try { - cb(err ? err : null); - } catch (err2) { - this.emit("error", err2); - } - } - - #writeWithCallbackAndEncoding(chunk, encoding, callback) { - if (!isFastEncoding(encoding)) { - this.#ensureInnerStream(); - return this.#innerStream.write(chunk, encoding, callback); - } - - var writer = this.#getWriter(); - const writeResult = writer.write(chunk); - const flushResult = writer.flush(true); - if (flushResult?.then) { - flushResult.then( - () => { - this.#performCallback(callback); - this.emit("drain"); - }, - err => this.#performCallback(callback, err), - ); - return false; - } - - queueMicrotask(() => { - this.#performCallback(callback); - }); - - return !!(writeResult || flushResult); - } - - get isTTY() { - return false; - } - - write(chunk, encoding, callback) { - const result = this._write(chunk, encoding, callback); - - if (result) { - this.emit("drain"); - } - - return result; - } - - get hasColors() { - return Bun.tty[this.#fd].hasColors; - } - - _write(chunk, encoding, callback) { - var inner = this.#innerStream; - if (inner) { - return inner.write(chunk, encoding, callback); - } - - switch (arguments.length) { - case 0: { - var error = new Error("Invalid arguments"); - error.code = "ERR_INVALID_ARG_TYPE"; - throw error; - } - case 1: { - return this.#write1(chunk); - } - case 2: { - if (typeof encoding === "function") { - return this.#writeWithCallbackAndEncoding(chunk, "", encoding); - } else if (typeof encoding === "string") { - return this.#writeWithEncoding(chunk, encoding); - } - } - default: { - if ( - (typeof encoding !== "undefined" && typeof encoding !== "string") || - (typeof callback !== "undefined" && typeof callback !== "function") - ) { - var error = new Error("Invalid arguments"); - error.code = "ERR_INVALID_ARG_TYPE"; - throw error; - } - - if (typeof callback === "undefined") { - return this.#writeWithEncoding(chunk, encoding); - } - - return this.#writeWithCallbackAndEncoding(chunk, encoding, callback); - } - } - } - - destroy() { - return this; - } - - end() { - return this; - } - }; - if (getWindowSize(fd_, windowSizeArray)) { - var WriteStream = class WriteStream extends FastStdioWriteStreamInternal { - get isTTY() { - return true; - } - - cursorTo(x, y, callback) { - return (readline ??= require("node:readline")).cursorTo(this, x, y, callback); - } - - moveCursor(dx, dy, callback) { - return (readline ??= require("node:readline")).moveCursor(this, dx, dy, callback); - } - - clearLine(dir, callback) { - return (readline ??= require("node:readline")).clearLine(this, dir, callback); - } - - clearScreenDown(callback) { - return (readline ??= require("node:readline")).clearScreenDown(this, callback); - } - - getWindowSize() { - if (getWindowSize(fd_, windowSizeArray) === true) { - return [windowSizeArray[0], windowSizeArray[1]]; - } - } - - get columns() { - if (getWindowSize(fd_, windowSizeArray) === true) { - return windowSizeArray[0]; - } - } - - get rows() { - if (getWindowSize(fd_, windowSizeArray) === true) { - return windowSizeArray[1]; - } - } }; - - return new WriteStream(fd_); } - return new FastStdioWriteStreamInternal(fd_); -} - -export function getStdinStream(fd_) { - var { Duplex, eos, destroy } = require("node:stream"); + stream._type = "tty"; + stream._isStdio = true; + stream.fd = fd; - var StdinStream = class StdinStream extends Duplex { - #reader; - // TODO: investigate https://github.com/oven-sh/bun/issues/1607 - - #readRef; - #writeStream; - - #readable = true; - #unrefOnRead = false; - #writable = true; + return stream; +} - #onFinish; - #onClose; - #onDrain; +export function getStdinStream(fd) { + var { destroy } = require("node:stream"); + + var reader: ReadableStreamDefaultReader | undefined; + var readerRef; + var unrefOnRead = false; + function ref() { + reader ??= Bun.stdin.stream().getReader(); + // TODO: remove this. likely we are dereferencing the stream + // when there is still more data to be read. + readerRef ??= setInterval(() => {}, 1 << 30); + } - get isTTY() { - return require("node:tty").isatty(fd_); + function unref() { + if (readerRef) { + clearInterval(readerRef); + readerRef = undefined; } + } - get fd() { - return fd_; - } + const tty = require("node:tty"); + + const stream = new tty.ReadStream(fd); + + const originalOn = stream.on; + stream.on = function (event, listener) { + // Streams don't generally required to present any data when only + // `readable` events are present, i.e. `readableFlowing === false` + // + // However, Node.js has a this quirk whereby `process.stdin.read()` + // blocks under TTY mode, thus looping `.read()` in this particular + // case would not result in truncation. + // + // Therefore the following hack is only specific to `process.stdin` + // and does not apply to the underlying Stream implementation. + if (event === "readable") { + ref(); + unrefOnRead = true; + } + return originalOn.call(this, event, listener); + }; - constructor() { - super({ readable: true, writable: true }); - } + stream.fd = fd; - #onFinished(err?) { - const cb = this.#onClose; - this.#onClose = null; + const originalPause = stream.pause; + stream.pause = function () { + unref(); + return originalPause.call(this); + }; - if (cb) { - cb(err); - } else if (err) { - this.destroy(err); - } else if (!this.#readable && !this.#writable) { - this.destroy(); - } - } + const originalResume = stream.resume; + stream.resume = function () { + ref(); + return originalResume.call(this); + }; - _destroy(err, callback) { - if (!err && this.#onClose !== null) { - var AbortError = class AbortError extends Error { - constructor(message = "The operation was aborted", options = void 0) { - if (options !== void 0 && typeof options !== "object") { - throw new Error(`Invalid AbortError options:\n\n${JSON.stringify(options, null, 2)}`); - } - super(message, options); - this.code = "ABORT_ERR"; - this.name = "AbortError"; - } - }; - err = new AbortError(); - } + async function internalRead(stream) { + try { + var done: any, value: any; + const read = reader?.readMany(); - if (this.#onClose === null) { - callback(err); + if ($isPromise(read)) { + ({ done, value } = await read); } else { - this.#onClose = callback; - if (this.#writeStream) destroy(this.#writeStream, err); - } - } - - setRawMode(mode) {} - - on(name, callback) { - // Streams don't generally required to present any data when only - // `readable` events are present, i.e. `readableFlowing === false` - // - // However, Node.js has a this quirk whereby `process.stdin.read()` - // blocks under TTY mode, thus looping `.read()` in this particular - // case would not result in truncation. - // - // Therefore the following hack is only specific to `process.stdin` - // and does not apply to the underlying Stream implementation. - if (name === "readable") { - this.ref(); - this.#unrefOnRead = true; - } - return super.on(name, callback); - } - - pause() { - this.unref(); - return super.pause(); - } - - resume() { - this.ref(); - return super.resume(); - } - - ref() { - this.#reader ??= Bun.stdin.stream().getReader(); - this.#readRef ??= setInterval(() => {}, 1 << 30); - } - - unref() { - if (this.#readRef) { - clearInterval(this.#readRef); - this.#readRef = null; + // @ts-expect-error + ({ done, value } = read); } - } - - async #readInternal() { - try { - var done, value; - const read = this.#reader.readMany(); - - // read same-tick if possible - if (!read?.then) { - ({ done, value } = read); - } else { - ({ done, value } = await read); - } - - if (!done) { - this.push(value[0]); - - // shouldn't actually happen, but just in case - const length = value.length; - for (let i = 1; i < length; i++) { - this.push(value[i]); - } - } else { - this.push(null); - this.pause(); - this.#readable = false; - this.#onFinished(); - } - } catch (err) { - this.#readable = false; - this.#onFinished(err); - } - } - - _read(size) { - if (this.#unrefOnRead) { - this.unref(); - this.#unrefOnRead = false; - } - this.#readInternal(); - } - #constructWriteStream() { - var { createWriteStream } = require("node:fs"); - var writeStream = (this.#writeStream = createWriteStream("/dev/fd/0")); + if (!done) { + stream.push(value[0]); - writeStream.on("finish", () => { - if (this.#onFinish) { - const cb = this.#onFinish; - this.#onFinish = null; - cb(); + // shouldn't actually happen, but just in case + const length = value.length; + for (let i = 1; i < length; i++) { + stream.push(value[i]); } - }); - - writeStream.on("drain", () => { - if (this.#onDrain) { - const cb = this.#onDrain; - this.#onDrain = null; - cb(); - } - }); - - eos(writeStream, err => { - this.#writable = false; - if (err) { - destroy(writeStream, err); - } - this.#onFinished(err); - }); - - return writeStream; - } - - _write(chunk, encoding, callback) { - var writeStream = this.#writeStream; - if (!writeStream) { - writeStream = this.#constructWriteStream(); - } - - if (writeStream.write(chunk, encoding)) { - callback(); } else { - this.#onDrain = callback; + stream.push(null); + stream.pause(); } + } catch (err) { + stream.destroy(err); } + } - _final(callback) { - this.#writeStream.end(); - this.#onFinish = (...args) => callback(...args); + stream._read = function (size) { + if (unrefOnRead) { + unref(); + unrefOnRead = false; } + internalRead(this); }; - return new StdinStream(); + stream.on("pause", () => { + process.nextTick(() => { + destroy(stream); + }); + }); + + stream.on("close", () => { + process.nextTick(() => { + reader?.cancel(); + }); + }); + + return stream; } |