diff options
author | 2023-08-02 17:00:01 -0300 | |
---|---|---|
committer | 2023-08-02 13:00:01 -0700 | |
commit | 25553e62c1cb8151c901f2e09ccb1a71f6a1e7bd (patch) | |
tree | be0f650cedbeec02ba317c784bec9c260cffd5b9 | |
parent | 4f39d5b54a21454f1c71922dbca87a5d6d69e3a0 (diff) | |
download | bun-25553e62c1cb8151c901f2e09ccb1a71f6a1e7bd.tar.gz bun-25553e62c1cb8151c901f2e09ccb1a71f6a1e7bd.tar.zst bun-25553e62c1cb8151c901f2e09ccb1a71f6a1e7bd.zip |
Fix path_watcher (#3920)
* fix callback and onError
* fix main watcher error return
* fixup
* rename to be more clear
-rw-r--r-- | src/bun.js/node/path_watcher.zig | 280 |
1 files changed, 174 insertions, 106 deletions
diff --git a/src/bun.js/node/path_watcher.zig b/src/bun.js/node/path_watcher.zig index 49caa93d0..1d81661e9 100644 --- a/src/bun.js/node/path_watcher.zig +++ b/src/bun.js/node/path_watcher.zig @@ -35,6 +35,7 @@ pub const PathWatcherManager = struct { deinit_on_last_watcher: bool = false, pending_tasks: u32 = 0, deinit_on_last_task: bool = false, + has_pending_tasks: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false), mutex: Mutex, const PathInfo = struct { fd: StoredFileDescriptorType = 0, @@ -45,6 +46,31 @@ pub const PathWatcherManager = struct { hash: Watcher.HashType, }; + fn refPendingTask(this: *PathWatcherManager) bool { + @fence(.Release); + this.mutex.lock(); + defer this.mutex.unlock(); + if (this.deinit_on_last_task) return false; + this.pending_tasks += 1; + this.has_pending_tasks.store(true, .Release); + return true; + } + + fn hasPendingTasks(this: *PathWatcherManager) callconv(.C) bool { + @fence(.Acquire); + return this.has_pending_tasks.load(.Acquire); + } + + fn unrefPendingTask(this: *PathWatcherManager) void { + @fence(.Release); + this.mutex.lock(); + defer this.mutex.unlock(); + this.pending_tasks -= 1; + if (this.deinit_on_last_task and this.pending_tasks == 0) { + this.has_pending_tasks.store(false, .Release); + this.deinit(); + } + } // TODO: switch to using JSC.Maybe to avoid using "unreachable" and improve error messages fn _fdFromAbsolutePathZ( this: *PathWatcherManager, @@ -302,26 +328,27 @@ pub const PathWatcherManager = struct { this: *PathWatcherManager, err: anyerror, ) void { - this.mutex.lock(); - const watchers = this.watchers.slice(); - const timestamp = std.time.milliTimestamp(); - - // stop all watchers - for (watchers) |w| { - if (w) |watcher| { - log("[watch] error: {s}", .{@errorName(err)}); - watcher.emit(@errorName(err), 0, timestamp, false, .@"error"); - watcher.flush(); + { + this.mutex.lock(); + defer this.mutex.unlock(); + const watchers = this.watchers.slice(); + const timestamp = std.time.milliTimestamp(); + + // stop all watchers + for (watchers) |w| { + if (w) |watcher| { + log("[watch] error: {s}", .{@errorName(err)}); + watcher.emit(@errorName(err), 0, timestamp, false, .@"error"); + watcher.flush(); + } } - } - - // we need a new manager at this point - default_manager_mutex.lock(); - defer default_manager_mutex.unlock(); - default_manager = null; + // we need a new manager at this point + default_manager_mutex.lock(); + defer default_manager_mutex.unlock(); + default_manager = null; + } // deinit manager when all watchers are closed - this.mutex.unlock(); this.deinit(); } @@ -338,46 +365,57 @@ pub const PathWatcherManager = struct { } fn schedule(manager: *PathWatcherManager, watcher: *PathWatcher, path: PathInfo) !void { - manager.mutex.lock(); - defer manager.mutex.unlock(); // keep the path alive - manager._incrementPathRefNoLock(path.path); + manager._incrementPathRef(path.path); errdefer manager._decrementPathRef(path.path); + var routine: *DirectoryRegisterTask = undefined; + { + manager.mutex.lock(); + defer manager.mutex.unlock(); + + // use the same thread for the same fd to avoid race conditions + if (manager.current_fd_task.getEntry(path.fd)) |entry| { + routine = entry.value_ptr.*; + + if (watcher.refPendingDirectory()) { + routine.watcher_list.push(bun.default_allocator, watcher) catch |err| { + watcher.unrefPendingDirectory(); + return err; + }; + } else { + return error.UnexpectedFailure; + } + return; + } - // use the same thread for the same fd to avoid race conditions - if (manager.current_fd_task.getEntry(path.fd)) |entry| { - var routine = entry.value_ptr.*; - watcher.mutex.lock(); - defer watcher.mutex.unlock(); - watcher.pending_directories += 1; - routine.watcher_list.push(bun.default_allocator, watcher) catch |err| { - watcher.pending_directories -= 1; + routine = try bun.default_allocator.create(DirectoryRegisterTask); + routine.* = DirectoryRegisterTask{ + .manager = manager, + .path = path, + .watcher_list = bun.BabyList(*PathWatcher).initCapacity(bun.default_allocator, 1) catch |err| { + bun.default_allocator.destroy(routine); + return err; + }, + }; + errdefer routine.deinit(); + if (watcher.refPendingDirectory()) { + routine.watcher_list.push(bun.default_allocator, watcher) catch |err| { + watcher.unrefPendingDirectory(); + return err; + }; + } else { + return error.UnexpectedFailure; + } + manager.current_fd_task.put(path.fd, routine) catch |err| { + watcher.unrefPendingDirectory(); return err; }; + } + if (manager.refPendingTask()) { + JSC.WorkPool.schedule(&routine.task); return; } - var routine = try bun.default_allocator.create(DirectoryRegisterTask); - routine.* = DirectoryRegisterTask{ - .manager = manager, - .path = path, - .watcher_list = bun.BabyList(*PathWatcher).initCapacity(bun.default_allocator, 1) catch |err| { - bun.default_allocator.destroy(routine); - return err; - }, - }; - errdefer routine.deinit(); - try routine.watcher_list.push(bun.default_allocator, watcher); - watcher.mutex.lock(); - defer watcher.mutex.unlock(); - watcher.pending_directories += 1; - - manager.current_fd_task.put(path.fd, routine) catch |err| { - watcher.pending_directories -= 1; - return err; - }; - manager.pending_tasks += 1; - JSC.WorkPool.schedule(&routine.task); - return; + return error.UnexpectedFailure; } fn getNext(this: *DirectoryRegisterTask) ?*PathWatcher { @@ -405,17 +443,6 @@ pub const PathWatcherManager = struct { var iter = (std.fs.IterableDir{ .dir = std.fs.Dir{ .fd = fd, } }).iterate(); - defer { - watcher.mutex.lock(); - watcher.pending_directories -= 1; - - if (watcher.pending_directories == 0 and watcher.finalized) { - watcher.mutex.unlock(); - watcher.deinit(); - } else { - watcher.mutex.unlock(); - } - } // now we iterate over all files and directories while (try iter.next()) |entry| { @@ -431,19 +458,20 @@ pub const PathWatcherManager = struct { var entry_path_z = buf[0..entry_path.len :0]; var child_path = try manager._fdFromAbsolutePathZ(entry_path_z); - watcher.mutex.lock(); - watcher.file_paths.push(bun.default_allocator, child_path.path) catch |err| { - watcher.mutex.unlock(); - manager._decrementPathRef(entry_path_z); - return err; - }; - watcher.mutex.unlock(); + { + watcher.mutex.lock(); + defer watcher.mutex.unlock(); + watcher.file_paths.push(bun.default_allocator, child_path.path) catch |err| { + manager._decrementPathRef(entry_path_z); + return err; + }; + } // we need to call this unlocked if (child_path.is_file) { try manager.main_watcher.addFile(child_path.fd, child_path.path, child_path.hash, options.Loader.file, 0, null, false); } else { - if (watcher.recursive and !watcher.finalized) { + if (watcher.recursive and !watcher.isClosed()) { // this may trigger another thread with is desired when available to watch long trees try manager._addDirectory(watcher, child_path); } @@ -455,6 +483,7 @@ pub const PathWatcherManager = struct { var buf: [bun.MAX_PATH_BYTES + 1]u8 = undefined; while (this.getNext()) |watcher| { + defer watcher.unrefPendingDirectory(); this.processWatcher(watcher, &buf) catch |err| { log("[watch] error registering directory: {s}", .{@errorName(err)}); watcher.emit(@errorName(err), 0, std.time.milliTimestamp(), false, .@"error"); @@ -462,14 +491,7 @@ pub const PathWatcherManager = struct { }; } - this.manager.mutex.lock(); - this.manager.pending_tasks -= 1; - if (this.manager.deinit_on_last_task and this.manager.pending_tasks == 0) { - this.manager.mutex.unlock(); - this.manager.deinit(); - } else { - this.manager.mutex.unlock(); - } + this.manager.unrefPendingTask(); } fn deinit(this: *DirectoryRegisterTask) void { @@ -485,29 +507,30 @@ pub const PathWatcherManager = struct { return try DirectoryRegisterTask.schedule(this, watcher, path); } + // register is always called form main thread fn registerWatcher(this: *PathWatcherManager, watcher: *PathWatcher) !void { - this.mutex.lock(); + { + this.mutex.lock(); + defer this.mutex.unlock(); - if (this.watcher_count == this.watchers.len) { - this.watcher_count += 1; - this.watchers.push(bun.default_allocator, watcher) catch |err| { - this.watcher_count -= 1; - this.mutex.unlock(); - return err; - }; - } else { - var watchers = this.watchers.slice(); - for (watchers, 0..) |w, i| { - if (w == null) { - watchers[i] = watcher; - this.watcher_count += 1; - break; + if (this.watcher_count == this.watchers.len) { + this.watcher_count += 1; + this.watchers.push(bun.default_allocator, watcher) catch |err| { + this.watcher_count -= 1; + return err; + }; + } else { + var watchers = this.watchers.slice(); + for (watchers, 0..) |w, i| { + if (w == null) { + watchers[i] = watcher; + this.watcher_count += 1; + break; + } } } } - this.mutex.unlock(); - const path = watcher.path; if (path.is_file) { try this.main_watcher.addFile(path.fd, path.path, path.hash, options.Loader.file, 0, null, false); @@ -521,7 +544,9 @@ pub const PathWatcherManager = struct { } } - fn _incrementPathRefNoLock(this: *PathWatcherManager, file_path: [:0]const u8) void { + fn _incrementPathRef(this: *PathWatcherManager, file_path: [:0]const u8) void { + this.mutex.lock(); + defer this.mutex.unlock(); if (this.file_paths.getEntry(file_path)) |entry| { var path = entry.value_ptr; if (path.refs > 0) { @@ -551,6 +576,7 @@ pub const PathWatcherManager = struct { this._decrementPathRefNoLock(file_path); } + // unregister is always called form main thread fn unregisterWatcher(this: *PathWatcherManager, watcher: *PathWatcher) void { this.mutex.lock(); defer this.mutex.unlock(); @@ -579,11 +605,13 @@ pub const PathWatcherManager = struct { } } - watcher.mutex.lock(); - while (watcher.file_paths.popOrNull()) |file_path| { - this._decrementPathRefNoLock(file_path); + { + watcher.mutex.lock(); + defer watcher.mutex.unlock(); + while (watcher.file_paths.popOrNull()) |file_path| { + this._decrementPathRefNoLock(file_path); + } } - watcher.mutex.unlock(); break; } } @@ -605,7 +633,9 @@ pub const PathWatcherManager = struct { return; } - if (this.pending_tasks > 0) { + if (this.hasPendingTasks()) { + this.mutex.lock(); + defer this.mutex.unlock(); // deinit when all tasks are done this.deinit_on_last_task = true; return; @@ -655,9 +685,10 @@ pub const PathWatcher = struct { fsevents_watcher: ?*FSEvents.FSEventsWatcher, mutex: Mutex, pending_directories: u32 = 0, - finalized: bool = false, // only used on macOS resolved_path: ?string = null, + has_pending_directories: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false), + closed: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false), pub const ChangeEvent = struct { hash: PathWatcherManager.Watcher.HashType = 0, event_type: EventType = .change, @@ -738,6 +769,44 @@ pub const PathWatcher = struct { return this; } + pub fn refPendingDirectory(this: *PathWatcher) bool { + @fence(.Release); + this.mutex.lock(); + defer this.mutex.unlock(); + if (this.isClosed()) return false; + this.pending_directories += 1; + this.has_pending_directories.store(true, .Release); + return true; + } + + pub fn hasPendingDirectories(this: *PathWatcher) callconv(.C) bool { + @fence(.Acquire); + return this.has_pending_directories.load(.Acquire); + } + + pub fn isClosed(this: *PathWatcher) bool { + @fence(.Acquire); + return this.closed.load(.Acquire); + } + + pub fn setClosed(this: *PathWatcher) void { + this.mutex.lock(); + defer this.mutex.unlock(); + @fence(.Release); + this.closed.store(true, .Release); + } + + pub fn unrefPendingDirectory(this: *PathWatcher) void { + @fence(.Release); + this.mutex.lock(); + defer this.mutex.unlock(); + this.pending_directories -= 1; + if (this.isClosed() and this.pending_directories == 0) { + this.has_pending_directories.store(false, .Release); + this.deinit(); + } + } + pub fn emit(this: *PathWatcher, path: string, hash: PathWatcherManager.Watcher.HashType, time_stamp: i64, is_file: bool, event_type: EventType) void { const time_diff = time_stamp - this.last_change_event.time_stamp; // skip consecutive duplicates @@ -746,24 +815,23 @@ pub const PathWatcher = struct { this.last_change_event.event_type = event_type; this.last_change_event.hash = hash; this.needs_flush = true; + if (this.isClosed()) return; this.callback(this.ctx, path, is_file, event_type); } } pub fn flush(this: *PathWatcher) void { this.needs_flush = false; + if (this.isClosed()) return; this.flushCallback(this.ctx); } pub fn deinit(this: *PathWatcher) void { - this.mutex.lock(); - this.finalized = true; - if (this.pending_directories > 0) { + this.setClosed(); + if (this.hasPendingDirectories()) { // will be freed on last directory - this.mutex.unlock(); return; } - this.mutex.unlock(); if (this.manager) |manager| { if (comptime Environment.isMac) { |