-rw-r--r--	src/bun.js/node/path_watcher.zig	280
1 file changed, 174 insertions(+), 106 deletions(-)
diff --git a/src/bun.js/node/path_watcher.zig b/src/bun.js/node/path_watcher.zig
index 49caa93d0..1d81661e9 100644
--- a/src/bun.js/node/path_watcher.zig
+++ b/src/bun.js/node/path_watcher.zig
@@ -35,6 +35,7 @@ pub const PathWatcherManager = struct {
deinit_on_last_watcher: bool = false,
pending_tasks: u32 = 0,
deinit_on_last_task: bool = false,
+ has_pending_tasks: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
mutex: Mutex,
const PathInfo = struct {
fd: StoredFileDescriptorType = 0,
@@ -45,6 +46,31 @@ pub const PathWatcherManager = struct {
hash: Watcher.HashType,
};
+ fn refPendingTask(this: *PathWatcherManager) bool {
+ @fence(.Release);
+ this.mutex.lock();
+ defer this.mutex.unlock();
+ if (this.deinit_on_last_task) return false;
+ this.pending_tasks += 1;
+ this.has_pending_tasks.store(true, .Release);
+ return true;
+ }
+
+ fn hasPendingTasks(this: *PathWatcherManager) callconv(.C) bool {
+ @fence(.Acquire);
+ return this.has_pending_tasks.load(.Acquire);
+ }
+
+ fn unrefPendingTask(this: *PathWatcherManager) void {
+ @fence(.Release);
+ this.mutex.lock();
+ defer this.mutex.unlock();
+ this.pending_tasks -= 1;
+ if (this.deinit_on_last_task and this.pending_tasks == 0) {
+ this.has_pending_tasks.store(false, .Release);
+ this.deinit();
+ }
+ }
// TODO: switch to using JSC.Maybe to avoid using "unreachable" and improve error messages
fn _fdFromAbsolutePathZ(
this: *PathWatcherManager,
@@ -302,26 +328,27 @@ pub const PathWatcherManager = struct {
this: *PathWatcherManager,
err: anyerror,
) void {
- this.mutex.lock();
- const watchers = this.watchers.slice();
- const timestamp = std.time.milliTimestamp();
-
- // stop all watchers
- for (watchers) |w| {
- if (w) |watcher| {
- log("[watch] error: {s}", .{@errorName(err)});
- watcher.emit(@errorName(err), 0, timestamp, false, .@"error");
- watcher.flush();
+ {
+ this.mutex.lock();
+ defer this.mutex.unlock();
+ const watchers = this.watchers.slice();
+ const timestamp = std.time.milliTimestamp();
+
+ // stop all watchers
+ for (watchers) |w| {
+ if (w) |watcher| {
+ log("[watch] error: {s}", .{@errorName(err)});
+ watcher.emit(@errorName(err), 0, timestamp, false, .@"error");
+ watcher.flush();
+ }
}
- }
-
- // we need a new manager at this point
- default_manager_mutex.lock();
- defer default_manager_mutex.unlock();
- default_manager = null;
+ // we need a new manager at this point
+ default_manager_mutex.lock();
+ defer default_manager_mutex.unlock();
+ default_manager = null;
+ }
// deinit manager when all watchers are closed
- this.mutex.unlock();
this.deinit();
}
@@ -338,46 +365,57 @@ pub const PathWatcherManager = struct {
}
fn schedule(manager: *PathWatcherManager, watcher: *PathWatcher, path: PathInfo) !void {
- manager.mutex.lock();
- defer manager.mutex.unlock();
// keep the path alive
- manager._incrementPathRefNoLock(path.path);
+ manager._incrementPathRef(path.path);
errdefer manager._decrementPathRef(path.path);
+ var routine: *DirectoryRegisterTask = undefined;
+ {
+ manager.mutex.lock();
+ defer manager.mutex.unlock();
+
+ // use the same thread for the same fd to avoid race conditions
+ if (manager.current_fd_task.getEntry(path.fd)) |entry| {
+ routine = entry.value_ptr.*;
+
+ if (watcher.refPendingDirectory()) {
+ routine.watcher_list.push(bun.default_allocator, watcher) catch |err| {
+ watcher.unrefPendingDirectory();
+ return err;
+ };
+ } else {
+ return error.UnexpectedFailure;
+ }
+ return;
+ }
- // use the same thread for the same fd to avoid race conditions
- if (manager.current_fd_task.getEntry(path.fd)) |entry| {
- var routine = entry.value_ptr.*;
- watcher.mutex.lock();
- defer watcher.mutex.unlock();
- watcher.pending_directories += 1;
- routine.watcher_list.push(bun.default_allocator, watcher) catch |err| {
- watcher.pending_directories -= 1;
+ routine = try bun.default_allocator.create(DirectoryRegisterTask);
+ routine.* = DirectoryRegisterTask{
+ .manager = manager,
+ .path = path,
+ .watcher_list = bun.BabyList(*PathWatcher).initCapacity(bun.default_allocator, 1) catch |err| {
+ bun.default_allocator.destroy(routine);
+ return err;
+ },
+ };
+ errdefer routine.deinit();
+ if (watcher.refPendingDirectory()) {
+ routine.watcher_list.push(bun.default_allocator, watcher) catch |err| {
+ watcher.unrefPendingDirectory();
+ return err;
+ };
+ } else {
+ return error.UnexpectedFailure;
+ }
+ manager.current_fd_task.put(path.fd, routine) catch |err| {
+ watcher.unrefPendingDirectory();
return err;
};
+ }
+ if (manager.refPendingTask()) {
+ JSC.WorkPool.schedule(&routine.task);
return;
}
- var routine = try bun.default_allocator.create(DirectoryRegisterTask);
- routine.* = DirectoryRegisterTask{
- .manager = manager,
- .path = path,
- .watcher_list = bun.BabyList(*PathWatcher).initCapacity(bun.default_allocator, 1) catch |err| {
- bun.default_allocator.destroy(routine);
- return err;
- },
- };
- errdefer routine.deinit();
- try routine.watcher_list.push(bun.default_allocator, watcher);
- watcher.mutex.lock();
- defer watcher.mutex.unlock();
- watcher.pending_directories += 1;
-
- manager.current_fd_task.put(path.fd, routine) catch |err| {
- watcher.pending_directories -= 1;
- return err;
- };
- manager.pending_tasks += 1;
- JSC.WorkPool.schedule(&routine.task);
- return;
+ return error.UnexpectedFailure;
}
fn getNext(this: *DirectoryRegisterTask) ?*PathWatcher {
@@ -405,17 +443,6 @@ pub const PathWatcherManager = struct {
var iter = (std.fs.IterableDir{ .dir = std.fs.Dir{
.fd = fd,
} }).iterate();
- defer {
- watcher.mutex.lock();
- watcher.pending_directories -= 1;
-
- if (watcher.pending_directories == 0 and watcher.finalized) {
- watcher.mutex.unlock();
- watcher.deinit();
- } else {
- watcher.mutex.unlock();
- }
- }
// now we iterate over all files and directories
while (try iter.next()) |entry| {
@@ -431,19 +458,20 @@ pub const PathWatcherManager = struct {
var entry_path_z = buf[0..entry_path.len :0];
var child_path = try manager._fdFromAbsolutePathZ(entry_path_z);
- watcher.mutex.lock();
- watcher.file_paths.push(bun.default_allocator, child_path.path) catch |err| {
- watcher.mutex.unlock();
- manager._decrementPathRef(entry_path_z);
- return err;
- };
- watcher.mutex.unlock();
+ {
+ watcher.mutex.lock();
+ defer watcher.mutex.unlock();
+ watcher.file_paths.push(bun.default_allocator, child_path.path) catch |err| {
+ manager._decrementPathRef(entry_path_z);
+ return err;
+ };
+ }
// we need to call this unlocked
if (child_path.is_file) {
try manager.main_watcher.addFile(child_path.fd, child_path.path, child_path.hash, options.Loader.file, 0, null, false);
} else {
- if (watcher.recursive and !watcher.finalized) {
+ if (watcher.recursive and !watcher.isClosed()) {
// this may trigger another thread when one is available, which is desirable for watching long directory trees
try manager._addDirectory(watcher, child_path);
}
@@ -455,6 +483,7 @@ pub const PathWatcherManager = struct {
var buf: [bun.MAX_PATH_BYTES + 1]u8 = undefined;
while (this.getNext()) |watcher| {
+ defer watcher.unrefPendingDirectory();
this.processWatcher(watcher, &buf) catch |err| {
log("[watch] error registering directory: {s}", .{@errorName(err)});
watcher.emit(@errorName(err), 0, std.time.milliTimestamp(), false, .@"error");
@@ -462,14 +491,7 @@ pub const PathWatcherManager = struct {
};
}
- this.manager.mutex.lock();
- this.manager.pending_tasks -= 1;
- if (this.manager.deinit_on_last_task and this.manager.pending_tasks == 0) {
- this.manager.mutex.unlock();
- this.manager.deinit();
- } else {
- this.manager.mutex.unlock();
- }
+ this.manager.unrefPendingTask();
}
fn deinit(this: *DirectoryRegisterTask) void {
@@ -485,29 +507,30 @@ pub const PathWatcherManager = struct {
return try DirectoryRegisterTask.schedule(this, watcher, path);
}
+ // register is always called from the main thread
fn registerWatcher(this: *PathWatcherManager, watcher: *PathWatcher) !void {
- this.mutex.lock();
+ {
+ this.mutex.lock();
+ defer this.mutex.unlock();
- if (this.watcher_count == this.watchers.len) {
- this.watcher_count += 1;
- this.watchers.push(bun.default_allocator, watcher) catch |err| {
- this.watcher_count -= 1;
- this.mutex.unlock();
- return err;
- };
- } else {
- var watchers = this.watchers.slice();
- for (watchers, 0..) |w, i| {
- if (w == null) {
- watchers[i] = watcher;
- this.watcher_count += 1;
- break;
+ if (this.watcher_count == this.watchers.len) {
+ this.watcher_count += 1;
+ this.watchers.push(bun.default_allocator, watcher) catch |err| {
+ this.watcher_count -= 1;
+ return err;
+ };
+ } else {
+ var watchers = this.watchers.slice();
+ for (watchers, 0..) |w, i| {
+ if (w == null) {
+ watchers[i] = watcher;
+ this.watcher_count += 1;
+ break;
+ }
}
}
}
- this.mutex.unlock();
-
const path = watcher.path;
if (path.is_file) {
try this.main_watcher.addFile(path.fd, path.path, path.hash, options.Loader.file, 0, null, false);
@@ -521,7 +544,9 @@ pub const PathWatcherManager = struct {
}
}
- fn _incrementPathRefNoLock(this: *PathWatcherManager, file_path: [:0]const u8) void {
+ fn _incrementPathRef(this: *PathWatcherManager, file_path: [:0]const u8) void {
+ this.mutex.lock();
+ defer this.mutex.unlock();
if (this.file_paths.getEntry(file_path)) |entry| {
var path = entry.value_ptr;
if (path.refs > 0) {
@@ -551,6 +576,7 @@ pub const PathWatcherManager = struct {
this._decrementPathRefNoLock(file_path);
}
+ // unregister is always called from the main thread
fn unregisterWatcher(this: *PathWatcherManager, watcher: *PathWatcher) void {
this.mutex.lock();
defer this.mutex.unlock();
@@ -579,11 +605,13 @@ pub const PathWatcherManager = struct {
}
}
- watcher.mutex.lock();
- while (watcher.file_paths.popOrNull()) |file_path| {
- this._decrementPathRefNoLock(file_path);
+ {
+ watcher.mutex.lock();
+ defer watcher.mutex.unlock();
+ while (watcher.file_paths.popOrNull()) |file_path| {
+ this._decrementPathRefNoLock(file_path);
+ }
}
- watcher.mutex.unlock();
break;
}
}
@@ -605,7 +633,9 @@ pub const PathWatcherManager = struct {
return;
}
- if (this.pending_tasks > 0) {
+ if (this.hasPendingTasks()) {
+ this.mutex.lock();
+ defer this.mutex.unlock();
// deinit when all tasks are done
this.deinit_on_last_task = true;
return;
@@ -655,9 +685,10 @@ pub const PathWatcher = struct {
fsevents_watcher: ?*FSEvents.FSEventsWatcher,
mutex: Mutex,
pending_directories: u32 = 0,
- finalized: bool = false,
// only used on macOS
resolved_path: ?string = null,
+ has_pending_directories: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
+ closed: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
pub const ChangeEvent = struct {
hash: PathWatcherManager.Watcher.HashType = 0,
event_type: EventType = .change,
@@ -738,6 +769,44 @@ pub const PathWatcher = struct {
return this;
}
+ pub fn refPendingDirectory(this: *PathWatcher) bool {
+ @fence(.Release);
+ this.mutex.lock();
+ defer this.mutex.unlock();
+ if (this.isClosed()) return false;
+ this.pending_directories += 1;
+ this.has_pending_directories.store(true, .Release);
+ return true;
+ }
+
+ pub fn hasPendingDirectories(this: *PathWatcher) callconv(.C) bool {
+ @fence(.Acquire);
+ return this.has_pending_directories.load(.Acquire);
+ }
+
+ pub fn isClosed(this: *PathWatcher) bool {
+ @fence(.Acquire);
+ return this.closed.load(.Acquire);
+ }
+
+ pub fn setClosed(this: *PathWatcher) void {
+ this.mutex.lock();
+ defer this.mutex.unlock();
+ @fence(.Release);
+ this.closed.store(true, .Release);
+ }
+
+ pub fn unrefPendingDirectory(this: *PathWatcher) void {
+ @fence(.Release);
+ this.mutex.lock();
+ defer this.mutex.unlock();
+ this.pending_directories -= 1;
+ if (this.isClosed() and this.pending_directories == 0) {
+ this.has_pending_directories.store(false, .Release);
+ this.deinit();
+ }
+ }
+
pub fn emit(this: *PathWatcher, path: string, hash: PathWatcherManager.Watcher.HashType, time_stamp: i64, is_file: bool, event_type: EventType) void {
const time_diff = time_stamp - this.last_change_event.time_stamp;
// skip consecutive duplicates
@@ -746,24 +815,23 @@ pub const PathWatcher = struct {
this.last_change_event.event_type = event_type;
this.last_change_event.hash = hash;
this.needs_flush = true;
+ if (this.isClosed()) return;
this.callback(this.ctx, path, is_file, event_type);
}
}
pub fn flush(this: *PathWatcher) void {
this.needs_flush = false;
+ if (this.isClosed()) return;
this.flushCallback(this.ctx);
}
pub fn deinit(this: *PathWatcher) void {
- this.mutex.lock();
- this.finalized = true;
- if (this.pending_directories > 0) {
+ this.setClosed();
+ if (this.hasPendingDirectories()) {
// will be freed on last directory
- this.mutex.unlock();
return;
}
- this.mutex.unlock();
if (this.manager) |manager| {
if (comptime Environment.isMac) {