diff options
-rw-r--r-- | src/bun.js/fs.exports.js | 624 |
1 files changed, 621 insertions, 3 deletions
diff --git a/src/bun.js/fs.exports.js b/src/bun.js/fs.exports.js index 495250a3a..019a98d6f 100644 --- a/src/bun.js/fs.exports.js +++ b/src/bun.js/fs.exports.js @@ -107,7 +107,6 @@ export var lutimes = function lutimes(...args) { }; function callbackify(fsFunction, args) { - try { const result = fsFunction.apply(fs, args.slice(0, args.length - 1)); queueMicrotask(() => args[args.length - 1](null, result)); @@ -185,8 +184,627 @@ export var utimesSync = fs.utimesSync.bind(fs); export var lutimesSync = fs.lutimesSync.bind(fs); export var rmSync = fs.rmSync.bind(fs); -export var createReadStream = fs.createReadStream.bind(fs); -export var createWriteStream = fs.createWriteStream.bind(fs); +// Results from Object.keys() in Node 18 +// fd +// path +// flags +// mode +// start +// end +// pos +// bytesRead +// _readableState +// _events +// _eventsCount +// _maxListener +var _lazyReadStream; +var readStreamPathFastPathSymbol = Symbol.for( + "Bun.Node.readStreamPathFastPath" +); +const readStreamSymbol = Symbol.for("Bun.NodeReadStream"); +const readStreamPathOrFdSymbol = Symbol.for("Bun.NodeReadStreamPathOrFd"); +var writeStreamPathFastPathSymbol = Symbol.for("Bun.NodeWriteStreamFastPath"); +var writeStreamPathFastPathCallSymbol = Symbol.for( + "Bun.NodeWriteStreamFastPathCall" +); +var kIoDone = Symbol.for("kIoDone"); + +function getLazyReadStream() { + if (_lazyReadStream) { + return _lazyReadStream; + } + + var { Readable, eos: eos_ } = import.meta.require("node:stream"); + var defaultReadStreamOptions = { + file: undefined, + fd: undefined, + flags: "r", + encoding: undefined, + mode: 0o666, + autoClose: true, + emitClose: true, + start: 0, + end: Infinity, + highWaterMark: 64 * 1024, + fs: { + read, + open: (path, flags, mode, cb) => { + var fd; + try { + fd = openSync(path, flags, mode); + } catch (e) { + cb(e); + return; + } + + cb(null, fd); + }, + close, + }, + autoDestroy: true, + }; + + var internalReadFn; + var ReadStream = class ReadStream extends Readable { + constructor(pathOrFd, options = defaultReadStreamOptions) { + if (typeof options !== "object" || !options) { + throw new TypeError("Expected options to be an object"); + } + + var { + flags = defaultReadStreamOptions.flags, + encoding = defaultReadStreamOptions.encoding, + mode = defaultReadStreamOptions.mode, + autoClose = defaultReadStreamOptions.autoClose, + emitClose = defaultReadStreamOptions.emitClose, + start = defaultReadStreamOptions.start, + end = defaultReadStreamOptions.end, + autoDestroy = defaultReadStreamOptions.autoClose, + fs = defaultReadStreamOptions.fs, + highWaterMark = defaultReadStreamOptions.highWaterMark, + } = options; + + super({ + ...options, + encoding, + autoDestroy, + autoClose, + emitClose, + }); + + this.end = end; + this._read = this.#internalRead; + this.start = start; + this.flags = flags; + this.mode = mode; + this.emitClose = emitClose; + this.#fs = fs; + this[readStreamPathFastPathSymbol] = + start === 0 && + end === Infinity && + autoClose && + fs === defaultReadStreamOptions.fs && + // is it an encoding which we don't need to decode? + (encoding === "buffer" || + encoding === "binary" || + encoding == null || + encoding === "utf-8" || + encoding === "utf8"); + this._readableState.autoClose = autoDestroy = autoClose; + this._readableState.highWaterMark = highWaterMark; + + if (pathOrFd?.constructor?.name === "URL") { + pathOrFd = Bun.fileURLToPath(pathOrFd); + } + + if (typeof pathOrFd === "string") { + if (pathOrFd.startsWith("file://")) { + pathOrFd = Bun.fileURLToPath(pathOrFd); + } + if (pathOrFd.length === 0) { + throw new TypeError("Expected path to be a non-empty string"); + } + this.path = this.file = this[readStreamPathOrFdSymbol] = pathOrFd; + } else if (typeof pathOrFd === "number") { + pathOrFd |= 0; + if (pathOrFd < 0) { + throw new TypeError("Expected fd to be a positive integer"); + } + this.fd = this[readStreamPathOrFdSymbol] = pathOrFd; + + this.autoClose = false; + } else { + throw new TypeError("Expected a path or file descriptor"); + } + } + #fs; + file; + path; + fd = null; + flags; + mode; + start; + end; + pos; + bytesRead = 0; + #fileSize = -1; + _read; + + [readStreamSymbol] = true; + [readStreamPathOrFdSymbol]; + [readStreamPathFastPathSymbol]; + + _construct(callback) { + if (typeof this.fd === "number") { + callback(); + return; + } + // this[readStreamPathFastPathSymbol] = false; + var { path, flags, mode } = this; + + this.#fs.open(path, flags, mode, (er, fd) => { + if (er) { + callback(er); + return; + } + + this.fd = fd; + callback(); + this.emit("open", this.fd); + this.emit("ready"); + }); + } + + _destroy(err, cb) { + try { + var fd = this.fd; + this[readStreamPathFastPathSymbol] = false; + + if (!fd) { + cb(err); + } else { + this.#fs.close(fd, (er) => { + cb(er || err); + }); + this.fd = null; + } + } catch (e) { + throw e; + } + } + + close(cb) { + if (typeof cb === "function") eos_()(this, cb); + this.destroy(); + } + + #internalRead(n) { + var { pos, end, bytesRead, fd, encoding } = this; + + n = + pos !== undefined + ? Math.min(end - pos + 1, n) + : Math.min(end - bytesRead + 1, n); + + if (n <= 0) { + this.push(null); + return; + } + + if ( + this.#fileSize === -1 && + this.#fs.read === defaultReadStreamOptions.fs.read && + bytesRead === 0 && + pos === undefined + ) { + const stat = fstatSync(this.fd); + this.#fileSize = stat.size; + if (this.#fileSize > 0 && n > this.#fileSize) { + // add 1 byte so that we can detect EOF + n = this.#fileSize + 1; + } + } + + const buf = Buffer.allocUnsafeSlow(n); + + this[kIoDone] = false; + this.#fs.read(fd, buf, 0, n, pos, (er, bytesRead) => { + this[kIoDone] = true; + + // Tell ._destroy() that it's safe to close the fd now. + if (this.destroyed) { + this.emit(kIoDone, er); + return; + } + + if (er) { + this.#errorOrDestroy(er); + return; + } + + if (bytesRead > 0) { + this.#handleRead(buf, bytesRead); + } else { + this.push(null); + } + }); + } + + #errorOrDestroy(err, sync = null) { + var { + _readableState: r = { destroyed: false, autoDestroy: false }, + _writableState: w = { destroyed: false, autoDestroy: false }, + } = this; + + if (w?.destroyed || r?.destroyed) { + return this; + } + if (r?.autoDestroy || w?.autoDestroy) this.destroy(err); + else if (err) { + this.emit("error", err); + } + } + + #handleRead(buf, bytesRead) { + this.bytesRead += bytesRead; + if (this.pos !== undefined) { + this.pos += bytesRead; + } + + if (bytesRead !== buf.length) { + if (buf.length - bytesRead < 256) { + // We allow up to 256 bytes of wasted space + this.push(buf.slice(0, bytesRead)); + } else { + // Slow path. Shrink to fit. + // Copy instead of slice so that we don't retain + // large backing buffer for small reads. + const dst = Buffer.allocUnsafeSlow(bytesRead); + buf.copy(dst, 0, 0, bytesRead); + this.push(dst); + } + } else { + this.push(buf); + } + } + + pause() { + this[readStreamPathFastPathSymbol] = false; + return super.pause(); + } + + resume() { + this[readStreamPathFastPathSymbol] = false; + return super.resume(); + } + + unshift(...args) { + this[readStreamPathFastPathSymbol] = false; + return super.unshift(...args); + } + + pipe(dest, pipeOpts) { + if ( + this[readStreamPathFastPathSymbol] && + (pipeOpts?.end ?? true) && + this._readableState?.pipes?.length === 0 + ) { + if ( + writeStreamPathFastPathSymbol in dest && + dest[writeStreamPathFastPathSymbol] + ) { + if (dest[writeStreamPathFastPathCallSymbol](this, pipeOpts)) { + return this; + } + } + } + + this[readStreamPathFastPathSymbol] = false; + return super.pipe(dest, pipeOpts); + } + }; + return (_lazyReadStream = ReadStream); +} + +var internalCreateReadStream = function createReadStream(path, options) { + const ReadStream = getLazyReadStream(); + return new ReadStream(path, options); +}; +export var createReadStream = internalCreateReadStream; + +var _lazyWriteStream; + +function getLazyWriteStream() { + if (_lazyWriteStream) return _lazyWriteStream; + + const { Writable, eos } = import.meta.require("node:stream"); + + var defaultWriteStreamOptions = { + fd: null, + start: undefined, + pos: undefined, + encoding: undefined, + flags: "w", + mode: 0o666, + fs: { + write, + close, + open, + }, + }; + + var WriteStream = class WriteStream extends Writable { + constructor(path, options = defaultWriteStreamOptions) { + if (!options) { + throw new TypeError("Expected options to be an object"); + } + + var { + fs = defaultWriteStreamOptions.fs, + start = defaultWriteStreamOptions.start, + flags = defaultWriteStreamOptions.flags, + mode = defaultWriteStreamOptions.mode, + autoClose = true, + emitClose = false, + autoDestroy = autoClose, + encoding = defaultWriteStreamOptions.encoding, + fd = defaultWriteStreamOptions.fd, + pos = defaultWriteStreamOptions.pos, + } = options; + super({ ...options, decodeStrings: false, autoDestroy, emitClose }); + + if (typeof fs?.write !== "function") { + throw new TypeError("Expected fs.write to be a function"); + } + + if (typeof fs?.close !== "function") { + throw new TypeError("Expected fs.close to be a function"); + } + + if (typeof fs?.open !== "function") { + throw new TypeError("Expected fs.open to be a function"); + } + + if (typeof path === "object" && path) { + if (path instanceof URL) { + path = Bun.fileURLToPath(path); + } + } + + if (typeof path !== "string" && typeof fd !== "number") { + throw new TypeError("Expected a path or file descriptor"); + } + + if (typeof path === "string") { + if (path.length === 0) { + throw new TypeError("Expected a non-empty path"); + } + + if (path.startsWith("file:")) { + path = Bun.fileURLToPath(path); + } + + this.path = path; + this.fd = null; + this[writeStreamPathFastPathSymbol] = + autoClose && + (start === undefined || start === 0) && + fs.write === defaultWriteStreamOptions.fs.write && + fs.close === defaultWriteStreamOptions.fs.close; + } else { + this.fd = fd; + this[writeStreamPathFastPathSymbol] = false; + } + this.start = start; + this.#fs = fs; + this.flags = flags; + this.mode = mode; + + if (this.start !== undefined) { + this.pos = this.start; + } + + if (encoding !== defaultWriteStreamOptions.encoding) { + this.setDefaultEncoding(encoding); + if ( + encoding !== "buffer" && + encoding !== "utf8" && + encoding !== "utf-8" && + encoding !== "binary" + ) { + this[writeStreamPathFastPathSymbol] = false; + } + } + } + + get autoClose() { + return this._writableState.autoDestroy; + } + + set autoClose(val) { + this._writableState.autoDestroy = val; + } + + destroySoon = this.end; + + // noop, node has deprecated this + open() {} + + path; + fd; + flags; + mode; + #fs; + bytesWritten = 0; + pos; + [writeStreamPathFastPathSymbol]; + start; + + [writeStreamPathFastPathCallSymbol](readStream, pipeOpts) { + if (!this[writeStreamPathFastPathSymbol]) { + return false; + } + + if (this.fd !== null) { + this[writeStreamPathFastPathSymbol] = false; + return false; + } + + this[kIoDone] = false; + readStream[kIoDone] = false; + return Bun.write( + this[writeStreamPathFastPathSymbol], + readStream[readStreamPathOrFdSymbol] + ).then( + (bytesWritten) => { + readStream[kIoDone] = this[kIoDone] = true; + this.bytesWritten += bytesWritten; + readStream.bytesRead += bytesWritten; + this.end(); + readStream.close(); + }, + (err) => { + readStream[kIoDone] = this[kIoDone] = true; + this.#errorOrDestroy(err); + readStream.emit("error", err); + } + ); + } + + isBunFastPathEnabled() { + return this[writeStreamPathFastPathSymbol]; + } + + disableBunFastPath() { + this[writeStreamPathFastPathSymbol] = false; + } + + #handleWrite(er, bytes) { + if (er) { + return this.#errorOrDestroy(er); + } + + this.bytesWritten += bytes; + } + + #internalClose(err, cb) { + this[writeStreamPathFastPathSymbol] = false; + var fd = this.fd; + this.#fs.close(fd, (er) => { + this.fd = null; + cb(err || er); + }); + } + + _destroy(err, cb) { + if (this.fd === null) { + return cb(err); + } + + if (this[kIoDone]) { + this.once(kIoDone, () => this.#internalClose(err, cb)); + return; + } + + this.#internalClose(err, cb); + } + + [kIoDone] = false; + + close(cb) { + if (cb) { + if (this.closed) { + process.nextTick(cb); + return; + } + this.on("close", cb); + } + + // If we are not autoClosing, we should call + // destroy on 'finish'. + if (!this.autoClose) { + this.on("finish", this.destroy); + } + + // We use end() instead of destroy() because of + // https://github.com/nodejs/node/issues/2006 + this.end(); + } + #internalWrite(chunk, encoding, cb) { + this[writeStreamPathFastPathSymbol] = false; + if (typeof chunk === "string") { + chunk = Buffer.from(chunk, encoding); + } + + if (this.pos !== undefined) { + this[kIoDone] = true; + this.#fs.write( + this.fd, + chunk, + 0, + chunk.length, + this.pos, + (err, bytes) => { + ths[kIoDone] = false; + this.#handleWrite(err, bytes); + this.emit(kIoDone); + + !err ? cb() : cb(err); + } + ); + } else { + this[kIoDone] = true; + this.#fs.write(this.fd, chunk, 0, chunk.length, null, (err, bytes) => { + ths[kIoDone] = false; + this.#handleWrite(err, bytes); + this.emit(kIoDone); + !err ? cb() : cb(err); + }); + } + } + _write = this.#internalWrite; + _writev = undefined; + + get pending() { + return this.fd === null; + } + + _destroy(err, cb) { + this.close(err, cb); + } + + #errorOrDestroy(err) { + var { + _readableState: r = { destroyed: false, autoDestroy: false }, + _writableState: w = { destroyed: false, autoDestroy: false }, + } = this; + + if (w?.destroyed || r?.destroyed) { + return this; + } + if (r?.autoDestroy || w?.autoDestroy) this.destroy(err); + else if (err) { + this.emit("error", err); + } + } + }; + return (_lazyWriteStream = WriteStream); +} + +var internalCreateWriteStream = function createWriteStream(path, options) { + const WriteStream = getLazyWriteStream(); + return new WriteStream(path, options); +}; + +export var createWriteStream = internalCreateWriteStream; +Object.defineProperties(fs, { + createReadStream: { + value: internalCreateReadStream, + }, + createWriteStream: { + value: createWriteStream, + }, +}); export var promises = { access: promisify(fs.accessSync), |