diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/bun.js/api/bun/subprocess.zig | 55 | ||||
-rw-r--r-- | src/bun.js/base.zig | 8 | ||||
-rw-r--r-- | src/bun.js/child_process.exports.js | 14 | ||||
-rw-r--r-- | src/bun.js/node/node_fs.zig | 7 | ||||
-rw-r--r-- | src/bun.js/node/node_fs_binding.zig | 8 | ||||
-rw-r--r-- | src/bun.js/node/syscall.zig | 65 | ||||
-rw-r--r-- | src/bun.js/streams.exports.js | 72 | ||||
-rw-r--r-- | src/bun.js/webcore/streams.zig | 285 | ||||
-rw-r--r-- | src/cli/test_command.zig | 6 | ||||
-rw-r--r-- | src/global.zig | 5 | ||||
-rw-r--r-- | src/linux_c.zig | 2 |
11 files changed, 348 insertions, 179 deletions
diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index 2d8127901..2b30449ac 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -108,11 +108,12 @@ pub const Subprocess = struct { } return this.stream.toJS(); } - + const is_fifo = this.buffer.is_fifo; const stream = this.buffer.toReadableStream(globalThis, exited); this.* = .{ .stream = stream }; if (this.stream.ptr == .File) { this.stream.ptr.File.signal = JSC.WebCore.Signal.init(readable); + this.stream.ptr.File.is_fifo = is_fifo; } return stream.value; } @@ -127,6 +128,7 @@ pub const Subprocess = struct { .pipe = .{ .buffer = BufferedOutput{ .fd = fd, + .is_fifo = true, }, }, }; @@ -485,10 +487,21 @@ pub const Subprocess = struct { received_eof: bool = false, pending_error: ?JSC.Node.Syscall.Error = null, poll_ref: ?*JSC.FilePoll = null, + is_fifo: bool = false, pub usingnamespace JSC.WebCore.NewReadyWatcher(BufferedOutput, .readable, ready); - pub fn ready(this: *BufferedOutput, _: i64) void { + pub fn ready(this: *BufferedOutput, available_to_read: i64) void { + if (comptime Environment.isMac) { + if (this.poll_ref) |poll| { + if (available_to_read > 0) { + poll.flags.insert(.readable); + } else { + poll.flags.remove(.readable); + } + } + } + // TODO: what happens if the task was already enqueued after unwatch()? this.readAll(false); } @@ -516,7 +529,6 @@ pub const Subprocess = struct { // and we don't want this to become an event loop ticking point if (!this.canRead()) { this.watch(this.fd); - this.poll_ref.?.flags.insert(.fifo); return; } } @@ -543,9 +555,7 @@ pub const Subprocess = struct { this.autoCloseFileDescriptor(); return; } else if (!is_readable) { - if (comptime !force) { - return; - } + return; } } @@ -563,7 +573,7 @@ pub const Subprocess = struct { switch (JSC.Node.Syscall.read(this.fd, buf)) { .err => |e| { if (e.isRetry()) { - if (!this.isWatching()) + if (!this.isWatching() and this.isFIFO()) this.watch(this.fd); this.poll_ref.?.flags.insert(.fifo); return; @@ -608,13 +618,15 @@ pub const Subprocess = struct { if (buf[bytes_read..].len > 0 or !this.canRead()) { if (!this.isWatching()) this.watch(this.fd); - this.poll_ref.?.flags.insert(.fifo); - this.received_eof = true; + if (this.is_fifo) + this.poll_ref.?.flags.insert(.fifo) + else + this.received_eof = true; return; } } else { // we consider a short read as being EOF - this.received_eof = this.received_eof or bytes_read < buf.len; + this.received_eof = !this.is_fifo and this.received_eof or bytes_read < buf.len; if (this.received_eof) { if (this.closeOnEOF()) { this.autoCloseFileDescriptor(); @@ -642,6 +654,9 @@ pub const Subprocess = struct { if (exited) { // exited + received EOF => no more read() if (this.received_eof) { + var poll_ref = this.poll_ref; + this.poll_ref = null; + this.autoCloseFileDescriptor(); // also no data at all @@ -654,9 +669,6 @@ pub const Subprocess = struct { ).?; } - var poll_ref = this.poll_ref; - this.poll_ref = null; - return JSC.WebCore.ReadableStream.fromJS( JSC.WebCore.ReadableStream.fromBlobWithPoll( globalThis, @@ -900,7 +912,7 @@ pub const Subprocess = struct { stdio[1] = .{ .pipe = null }; stdio[2] = .{ .pipe = null }; } - + var lazy = false; var on_exit_callback = JSValue.zero; var PATH = jsc_vm.bundler.env.get("PATH") orelse ""; var argv: std.ArrayListUnmanaged(?[*:0]const u8) = undefined; @@ -916,6 +928,9 @@ pub const Subprocess = struct { if (args_type.isArray()) { cmd_value = args; args = secondaryArgsValue orelse JSValue.zero; + } else if (!args.isObject()) { + globalThis.throwInvalidArguments("cmd must be an array", .{}); + return .zero; } else if (args.get(globalThis, "cmd")) |cmd_value_| { cmd_value = cmd_value_; } else { @@ -1054,6 +1069,14 @@ pub const Subprocess = struct { return .zero; } } + + if (comptime !is_sync) { + if (args.get(globalThis, "lazy")) |lazy_val| { + if (lazy_val.isBoolean()) { + lazy = lazy_val.toBoolean(); + } + } + } } } @@ -1229,7 +1252,7 @@ pub const Subprocess = struct { if (subprocess.stdout.pipe.buffer.canRead()) { subprocess.stdout.pipe.buffer.readAll(true); } - } else { + } else if (!lazy) { subprocess.stdout.pipe.buffer.readIfPossible(false); } } @@ -1239,7 +1262,7 @@ pub const Subprocess = struct { if (subprocess.stderr.pipe.buffer.canRead()) { subprocess.stderr.pipe.buffer.readAll(true); } - } else { + } else if (!lazy) { subprocess.stderr.pipe.buffer.readIfPossible(false); } } diff --git a/src/bun.js/base.zig b/src/bun.js/base.zig index deeab246a..3d1233589 100644 --- a/src/bun.js/base.zig +++ b/src/bun.js/base.zig @@ -4199,6 +4199,14 @@ pub const FilePoll = struct { return this.flags.contains(.has_incremented_poll_count); } + pub inline fn isWatching(this: *const FilePoll) bool { + return !this.flags.contains(.needs_rearm) and (this.flags.contains(.poll_readable) or this.flags.contains(.poll_writable) or this.flags.contains(.poll_process)); + } + + pub inline fn isKeepingProcessAlive(this: *const FilePoll) bool { + return !this.flags.contains(.disable) and this.isActive(); + } + /// Make calling ref() on this poll into a no-op. pub fn disableKeepingProcessAlive(this: *FilePoll, vm: *JSC.VirtualMachine) void { if (this.flags.contains(.disable)) diff --git a/src/bun.js/child_process.exports.js b/src/bun.js/child_process.exports.js index bc401cc37..a9a1589dd 100644 --- a/src/bun.js/child_process.exports.js +++ b/src/bun.js/child_process.exports.js @@ -78,6 +78,17 @@ const debug = process.env.DEBUG ? console.log : () => {}; // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE // USE OR OTHER DEALINGS IN THE SOFTWARE. +function spawnTimeoutFunction(child, timeoutHolder) { + var timeoutId = timeoutHolder.timeoutId; + if (timeoutId > -1) { + try { + child.kill(killSignal); + } catch (err) { + child.emit("error", err); + } + timeoutHolder.timeoutId = -1; + } +} /** * Spawns a new process using the given `file`. * @param {string} file @@ -537,7 +548,7 @@ export function spawnSync(file, args, options) { if (!success) { result.error = new SystemError( - result.stderr, + result.output[2], options.file, "spawnSync", -1, @@ -1030,6 +1041,7 @@ export class ChildProcess extends EventEmitter { cwd: options.cwd || undefined, env: options.envPairs || undefined, onExit: this.#handleOnExit.bind(this), + lazy: true, }); this.#handleExited = this.#handle.exited; this.#encoding = options.encoding || undefined; diff --git a/src/bun.js/node/node_fs.zig b/src/bun.js/node/node_fs.zig index ff9f1ba1f..b5ba2b983 100644 --- a/src/bun.js/node/node_fs.zig +++ b/src/bun.js/node/node_fs.zig @@ -3639,11 +3639,8 @@ pub const NodeFS = struct { pub fn rmdir(this: *NodeFS, args: Arguments.RmDir, comptime flavor: Flavor) Maybe(Return.Rmdir) { switch (comptime flavor) { .sync => { - var dir = args.old_path.sliceZ(&this.sync_error_buf); - switch (Syscall.getErrno(system.rmdir(dir))) { - .SUCCESS => return Maybe(Return.Rmdir).success, - else => |err| return Maybe(Return.Rmdir).errnoSys(err, .rmdir), - } + return Maybe(Return.Rmdir).errnoSysP(system.rmdir(args.path.sliceZ(&this.sync_error_buf)), .rmdir, args.path.slice()) orelse + Maybe(Return.Rmdir).success; }, else => {}, } diff --git a/src/bun.js/node/node_fs_binding.zig b/src/bun.js/node/node_fs_binding.zig index d4d97e9a9..a76c83637 100644 --- a/src/bun.js/node/node_fs_binding.zig +++ b/src/bun.js/node/node_fs_binding.zig @@ -243,6 +243,10 @@ pub const NodeFSBindings = JSC.NewClass( .name = "rm", .rfn = call(.rm), }, + .rmdir = .{ + .name = "rmdir", + .rfn = call(.rmdir), + }, .realpath = .{ .name = "realpath", .rfn = call(.realpath), @@ -426,6 +430,10 @@ pub const NodeFSBindings = JSC.NewClass( .name = "rmSync", .rfn = callSync(.rm), }, + .rmdirSync = .{ + .name = "rmdirSync", + .rfn = callSync(.rmdir), + }, }, .{}, ); diff --git a/src/bun.js/node/syscall.zig b/src/bun.js/node/syscall.zig index 7ad927c3a..14cee4e36 100644 --- a/src/bun.js/node/syscall.zig +++ b/src/bun.js/node/syscall.zig @@ -203,6 +203,7 @@ pub fn open(file_path: [:0]const u8, flags: JSC.Node.Mode, perm: JSC.Node.Mode) // The zig standard library marks BADF as unreachable // That error is not unreachable for us pub fn close(fd: std.os.fd_t) ?Syscall.Error { + log("close({d})", .{fd}); if (comptime Environment.isMac) { // This avoids the EINTR problem. return switch (system.getErrno(system.@"close$NOCANCEL"(fd))) { @@ -682,3 +683,67 @@ pub const Error = struct { return this.toSystemError().toErrorInstance(ptr); } }; + +pub fn setPipeCapacityOnLinux(fd: JSC.Node.FileDescriptor, capacity: usize) Maybe(usize) { + if (comptime !Environment.isLinux) @compileError("Linux-only"); + std.debug.assert(capacity > 0); + + // In Linux versions before 2.6.11, the capacity of a + // pipe was the same as the system page size (e.g., 4096 + // bytes on i386). Since Linux 2.6.11, the pipe + // capacity is 16 pages (i.e., 65,536 bytes in a system + // with a page size of 4096 bytes). Since Linux 2.6.35, + // the default pipe capacity is 16 pages, but the + // capacity can be queried and set using the + // fcntl(2) F_GETPIPE_SZ and F_SETPIPE_SZ operations. + // See fcntl(2) for more information. + //:# define F_SETPIPE_SZ 1031 /* Set pipe page size array. + const F_SETPIPE_SZ = 1031; + const F_GETPIPE_SZ = 1032; + + // We don't use glibc here + // It didn't work. Always returned 0. + const pipe_len = std.os.linux.fcntl(fd, F_GETPIPE_SZ, 0); + if (Maybe(usize).errno(pipe_len)) |err| return err; + if (pipe_len == 0) return Maybe(usize){ .result = 0 }; + if (pipe_len >= capacity) return Maybe(usize){ .result = pipe_len }; + + const new_pipe_len = std.os.linux.fcntl(fd, F_SETPIPE_SZ, capacity); + if (Maybe(usize).errno(new_pipe_len)) |err| return err; + return Maybe(usize){ .result = new_pipe_len }; +} + +pub fn getMaxPipeSizeOnLinux() usize { + return @intCast( + usize, + bun.once(struct { + fn once() c_int { + const strings = bun.strings; + const default_out_size = 512 * 1024; + const pipe_max_size_fd = switch (JSC.Node.Syscall.open("/proc/sys/fs/pipe-max-size", std.os.O.RDONLY, 0)) { + .result => |fd2| fd2, + .err => |err| { + log("Failed to open /proc/sys/fs/pipe-max-size: {d}\n", .{err.errno}); + return default_out_size; + }, + }; + defer _ = JSC.Node.Syscall.close(pipe_max_size_fd); + var max_pipe_size_buf: [128]u8 = undefined; + const max_pipe_size = switch (JSC.Node.Syscall.read(pipe_max_size_fd, max_pipe_size_buf[0..])) { + .result => |bytes_read| std.fmt.parseInt(i64, strings.trim(max_pipe_size_buf[0..bytes_read], "\n"), 10) catch |err| { + log("Failed to parse /proc/sys/fs/pipe-max-size: {any}\n", .{@errorName(err)}); + return default_out_size; + }, + .err => |err| { + log("Failed to read /proc/sys/fs/pipe-max-size: {d}\n", .{err.errno}); + return default_out_size; + }, + }; + + // we set the absolute max to 8 MB because honestly that's a huge pipe + // my current linux machine only goes up to 1 MB, so that's very unlikely to be hit + return @minimum(@truncate(c_int, max_pipe_size -| 32), 1024 * 1024 * 8); + } + }.once, c_int), + ); +} diff --git a/src/bun.js/streams.exports.js b/src/bun.js/streams.exports.js index 15d32adb5..4e812437e 100644 --- a/src/bun.js/streams.exports.js +++ b/src/bun.js/streams.exports.js @@ -5679,7 +5679,7 @@ var require_ours = __commonJS({ * */ function createNativeStream(nativeType, Readable) { - var [pull, start, cancel, setClose, deinit, updateRef] = + var [pull, start, cancel, setClose, deinit, updateRef, drainFn] = globalThis[Symbol.for("Bun.lazy")](nativeType); var closer = [false]; @@ -5687,7 +5687,7 @@ function createNativeStream(nativeType, Readable) { if (result > 0) { const slice = view.subarray(0, result); const remainder = view.subarray(result); - if (remainder.byteLength > 0) { + if (slice.byteLength > 0) { nativeReadable.push(slice); } @@ -5701,6 +5701,8 @@ function createNativeStream(nativeType, Readable) { if (isClosed) { nativeReadable.push(null); } + + return view; }; var handleArrayBufferViewResult = function ( @@ -5720,23 +5722,9 @@ function createNativeStream(nativeType, Readable) { return view; }; - var handleResult = function (nativeReadable, result, view, isClosed) { - if (typeof result === "number") { - return handleNumberResult(nativeReadable, result, view, isClosed); - } else if (typeof result === "boolean") { - nativeReadable.push(null); - return view?.byteLength ?? 0 > 0 ? view : undefined; - } else if (ArrayBuffer.isView(result)) { - return handleArrayBufferViewResult( - nativeReadable, - result, - view, - isClosed, - ); - } else { - throw new Error("Invalid result from pull"); - } - }; + var DYNAMICALLY_ADJUST_CHUNK_SIZE = + process.env.BUN_DISABLE_DYNAMIC_CHUNK_SIZE !== "1"; + var NativeReadable = class NativeReadable extends Readable { #ptr; #refCount = 1; @@ -5744,6 +5732,7 @@ function createNativeStream(nativeType, Readable) { #remainingChunk = undefined; #highWaterMark; #pendingRead = false; + #hasResized = !DYNAMICALLY_ADJUST_CHUNK_SIZE; constructor(ptr, options = {}) { super(options); if (typeof options.highWaterMark === "number") { @@ -5767,13 +5756,24 @@ function createNativeStream(nativeType, Readable) { } if (!this.#constructed) { - this.#constructed = true; - start(ptr, this.#highWaterMark); + this.#internalConstruct(ptr); } return this.#internalRead(this.#getRemainingChunk(), ptr); } + #internalConstruct(ptr) { + this.#constructed = true; + start(ptr, this.#highWaterMark); + + if (drainFn) { + const drainResult = drainFn(ptr); + if ((drainResult?.byteLength ?? 0) > 0) { + this.push(drainResult); + } + } + } + #getRemainingChunk() { var chunk = this.#remainingChunk; var highWaterMark = this.#highWaterMark; @@ -5784,23 +5784,43 @@ function createNativeStream(nativeType, Readable) { return chunk; } + #handleResult(result, view, isClosed) { + if (typeof result === "number") { + if (result >= this.#highWaterMark && !this.#hasResized) { + this.#highWaterMark *= 2; + this.#hasResized = true; + } + + return handleNumberResult(this, result, view, isClosed); + } else if (typeof result === "boolean") { + this.push(null); + return view?.byteLength ?? 0 > 0 ? view : undefined; + } else if (ArrayBuffer.isView(result)) { + if (result.byteLength >= this.#highWaterMark && !this.#hasResized) { + this.#highWaterMark *= 2; + this.#hasResized = true; + } + + return handleArrayBufferViewResult(this, result, view, isClosed); + } else { + throw new Error("Invalid result from pull"); + } + } + #internalRead(view, ptr) { closer[0] = false; var result = pull(ptr, view, closer); if (isPromise(result)) { this.#pendingRead = true; - var originalFlowing = this._readableState.flowing; - this._readableState.flowing = false; return result.then( (result) => { - this._readableState.flowing = originalFlowing; this.#pendingRead = false; - this.#remainingChunk = handleResult(this, result, view, closer[0]); + this.#remainingChunk = this.#handleResult(result, view, closer[0]); }, (reason) => errorOrDestroy(this, reason), ); } else { - this.#remainingChunk = handleResult(this, result, view, closer[0]); + this.#remainingChunk = this.#handleResult(result, view, closer[0]); } } diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index 7ec20b046..c9a3fb226 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -1117,26 +1117,16 @@ pub const FileSink = struct { } fn adjustPipeLengthOnLinux(this: *FileSink, fd: JSC.Node.FileDescriptor, remain_len: usize) void { - const F_SETPIPE_SZ = 1031; - const F_GETPIPE_SZ = 1032; - // On Linux, we can adjust the pipe size to avoid blocking. this.has_adjusted_pipe_size_on_linux = true; - var pipe_len: c_int = 0; - _ = std.c.fcntl(fd, F_GETPIPE_SZ, &pipe_len); - if (pipe_len < 0) return; - - // If we have a valid pipe_len, then pessimistically set it to that. - this.max_write_size = @intCast(usize, pipe_len); - if (pipe_len < remain_len) { - // If our real pipe length is less than the amount of data we have left to write, - // let's figure out what the maximum pipe size is and grow it to that. - var out_size = getMaxPipeSizeOnLinux(); - _ = std.c.fcntl(fd, F_SETPIPE_SZ, &out_size); - if (out_size > 0) { - this.max_write_size = @intCast(usize, out_size); - } + switch (JSC.Node.Syscall.setPipeCapacityOnLinux(fd, @minimum(Syscall.getMaxPipeSizeOnLinux(), remain_len))) { + .result => |len| { + if (len > 0) { + this.max_write_size = len; + } + }, + else => {}, } } @@ -1184,12 +1174,10 @@ pub const FileSink = struct { break :brk this.max_write_size; } else remain.len; - while (remain.len > 0) { const write_buf = remain[0..@minimum(remain.len, max_to_write)]; - - log("Write {d} bytes (fd: {d}, head: {d}, {d}/{d})", .{ write_buf.len, fd, this.head, remain.len, total }); const res = JSC.Node.Syscall.write(fd, write_buf); + if (res == .err) { const retry = std.os.E.AGAIN; @@ -1298,13 +1286,14 @@ pub const FileSink = struct { } fn cleanup(this: *FileSink) void { + if (this.poll_ref) |poll| { + this.poll_ref = null; + poll.deinit(); + } + if (this.fd != JSC.Node.invalid_fd) { if (this.scheduled_count > 0) { this.scheduled_count = 0; - if (this.poll_ref) |poll| { - this.poll_ref = null; - poll.deinit(); - } } _ = JSC.Node.Syscall.close(this.fd); @@ -1448,7 +1437,7 @@ pub const FileSink = struct { fn isPending(this: *const FileSink) bool { var poll_ref = this.poll_ref orelse return false; - return poll_ref.isRegistered(); + return poll_ref.isRegistered() and !poll_ref.flags.contains(.needs_rearm); } pub fn end(this: *FileSink, err: ?Syscall.Error) JSC.Node.Maybe(void) { @@ -2612,6 +2601,7 @@ pub fn ReadableStreamSource( comptime onCancel: fn (this: *Context) void, comptime deinit: fn (this: *Context) void, comptime setRefUnrefFn: ?fn (this: *Context, enable: bool) void, + comptime drainInternalBuffer: ?fn (this: *Context) bun.ByteList, ) type { return struct { context: Context, @@ -2699,6 +2689,14 @@ pub fn ReadableStreamSource( return null; } + pub fn drain(this: *This) bun.ByteList { + if (drainInternalBuffer) |drain_fn| { + return drain_fn(&this.context); + } + + return .{}; + } + pub fn toJS(this: *ReadableStreamSourceType, globalThis: *JSGlobalObject) JSC.JSValue { return ReadableStream.fromNative(globalThis, Context.tag, this); } @@ -2783,6 +2781,15 @@ pub fn ReadableStreamSource( return JSValue.jsUndefined(); } + pub fn drain(globalThis: *JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue { + var this = callFrame.argument(0).asPtr(ReadableStreamSourceType); + var list = this.drain(); + if (list.len > 0) { + return JSC.ArrayBuffer.fromBytes(list.slice(), .Uint8Array).toJS(globalThis, null); + } + return JSValue.jsUndefined(); + } + pub fn load(globalThis: *JSGlobalObject) callconv(.C) JSC.JSValue { if (comptime JSC.is_bindgen) unreachable; // This is used also in Node.js streams @@ -2796,6 +2803,10 @@ pub fn ReadableStreamSource( JSC.NewFunction(globalThis, null, 2, JSReadableStreamSource.updateRef, true) else JSC.JSValue.jsNull(), + if (drainInternalBuffer != null) + JSC.NewFunction(globalThis, null, 1, JSReadableStreamSource.drain, true) + else + JSC.JSValue.jsNull(), }); } @@ -2889,7 +2900,16 @@ pub const ByteBlobLoader = struct { bun.default_allocator.destroy(this); } - pub const Source = ReadableStreamSource(@This(), "ByteBlob", onStart, onPull, onCancel, deinit, null); + pub const Source = ReadableStreamSource( + @This(), + "ByteBlob", + onStart, + onPull, + onCancel, + deinit, + null, + null, + ); }; pub const PipeFunction = fn (ctx: *anyopaque, stream: StreamResult, allocator: std.mem.Allocator) void; @@ -3169,7 +3189,16 @@ pub const ByteStream = struct { bun.default_allocator.destroy(this.parent()); } - pub const Source = ReadableStreamSource(@This(), "ByteStream", onStart, onPull, onCancel, deinit, null); + pub const Source = ReadableStreamSource( + @This(), + "ByteStream", + onStart, + onPull, + onCancel, + deinit, + null, + null, + ); }; /// **Not** the Web "FileReader" API @@ -3199,6 +3228,7 @@ pub const FileReader = struct { stored_global_this_: ?*JSC.JSGlobalObject = null, poll_ref: ?*JSC.FilePoll = null, has_adjusted_pipe_size_on_linux: bool = false, + is_fifo: bool = false, finished: bool = false, /// When we have some way of knowing that EOF truly is the write end of the @@ -3440,9 +3470,11 @@ pub const FileReader = struct { }, }; - if (this.poll_ref) |poll| { + if (this.poll_ref != null or this.is_fifo) { file.seekable = false; - std.debug.assert(poll.fd == @intCast(@TypeOf(poll.fd), fd)); + this.is_fifo = true; + if (this.poll_ref) |poll| + std.debug.assert(poll.fd == @intCast(@TypeOf(poll.fd), fd)); } else { if (!auto_close) { // ensure we have non-blocking IO set @@ -3500,6 +3532,7 @@ pub const FileReader = struct { file.seekable = std.os.S.ISREG(stat.mode); file.mode = @intCast(JSC.Node.Mode, stat.mode); this.mode = file.mode; + this.is_fifo = std.os.S.ISFIFO(stat.mode); if (file.seekable orelse false) file.max_size = @intCast(Blob.SizeType, stat.size); @@ -3541,13 +3574,6 @@ pub const FileReader = struct { const chunk_size = this.calculateChunkSize(std.math.maxInt(usize)); std.debug.assert(this.started); - if (this.buffered_data.len > 0) { - const data = this.buffered_data; - this.buffered_data.len = 0; - this.buffered_data.cap = 0; - return .{ .owned = data }; - } - switch (chunk_size) { 0 => { std.debug.assert(this.store.data.file.seekable orelse false); @@ -3555,16 +3581,18 @@ pub const FileReader = struct { return .{ .done = {} }; }, run_on_different_thread_size...std.math.maxInt(@TypeOf(chunk_size)) => { - this.view.set(this.globalThis(), view); - // should never be reached - this.pending.result = .{ - .err = Syscall.Error.todo, - }; - this.buf = buffer; + if (!this.isFIFO()) { + this.view.set(this.globalThis(), view); + // should never be reached + this.pending.result = .{ + .err = Syscall.Error.todo, + }; + this.buf = buffer; - this.scheduleAsync(@truncate(Blob.SizeType, chunk_size)); + this.scheduleAsync(@truncate(Blob.SizeType, chunk_size)); - return .{ .pending = &this.pending }; + return .{ .pending = &this.pending }; + } }, else => {}, } @@ -3632,56 +3660,41 @@ pub const FileReader = struct { var buf_to_use = read_buf; var free_buffer_on_error: bool = false; + var pipe_is_empty_on_linux = bun.VoidUnless(bool, Environment.isLinux, false); + var len: c_int = available_to_read orelse 0; // if it's a pipe, we really don't know what to expect what the max size will be // if the pipe is sending us WAY bigger data than what we can fit in the buffer // we allocate a new buffer of up to 4 MB if (this.isFIFO() and view != .zero) { outer: { - var len: c_int = available_to_read orelse 0; // macOS FIONREAD doesn't seem to work here // but we can get this information from the kqueue callback so we don't need to if (comptime Environment.isLinux) { if (len == 0) { - const FIONREAD = if (Environment.isLinux) std.os.linux.T.FIONREAD else bun.C.FIONREAD; - const rc: c_int = std.c.ioctl(fd, FIONREAD, &len); + const rc: c_int = std.c.ioctl(fd, std.os.linux.T.FIONREAD, &len); if (rc != 0) { len = 0; } - // In Linux versions before 2.6.11, the capacity of a - // pipe was the same as the system page size (e.g., 4096 - // bytes on i386). Since Linux 2.6.11, the pipe - // capacity is 16 pages (i.e., 65,536 bytes in a system - // with a page size of 4096 bytes). Since Linux 2.6.35, - // the default pipe capacity is 16 pages, but the - // capacity can be queried and set using the - // fcntl(2) F_GETPIPE_SZ and F_SETPIPE_SZ operations. - // See fcntl(2) for more information. - - //:# define F_SETPIPE_SZ 1031 /* Set pipe page size array. - const F_SETPIPE_SZ = 1031; - const F_GETPIPE_SZ = 1032; - if (len > 0) { if (this.poll_ref) |poll| { poll.flags.insert(.readable); } - } else if (this.poll_ref) |poll| { - poll.flags.remove(.readable); + } else { + if (this.poll_ref) |poll| { + poll.flags.remove(.readable); + } + + pipe_is_empty_on_linux = true; } + // we do not un-mark it as readable if there's nothing in the pipe if (!this.has_adjusted_pipe_size_on_linux) { - if (len >= std.mem.page_size * 16) { + if (len > 0 and buf_to_use.len >= std.mem.page_size * 16) { this.has_adjusted_pipe_size_on_linux = true; - var pipe_len: c_int = 0; - _ = std.c.fcntl(fd, F_GETPIPE_SZ, &pipe_len); - - if (pipe_len > 0 and pipe_len < std.mem.page_size * 16) { - var out_size: c_int = getMaxPipeSizeOnLinux(); - _ = std.c.fcntl(fd, F_SETPIPE_SZ, &out_size); - } + _ = Syscall.setPipeCapacityOnLinux(fd, @minimum(buf_to_use.len * 4, Syscall.getMaxPipeSizeOnLinux())); } } } @@ -3706,8 +3719,10 @@ pub const FileReader = struct { } if (this.poll_ref) |poll| { - if ((available_to_read orelse 0) > 0) { - poll.flags.insert(.readable); + if (comptime Environment.isMac) { + if ((available_to_read orelse 0) > 0) { + poll.flags.insert(.readable); + } } const is_readable = poll.isReadable(); @@ -3720,7 +3735,7 @@ pub const FileReader = struct { } else if (!is_readable and poll.isHUP()) { this.finalize(); return .{ .done = {} }; - } else if (!is_readable and poll.isRegistered()) { + } else if (!is_readable) { if (this.finished) { this.finalize(); return .{ .done = {} }; @@ -3730,18 +3745,46 @@ pub const FileReader = struct { this.view.set(this.globalThis(), view); this.buf = read_buf; if (!this.isWatching()) - this.watch(this.fd); + this.watch(fd); + } + + return .{ + .pending = &this.pending, + }; + } + } + + if (comptime Environment.isLinux) { + if (pipe_is_empty_on_linux) { + std.debug.assert(this.poll_ref == null); + if (view != .zero) { + this.view.set(this.globalThis(), view); + this.buf = read_buf; } + this.watch(fd); return .{ .pending = &this.pending, }; } } - const rc = Syscall.read(fd, buf_to_use); + // const rc: JSC.Node.Maybe(usize) = if (comptime Environment.isLinux) brk: { + // if (len == 65536 and this.has_adjusted_pipe_size_on_linux and buf_to_use.len > len) { + // var iovecs = [_]std.os.iovec{.{ .iov_base = @intToPtr([*]u8, @ptrToInt(buf_to_use.ptr)), .iov_len = @intCast(usize, buf_to_use.len) }}; + // const rc = bun.C.linux.vmsplice(fd, &iovecs, 1, 0); + // Output.debug("vmsplice({d}, {d}) = {d}", .{ fd, buf_to_use.len, rc }); + // if (JSC.Node.Maybe(usize).errnoSys(rc, .read)) |err| { + // break :brk err; + // } - switch (rc) { + // break :brk JSC.Node.Maybe(usize){ .result = @intCast(usize, rc) }; + // } + + // break :brk Syscall.read(fd, buf_to_use); + // } else Syscall.read(fd, buf_to_use); + + switch (Syscall.read(fd, buf_to_use)) { .err => |err| { const retry = std.os.E.AGAIN; const errno = brk: { @@ -3798,19 +3841,16 @@ pub const FileReader = struct { return .{ .err = sys }; }, .result => |result| { - if (this.poll_ref) |poll| { - if (this.isFIFO()) { - if (result < buf_to_use.len) { - // do not insert .eof here - poll.flags.remove(.readable); + if (this.isFIFO()) { + if (this.poll_ref) |poll| { - if (result > 0 and !poll.flags.contains(.hup) and !this.finished) { - // partial read, but not close. be sure to ask for more data - if (!this.isWatching()) - this.watch(fd); - } - } + // do not insert .eof here + if (result < buf_to_use.len) + poll.flags.remove(.readable); } + + if (!this.finished and !this.isWatching()) + this.watch(fd); } if (result == 0 and free_buffer_on_error) { @@ -3822,13 +3862,11 @@ pub const FileReader = struct { return this.handleReadChunk(result, view, true, buf_to_use); } - if (result == 0 and !this.finished and !this.close_on_eof and this.isFIFO()) { + if (result == 0 and this.isFIFO() and view != .zero) { this.view.set(this.globalThis(), view); this.buf = read_buf; - if (!this.isWatching()) - this.watch(fd); - this.poll_ref.?.flags.remove(.readable); - + if (this.poll_ref) |poll| + poll.flags.remove(.readable); return .{ .pending = &this.pending, }; @@ -3955,7 +3993,25 @@ pub const FileReader = struct { } } - pub const Source = ReadableStreamSource(@This(), "FileReader", onStart, onPullInto, onCancel, deinit, setRefOrUnref); + pub fn drainInternalBuffer(this: *FileReader) bun.ByteList { + var buffered = this.buffered_data; + if (buffered.len > 0) { + this.buffered_data = .{}; + } + + return buffered; + } + + pub const Source = ReadableStreamSource( + @This(), + "FileReader", + onStart, + onPullInto, + onCancel, + deinit, + setRefOrUnref, + drainInternalBuffer, + ); }; pub fn NewReadyWatcher( @@ -3970,11 +4026,15 @@ pub fn NewReadyWatcher( const Watcher = @This(); pub inline fn isFIFO(this: *const Context) bool { - if (this.poll_ref) |poll| { - return poll.flags.contains(.fifo) or poll.flags.contains(.tty); + if (comptime @hasField(Context, "is_fifo")) { + return this.is_fifo; + } + + if (this.poll_ref != null) { + return true; } - if (@hasField(Context, "mode")) { + if (comptime @hasField(Context, "mode")) { return std.os.S.ISFIFO(this.mode) or std.os.S.ISCHR(this.mode); } @@ -4046,34 +4106,3 @@ pub fn NewReadyWatcher( // pub fn onError(this: *Streamer): anytype, // }; // } - -fn getMaxPipeSizeOnLinux() c_int { - return bun.once(struct { - fn once() c_int { - const default_out_size = 512 * 1024; - const pipe_max_size_fd = switch (JSC.Node.Syscall.open("/proc/sys/fs/pipe-max-size", std.os.O.RDONLY, 0)) { - .result => |fd2| fd2, - .err => |err| { - Output.debug("Failed to open /proc/sys/fs/pipe-max-size: {d}\n", .{err.errno}); - return default_out_size; - }, - }; - defer _ = JSC.Node.Syscall.close(pipe_max_size_fd); - var max_pipe_size_buf: [128]u8 = undefined; - const max_pipe_size = switch (JSC.Node.Syscall.read(pipe_max_size_fd, max_pipe_size_buf[0..])) { - .result => |bytes_read| std.fmt.parseInt(i64, strings.trim(max_pipe_size_buf[0..bytes_read], "\n"), 10) catch |err| { - Output.debug("Failed to parse /proc/sys/fs/pipe-max-size: {any}\n", .{@errorName(err)}); - return default_out_size; - }, - .err => |err| { - Output.debug("Failed to read /proc/sys/fs/pipe-max-size: {d}\n", .{err.errno}); - return default_out_size; - }, - }; - - // we set the absolute max to 8 MB because honestly that's a huge pipe - // my current linux machine only goes up to 1 MB, so that's very unlikely to be hit - return @minimum(@truncate(c_int, max_pipe_size), 1024 * 1024 * 8); - } - }.once, c_int); -} diff --git a/src/cli/test_command.zig b/src/cli/test_command.zig index ac210dfe2..3f0082940 100644 --- a/src/cli/test_command.zig +++ b/src/cli/test_command.zig @@ -504,10 +504,10 @@ pub const TestCommand = struct { if (!Jest.Jest.runner.?.has_pending_tests) Jest.Jest.runner.?.drain(); vm.eventLoop().tick(); - while (Jest.Jest.runner.?.has_pending_tests) : (vm.eventLoop().tick()) { - vm.eventLoop().tick(); - if (!Jest.Jest.runner.?.has_pending_tests) break; + while (Jest.Jest.runner.?.has_pending_tests) { vm.eventLoop().autoTick(); + if (!Jest.Jest.runner.?.has_pending_tests) break; + vm.eventLoop().tick(); } } _ = vm.global.vm().runGC(false); diff --git a/src/global.zig b/src/global.zig index 6135e807d..97e5dc29d 100644 --- a/src/global.zig +++ b/src/global.zig @@ -396,3 +396,8 @@ pub fn once(comptime function: anytype, comptime ReturnType: type) ReturnType { return Result.execute(); } + +pub fn VoidUnless(comptime T: type, comptime cond: bool, comptime default: T) T { + if (cond) return default; + return {}; +} diff --git a/src/linux_c.zig b/src/linux_c.zig index 04a2a508c..b200becdc 100644 --- a/src/linux_c.zig +++ b/src/linux_c.zig @@ -462,3 +462,5 @@ pub extern "c" fn posix_spawn_file_actions_addfchdir_np(actions: *posix_spawn_fi // pub extern "c" fn posix_spawn_file_actions_addinherit_np(actions: *posix_spawn_file_actions_t, filedes: fd_t) c_int; pub extern "c" fn posix_spawn_file_actions_addchdir_np(actions: *posix_spawn_file_actions_t, path: [*:0]const u8) c_int; + +pub extern fn vmsplice(fd: c_int, iovec: [*]const std.os.iovec, iovec_count: usize, flags: u32) isize; |