diff options
author | 2022-11-12 18:30:12 -0800 | |
---|---|---|
committer | 2022-11-12 18:30:12 -0800 | |
commit | 21bf3ddaf23c842dc12a1d76dbd3b48daf08f349 (patch) | |
tree | 06706104877984e9f083fed7c3278c9d007193cc /src/bun.js/api | |
parent | 514f2a8eddf1a1d35a33cc096ed7403a79afe36f (diff) | |
download | bun-21bf3ddaf23c842dc12a1d76dbd3b48daf08f349.tar.gz bun-21bf3ddaf23c842dc12a1d76dbd3b48daf08f349.tar.zst bun-21bf3ddaf23c842dc12a1d76dbd3b48daf08f349.zip |
Redo how we poll pipes (#1496)
* Fix pipe
* Handle unregistered
* Fix failing test
Diffstat (limited to 'src/bun.js/api')
-rw-r--r-- | src/bun.js/api/bun/spawn.zig | 13 | ||||
-rw-r--r-- | src/bun.js/api/bun/subprocess.zig | 199 |
2 files changed, 146 insertions, 66 deletions
diff --git a/src/bun.js/api/bun/spawn.zig b/src/bun.js/api/bun/spawn.zig index c1deebd3a..d594d44a7 100644 --- a/src/bun.js/api/bun/spawn.zig +++ b/src/bun.js/api/bun/spawn.zig @@ -208,8 +208,17 @@ pub const PosixSpawn = struct { envp, ); - if (Maybe(pid_t).errno(rc)) |err| { - return err; + if (comptime bun.Environment.isLinux) { + // rc is negative because it's libc errno + if (rc > 0) { + if (Maybe(pid_t).errnoSysP(-rc, .posix_spawn, path)) |err| { + return err; + } + } + } else { + if (Maybe(pid_t).errnoSysP(rc, .posix_spawn, path)) |err| { + return err; + } } return Maybe(pid_t){ .result = pid }; diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index c82b4744f..dcfa88a40 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -31,7 +31,7 @@ pub const Subprocess = struct { killed: bool = false, reffer: JSC.Ref = JSC.Ref.init(), - poll_ref: JSC.PollRef = JSC.PollRef.init(), + poll_ref: ?*JSC.FilePoll = null, exit_promise: JSC.Strong = .{}, @@ -56,7 +56,7 @@ pub const Subprocess = struct { pub fn ref(this: *Subprocess) void { this.reffer.ref(this.globalThis.bunVM()); - this.poll_ref.ref(this.globalThis.bunVM()); + if (this.poll_ref) |poll| poll.ref(this.globalThis.bunVM()); } pub fn unref(this: *Subprocess) void { @@ -96,7 +96,7 @@ pub const Subprocess = struct { return; } - if (this.buffer.fd != std.math.maxInt(JSC.Node.FileDescriptor)) { + if (this.buffer.fd != JSC.Node.invalid_fd) { this.buffer.close(); } } @@ -184,7 +184,7 @@ pub const Subprocess = struct { // TODO: handle when there's pending unread data in the pipe // For some reason, this currently hangs forever - if (!this.pipe.buffer.received_eof and this.pipe.buffer.fd != std.math.maxInt(JSC.Node.FileDescriptor)) { + if (!this.pipe.buffer.received_eof and this.pipe.buffer.fd != JSC.Node.invalid_fd) { if (this.pipe.buffer.canRead()) this.pipe.buffer.readIfPossible(true); } @@ -349,8 +349,8 @@ pub const Subprocess = struct { pub const BufferedInput = struct { remain: []const u8 = "", - fd: JSC.Node.FileDescriptor = std.math.maxInt(JSC.Node.FileDescriptor), - poll_ref: JSC.PollRef = .{}, + fd: JSC.Node.FileDescriptor = JSC.Node.invalid_fd, + poll_ref: ?*JSC.FilePoll = null, written: usize = 0, source: union(enum) { @@ -358,14 +358,23 @@ pub const Subprocess = struct { array_buffer: JSC.ArrayBuffer.Strong, }, - pub usingnamespace JSC.WebCore.NewReadyWatcher(BufferedInput, .write, onReady); + pub usingnamespace JSC.WebCore.NewReadyWatcher(BufferedInput, .writable, onReady); pub fn onReady(this: *BufferedInput, _: i64) void { this.write(); } pub fn canWrite(this: *BufferedInput) bool { - return bun.isWritable(this.fd); + const is_writable = bun.isWritable(this.fd); + if (is_writable) { + if (this.poll_ref) |poll_ref| { + poll_ref.flags.insert(.writable); + poll_ref.flags.insert(.fifo); + std.debug.assert(poll_ref.flags.contains(.poll_writable)); + } + } + + return is_writable; } pub fn writeIfPossible(this: *BufferedInput, comptime is_sync: bool) void { @@ -376,6 +385,7 @@ pub const Subprocess = struct { // because we don't want to block the thread waiting for the write if (!this.canWrite()) { this.watch(this.fd); + this.poll_ref.?.flags.insert(.fifo); return; } } @@ -387,7 +397,6 @@ pub const Subprocess = struct { var to_write = this.remain; if (to_write.len == 0) { - if (this.poll_ref.isActive()) this.unwatch(this.fd); // we are done! this.closeFDIfOpen(); return; @@ -406,6 +415,7 @@ pub const Subprocess = struct { }); this.watch(this.fd); + this.poll_ref.?.flags.insert(.fifo); return; } @@ -445,11 +455,14 @@ pub const Subprocess = struct { } fn closeFDIfOpen(this: *BufferedInput) void { - if (this.poll_ref.isActive()) this.unwatch(this.fd); + if (this.poll_ref) |poll| { + this.poll_ref = null; + poll.deinit(); + } - if (this.fd != std.math.maxInt(JSC.Node.FileDescriptor)) { + if (this.fd != JSC.Node.invalid_fd) { _ = JSC.Node.Syscall.close(this.fd); - this.fd = std.math.maxInt(JSC.Node.FileDescriptor); + this.fd = JSC.Node.invalid_fd; } } @@ -470,12 +483,12 @@ pub const Subprocess = struct { pub const BufferedOutput = struct { internal_buffer: bun.ByteList = .{}, max_internal_buffer: u32 = default_max_buffer_size, - fd: JSC.Node.FileDescriptor = std.math.maxInt(JSC.Node.FileDescriptor), + fd: JSC.Node.FileDescriptor = JSC.Node.invalid_fd, received_eof: bool = false, pending_error: ?JSC.Node.Syscall.Error = null, - poll_ref: JSC.PollRef = .{}, + poll_ref: ?*JSC.FilePoll = null, - pub usingnamespace JSC.WebCore.NewReadyWatcher(BufferedOutput, .read, ready); + pub usingnamespace JSC.WebCore.NewReadyWatcher(BufferedOutput, .readable, ready); pub fn ready(this: *BufferedOutput, _: i64) void { // TODO: what happens if the task was already enqueued after unwatch()? @@ -483,7 +496,17 @@ pub const Subprocess = struct { } pub fn canRead(this: *BufferedOutput) bool { - return bun.isReadable(this.fd); + const is_readable = bun.isReadable(this.fd); + + if (is_readable) { + if (this.poll_ref) |poll_ref| { + poll_ref.flags.insert(.readable); + poll_ref.flags.insert(.fifo); + std.debug.assert(poll_ref.flags.contains(.poll_readable)); + } + } + + return is_readable; } pub fn readIfPossible(this: *BufferedOutput, comptime force: bool) void { @@ -495,6 +518,7 @@ pub const Subprocess = struct { // and we don't want this to become an event loop ticking point if (!this.canRead()) { this.watch(this.fd); + this.poll_ref.?.flags.insert(.fifo); return; } } @@ -502,9 +526,33 @@ pub const Subprocess = struct { this.readAll(force); } + pub fn closeOnEOF(this: *BufferedOutput) bool { + var poll = this.poll_ref orelse return true; + poll.flags.insert(.eof); + return false; + } + pub fn readAll(this: *BufferedOutput, comptime force: bool) void { + if (this.poll_ref) |poll| { + const is_readable = poll.isReadable(); + if (!is_readable and poll.isEOF()) { + if (poll.isHUP()) { + this.autoCloseFileDescriptor(); + } + + return; + } else if (!is_readable and poll.isHUP()) { + this.autoCloseFileDescriptor(); + return; + } else if (!is_readable) { + if (comptime !force) { + return; + } + } + } + // read as much as we can from the pipe - while (this.internal_buffer.len <= this.max_internal_buffer) { + while (this.internal_buffer.len < this.max_internal_buffer) { var buffer_: [@maximum(std.mem.page_size, 16384)]u8 = undefined; var buf: []u8 = buffer_[0..]; @@ -517,7 +565,9 @@ pub const Subprocess = struct { switch (JSC.Node.Syscall.read(this.fd, buf)) { .err => |e| { if (e.isRetry()) { - this.watch(this.fd); + if (!this.isWatching()) + this.watch(this.fd); + this.poll_ref.?.flags.insert(.fifo); return; } @@ -558,7 +608,9 @@ pub const Subprocess = struct { if (comptime !force) { if (buf[bytes_read..].len > 0 or !this.canRead()) { - this.watch(this.fd); + if (!this.isWatching()) + this.watch(this.fd); + this.poll_ref.?.flags.insert(.fifo); this.received_eof = true; return; } @@ -566,6 +618,10 @@ pub const Subprocess = struct { // we consider a short read as being EOF this.received_eof = this.received_eof or bytes_read < buf.len; if (this.received_eof) { + if (this.closeOnEOF()) { + this.autoCloseFileDescriptor(); + } + // do not auto-close the file descriptor here // it's totally legit to have a short read return; @@ -579,7 +635,7 @@ pub const Subprocess = struct { pub fn toBlob(this: *BufferedOutput, globalThis: *JSC.JSGlobalObject) JSC.WebCore.Blob { const blob = JSC.WebCore.Blob.init(this.internal_buffer.slice(), bun.default_allocator, globalThis); this.internal_buffer = bun.ByteList.init(""); - std.debug.assert(this.fd == std.math.maxInt(JSC.Node.FileDescriptor)); + std.debug.assert(this.fd == JSC.Node.invalid_fd); std.debug.assert(this.received_eof); return blob; } @@ -600,55 +656,61 @@ pub const Subprocess = struct { ).?; } + var poll_ref = this.poll_ref; + this.poll_ref = null; + return JSC.WebCore.ReadableStream.fromJS( - JSC.WebCore.ReadableStream.fromBlob( + JSC.WebCore.ReadableStream.fromBlobWithPoll( globalThis, &this.toBlob(globalThis), 0, + poll_ref, ), globalThis, ).?; } } - std.debug.assert(this.fd != std.math.maxInt(JSC.Node.FileDescriptor)); - - // BufferedOutput is going away - // let's make sure we don't watch it anymore - if (this.poll_ref.isActive()) { - this.unwatch(this.fd); - } - - // There could still be data waiting to be read in the pipe - // so we need to create a new stream that will read from the - // pipe and then return the blob. - var blob = JSC.WebCore.Blob.findOrCreateFileFromPath(.{ .fd = this.fd }, globalThis); - const result = JSC.WebCore.ReadableStream.fromJS( - JSC.WebCore.ReadableStream.fromBlob( + std.debug.assert(this.fd != JSC.Node.invalid_fd); + { + var poll_ref = this.poll_ref; + this.poll_ref = null; + + // There could still be data waiting to be read in the pipe + // so we need to create a new stream that will read from the + // pipe and then return the blob. + var blob = JSC.WebCore.Blob.findOrCreateFileFromPath(.{ .fd = this.fd }, globalThis); + const result = JSC.WebCore.ReadableStream.fromJS( + JSC.WebCore.ReadableStream.fromBlobWithPoll( + globalThis, + &blob, + 0, + poll_ref, + ), globalThis, - &blob, - 0, - ), - globalThis, - ).?; - blob.detach(); - result.ptr.File.buffered_data = this.internal_buffer; - result.ptr.File.stored_global_this_ = globalThis; - result.ptr.File.finished = exited; - this.internal_buffer = bun.ByteList.init(""); - this.fd = std.math.maxInt(JSC.Node.FileDescriptor); - this.received_eof = false; - return result; + ).?; + blob.detach(); + + result.ptr.File.buffered_data = this.internal_buffer; + result.ptr.File.stored_global_this_ = globalThis; + result.ptr.File.finished = exited; + this.internal_buffer = bun.ByteList.init(""); + this.fd = JSC.Node.invalid_fd; + this.received_eof = false; + return result; + } } pub fn autoCloseFileDescriptor(this: *BufferedOutput) void { const fd = this.fd; - if (fd == std.math.maxInt(JSC.Node.FileDescriptor)) + if (fd == JSC.Node.invalid_fd) return; - this.fd = std.math.maxInt(JSC.Node.FileDescriptor); + this.fd = JSC.Node.invalid_fd; - if (this.poll_ref.isActive()) - this.unwatch(fd); + if (this.poll_ref) |poll| { + this.poll_ref = null; + poll.deinit(); + } _ = JSC.Node.Syscall.close(fd); } @@ -826,8 +888,9 @@ pub const Subprocess = struct { .items = &.{}, .capacity = 0, }; + var jsc_vm = globalThis.bunVM(); - var cwd = globalThis.bunVM().bundler.fs.top_level_dir; + var cwd = jsc_vm.bundler.fs.top_level_dir; var stdio = [3]Stdio{ .{ .ignore = .{} }, @@ -841,13 +904,13 @@ pub const Subprocess = struct { } var on_exit_callback = JSValue.zero; - var PATH = globalThis.bunVM().bundler.env.get("PATH") orelse ""; + var PATH = jsc_vm.bundler.env.get("PATH") orelse ""; var argv: std.ArrayListUnmanaged(?[*:0]const u8) = undefined; var cmd_value = JSValue.zero; var args = args_; { if (args.isEmptyOrUndefinedOrNull()) { - globalThis.throwInvalidArguments("cmds must be an array", .{}); + globalThis.throwInvalidArguments("cmd must be an array", .{}); return .zero; } @@ -858,7 +921,7 @@ pub const Subprocess = struct { } else if (args.get(globalThis, "cmd")) |cmd_value_| { cmd_value = cmd_value_; } else { - globalThis.throwInvalidArguments("cmds must be an array", .{}); + globalThis.throwInvalidArguments("cmd must be an array", .{}); return .zero; } @@ -1015,7 +1078,7 @@ pub const Subprocess = struct { defer actions.deinit(); if (env_array.items.len == 0) { - env_array.items = globalThis.bunVM().bundler.env.map.createNullDelimitedEnvMap(allocator) catch |err| return globalThis.handleError(err, "in posix_spawn"); + env_array.items = jsc_vm.bundler.env.map.createNullDelimitedEnvMap(allocator) catch |err| return globalThis.handleError(err, "in posix_spawn"); env_array.capacity = env_array.items.len; } @@ -1100,7 +1163,7 @@ pub const Subprocess = struct { var status: u32 = 0; // ensure we don't leak the child process on error _ = std.os.linux.waitpid(pid, &status, 0); - return JSValue.jsUndefined(); + return .zero; }, } }; @@ -1135,11 +1198,12 @@ pub const Subprocess = struct { subprocess.this_jsvalue.set(globalThis, out); if (comptime !is_sync) { - switch (globalThis.bunVM().poller.watch( - @intCast(JSC.Node.FileDescriptor, pidfd), + var poll = JSC.FilePoll.init(jsc_vm, pidfd, .{}, Subprocess, subprocess); + subprocess.poll_ref = poll; + switch (poll.register( + jsc_vm.uws_event_loop.?, .process, - Subprocess, - subprocess, + true, )) { .result => {}, .err => |err| { @@ -1237,7 +1301,14 @@ pub const Subprocess = struct { if (!sync) { var vm = this.globalThis.bunVM(); - this.unrefWithoutGC(vm); + this.reffer.unref(vm); + + // prevent duplicate notifications + if (this.poll_ref) |poll| { + this.poll_ref = null; + poll.deinitWithVM(vm); + } + this.waitpid_task = JSC.AnyTask.New(Subprocess, onExit).init(this); this.has_waitpid_task = true; vm.eventLoop().enqueueTask(JSC.Task.init(&this.waitpid_task)); @@ -1245,7 +1316,7 @@ pub const Subprocess = struct { } pub fn unrefWithoutGC(this: *Subprocess, vm: *JSC.VirtualMachine) void { - this.poll_ref.unref(vm); + if (this.poll_ref) |poll| poll.unref(vm); this.reffer.unref(vm); } |