diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/bun.js/node/fs_events.zig | 29 | ||||
-rw-r--r-- | src/bun.js/node/node_fs_watcher.zig | 104 | ||||
-rw-r--r-- | src/bun.js/node/path_watcher.zig | 393 | ||||
-rw-r--r-- | src/bun.zig | 32 | ||||
-rw-r--r-- | src/js/node/fs.promises.ts | 56 | ||||
-rw-r--r-- | src/js/out/modules/node/fs.promises.js | 2 |
6 files changed, 423 insertions, 193 deletions
diff --git a/src/bun.js/node/fs_events.zig b/src/bun.js/node/fs_events.zig index cfcd50993..70b41fc33 100644 --- a/src/bun.js/node/fs_events.zig +++ b/src/bun.js/node/fs_events.zig @@ -354,25 +354,27 @@ pub const FSEventsLoop = struct { for (loop.watchers.slice()) |watcher| { if (watcher) |handle| { + const handle_path = handle.path; + for (paths, 0..) |path_ptr, i| { var flags = event_flags[i]; var path = path_ptr[0..bun.len(path_ptr)]; // Filter out paths that are outside handle's request - if (path.len < handle.path.len or !bun.strings.startsWith(path, handle.path)) { + if (path.len < handle_path.len or !bun.strings.startsWith(path, handle_path)) { continue; } const is_file = (flags & kFSEventStreamEventFlagItemIsDir) == 0; // Remove common prefix, unless the watched folder is "/" - if (!(handle.path.len == 1 and handle.path[0] == '/')) { - path = path[handle.path.len..]; + if (!(handle_path.len == 1 and handle_path[0] == '/')) { + path = path[handle_path.len..]; // Ignore events with path equal to directory itself if (path.len <= 1 and is_file) { continue; } if (path.len == 0) { - // Since we're using fsevents to watch the file itself, path == handle.path, and we now need to get the basename of the file back + // Since we're using fsevents to watch the file itself, path == handle_path, and we now need to get the basename of the file back while (path.len > 0) { if (bun.strings.startsWithChar(path, '/')) { path = path[1..]; @@ -403,7 +405,7 @@ pub const FSEventsLoop = struct { } } - handle.emit(path, is_file, is_rename); + handle.emit(path, is_file, if (is_rename) .rename else .change); } handle.flush(); } @@ -554,7 +556,7 @@ pub const FSEventsLoop = struct { this.signal_source = null; this.sem.deinit(); - this.mutex.deinit(); + if (this.watcher_count > 0) { while (this.watchers.popOrNull()) |watcher| { if (watcher) |w| { @@ -578,11 +580,18 @@ pub const FSEventsWatcher = struct { recursive: bool, ctx: ?*anyopaque, - const Callback = *const fn (ctx: ?*anyopaque, path: string, is_file: bool, is_rename: bool) void; - const UpdateEndCallback = *const fn (ctx: ?*anyopaque) void; + pub const EventType = enum { + rename, + change, + @"error", + }; + + pub const Callback = *const fn (ctx: ?*anyopaque, path: string, is_file: bool, event_type: EventType) void; + pub const UpdateEndCallback = *const fn (ctx: ?*anyopaque) void; pub fn init(loop: *FSEventsLoop, path: string, recursive: bool, callback: Callback, updateEnd: UpdateEndCallback, ctx: ?*anyopaque) *FSEventsWatcher { var this = bun.default_allocator.create(FSEventsWatcher) catch unreachable; + this.* = FSEventsWatcher{ .path = path, .callback = callback, @@ -596,8 +605,8 @@ pub const FSEventsWatcher = struct { return this; } - pub fn emit(this: *FSEventsWatcher, path: string, is_file: bool, is_rename: bool) void { - this.callback(this.ctx, path, is_file, is_rename); + pub fn emit(this: *FSEventsWatcher, path: string, is_file: bool, event_type: EventType) void { + this.callback(this.ctx, path, is_file, event_type); } pub fn flush(this: *FSEventsWatcher) void { diff --git a/src/bun.js/node/node_fs_watcher.zig b/src/bun.js/node/node_fs_watcher.zig index 72d9861c0..d6d2f8a2f 100644 --- a/src/bun.js/node/node_fs_watcher.zig +++ b/src/bun.js/node/node_fs_watcher.zig @@ -6,7 +6,6 @@ const Path = @import("../../resolver/resolve_path.zig"); const Encoder = JSC.WebCore.Encoder; const Mutex = @import("../../lock.zig").Lock; -const FSEvents = @import("./fs_events.zig"); const PathWatcher = @import("./path_watcher.zig"); const VirtualMachine = JSC.VirtualMachine; @@ -21,15 +20,12 @@ const Environment = bun.Environment; pub const FSWatcher = struct { ctx: *VirtualMachine, verbose: bool = false, - entry_path: ?string = null, - entry_dir: string = "", // JSObject mutex: Mutex, signal: ?*JSC.AbortSignal, persistent: bool, - default_watcher: ?*PathWatcher.PathWatcher, - fsevents_watcher: ?*FSEvents.FSEventsWatcher, + path_watcher: ?*PathWatcher.PathWatcher, poll_ref: JSC.PollRef = .{}, globalThis: *JSC.JSGlobalObject, js_this: JSC.JSValue, @@ -53,10 +49,6 @@ pub const FSWatcher = struct { pub fn deinit(this: *FSWatcher) void { // stop all managers and signals this.detach(); - if (this.entry_path) |path| { - this.entry_path = null; - bun.default_allocator.free(path); - } bun.default_allocator.destroy(this); } @@ -154,33 +146,7 @@ pub const FSWatcher = struct { } }; - pub fn onFSEventUpdate( - ctx: ?*anyopaque, - path: string, - is_file: bool, - is_rename: bool, - ) void { - // only called by FSEventUpdate - - const this = bun.cast(*FSWatcher, ctx.?); - - const relative_path = bun.default_allocator.dupe(u8, path) catch unreachable; - const event_type: FSWatchTask.EventType = if (is_rename) .rename else .change; - - if (this.verbose) { - if (is_file) { - Output.prettyErrorln("<r> <d>File changed: {s}<r>", .{relative_path}); - } else { - Output.prettyErrorln("<r> <d>Dir changed: {s}<r>", .{relative_path}); - } - } - - this.current_task.append(relative_path, event_type, true); - } - pub fn onPathUpdate(ctx: ?*anyopaque, path: string, is_file: bool, event_type: PathWatcher.PathWatcher.EventType) void { - // only called by PathWatcher - const this = bun.cast(*FSWatcher, ctx.?); const relative_path = bun.default_allocator.dupe(u8, path) catch unreachable; @@ -212,7 +178,6 @@ pub const FSWatcher = struct { Output.flush(); } // we only enqueue after all events are processed - // this is called by FSEventsWatcher or PathWatcher this.current_task.enqueue(); } @@ -558,14 +523,9 @@ pub const FSWatcher = struct { signal.detach(this); } - if (this.default_watcher) |default_watcher| { - this.default_watcher = null; - default_watcher.deinit(); - } - - if (this.fsevents_watcher) |fsevents_watcher| { - this.fsevents_watcher = null; - fsevents_watcher.deinit(); + if (this.path_watcher) |path_watcher| { + this.path_watcher = null; + path_watcher.deinit(); } if (this.persistent) { @@ -584,37 +544,6 @@ pub const FSWatcher = struct { this.deinit(); } - const PathResult = struct { - fd: StoredFileDescriptorType = 0, - is_file: bool = true, - }; - - // TODO: switch to using JSC.Maybe to avoid using "unreachable" and improve error messages - fn fdFromAbsolutePathZ( - absolute_path_z: [:0]const u8, - ) !PathResult { - if (std.fs.openIterableDirAbsoluteZ(absolute_path_z, .{ - .access_sub_paths = true, - })) |iterable_dir| { - return PathResult{ - .fd = iterable_dir.dir.fd, - .is_file = false, - }; - } else |err| { - if (err == error.NotDir) { - var file = try std.fs.openFileAbsoluteZ(absolute_path_z, .{ .mode = .read_only }); - return PathResult{ - .fd = file.handle, - .is_file = true, - }; - } else { - return err; - } - } - - unreachable; - } - pub fn init(args: Arguments) !*FSWatcher { var buf: [bun.MAX_PATH_BYTES + 1]u8 = undefined; var slice = args.path.slice(); @@ -635,8 +564,6 @@ pub const FSWatcher = struct { buf[file_path.len] = 0; var file_path_z = buf[0..file_path.len :0]; - var fs_type = try fdFromAbsolutePathZ(file_path_z); - var ctx = try bun.default_allocator.create(FSWatcher); const vm = args.global_this.bunVM(); ctx.* = .{ @@ -648,8 +575,7 @@ pub const FSWatcher = struct { .mutex = Mutex.init(), .signal = if (args.signal) |s| s.ref() else null, .persistent = args.persistent, - .default_watcher = null, - .fsevents_watcher = null, + .path_watcher = null, .globalThis = args.global_this, .js_this = .zero, .encoding = args.encoding, @@ -661,25 +587,7 @@ pub const FSWatcher = struct { errdefer ctx.deinit(); - if (comptime Environment.isMac) { - if (!fs_type.is_file) { - var dir_path_clone = bun.default_allocator.dupeZ(u8, file_path) catch unreachable; - ctx.entry_path = dir_path_clone; - ctx.entry_dir = dir_path_clone; - - ctx.fsevents_watcher = try FSEvents.watch(dir_path_clone, args.recursive, onFSEventUpdate, onUpdateEnd, bun.cast(*anyopaque, ctx)); - - ctx.initJS(args.listener); - return ctx; - } - } - - var file_path_clone = bun.default_allocator.dupeZ(u8, file_path) catch unreachable; - - ctx.entry_path = file_path_clone; - ctx.entry_dir = std.fs.path.dirname(file_path_clone) orelse file_path_clone; - ctx.default_watcher = try PathWatcher.watch(vm, file_path_clone, args.recursive, onPathUpdate, onUpdateEnd, bun.cast(*anyopaque, ctx)); - + ctx.path_watcher = try PathWatcher.watch(vm, file_path_z, args.recursive, onPathUpdate, onUpdateEnd, bun.cast(*anyopaque, ctx)); ctx.initJS(args.listener); return ctx; } diff --git a/src/bun.js/node/path_watcher.zig b/src/bun.js/node/path_watcher.zig index 5a22cb783..49caa93d0 100644 --- a/src/bun.js/node/path_watcher.zig +++ b/src/bun.js/node/path_watcher.zig @@ -4,6 +4,7 @@ const UnboundedQueue = @import("../unbounded_queue.zig").UnboundedQueue; const Path = @import("../../resolver/resolve_path.zig"); const Fs = @import("../../fs.zig"); const Mutex = @import("../../lock.zig").Lock; +const FSEvents = @import("./fs_events.zig"); const bun = @import("root").bun; const Output = bun.Output; @@ -30,9 +31,11 @@ pub const PathWatcherManager = struct { watcher_count: u32 = 0, vm: *JSC.VirtualMachine, file_paths: bun.StringHashMap(PathInfo), + current_fd_task: bun.FDHashMap(*DirectoryRegisterTask), deinit_on_last_watcher: bool = false, + pending_tasks: u32 = 0, + deinit_on_last_task: bool = false, mutex: Mutex, - const PathInfo = struct { fd: StoredFileDescriptorType = 0, is_file: bool = true, @@ -42,10 +45,14 @@ pub const PathWatcherManager = struct { hash: Watcher.HashType, }; + // TODO: switch to using JSC.Maybe to avoid using "unreachable" and improve error messages fn _fdFromAbsolutePathZ( this: *PathWatcherManager, path: [:0]const u8, ) !PathInfo { + this.mutex.lock(); + defer this.mutex.unlock(); + if (this.file_paths.getEntry(path)) |entry| { var info = entry.value_ptr; info.refs += 1; @@ -54,42 +61,39 @@ pub const PathWatcherManager = struct { const cloned_path = try bun.default_allocator.dupeZ(u8, path); errdefer bun.default_allocator.destroy(cloned_path); - var stat = try bun.C.lstat_absolute(cloned_path); - var result = PathInfo{ - .path = cloned_path, - .dirname = cloned_path, - .hash = Watcher.getHash(cloned_path), - .refs = 1, - }; - - switch (stat.kind) { - .sym_link => { + if (std.fs.openIterableDirAbsoluteZ(cloned_path, .{ + .access_sub_paths = true, + })) |iterable_dir| { + const result = PathInfo{ + .fd = iterable_dir.dir.fd, + .is_file = false, + .path = cloned_path, + .dirname = cloned_path, + .hash = Watcher.getHash(cloned_path), + .refs = 1, + }; + _ = try this.file_paths.put(cloned_path, result); + return result; + } else |err| { + if (err == error.NotDir) { var file = try std.fs.openFileAbsoluteZ(cloned_path, .{ .mode = .read_only }); - result.fd = file.handle; - const _stat = try file.stat(); - - result.is_file = _stat.kind != .directory; - if (result.is_file) { - result.dirname = std.fs.path.dirname(cloned_path) orelse cloned_path; - } - }, - .directory => { - const dir = (try std.fs.openIterableDirAbsoluteZ(cloned_path, .{ - .access_sub_paths = true, - })).dir; - result.fd = dir.fd; - result.is_file = false; - }, - else => { - const file = try std.fs.openFileAbsoluteZ(cloned_path, .{ .mode = .read_only }); - result.fd = file.handle; - result.is_file = true; - result.dirname = std.fs.path.dirname(cloned_path) orelse cloned_path; - }, + const result = PathInfo{ + .fd = file.handle, + .is_file = true, + .path = cloned_path, + // if is really a file we need to get the dirname + .dirname = std.fs.path.dirname(cloned_path) orelse cloned_path, + .hash = Watcher.getHash(cloned_path), + .refs = 1, + }; + _ = try this.file_paths.put(cloned_path, result); + return result; + } else { + return err; + } } - _ = try this.file_paths.put(cloned_path, result); - return result; + unreachable; } pub fn init(vm: *JSC.VirtualMachine) !*PathWatcherManager { @@ -102,6 +106,7 @@ pub const PathWatcherManager = struct { errdefer watchers.deinitWithAllocator(bun.default_allocator); var manager = PathWatcherManager{ .file_paths = bun.StringHashMap(PathInfo).init(bun.default_allocator), + .current_fd_task = bun.FDHashMap(*DirectoryRegisterTask).init(bun.default_allocator), .watchers = watchers, .main_watcher = try Watcher.init( this, @@ -142,6 +147,7 @@ pub const PathWatcherManager = struct { const watchers = this.watchers.slice(); for (events) |event| { + if (event.index >= file_paths.len) continue; const file_path = file_paths[event.index]; const update_count = counts[event.index] + 1; counts[event.index] = update_count; @@ -168,6 +174,9 @@ pub const PathWatcherManager = struct { for (watchers) |w| { if (w) |watcher| { + if (comptime Environment.isMac) { + if (watcher.fsevents_watcher != null) continue; + } const entry_point = watcher.path.dirname; var path = file_path; @@ -238,6 +247,9 @@ pub const PathWatcherManager = struct { const event_type: PathWatcher.EventType = .rename; // renaming folders, creating folder or files will be always be rename for (watchers) |w| { if (w) |watcher| { + if (comptime Environment.isMac) { + if (watcher.fsevents_watcher != null) continue; + } const entry_point = watcher.path.dirname; var path = path_slice; @@ -297,6 +309,7 @@ pub const PathWatcherManager = struct { // stop all watchers for (watchers) |w| { if (w) |watcher| { + log("[watch] error: {s}", .{@errorName(err)}); watcher.emit(@errorName(err), 0, timestamp, false, .@"error"); watcher.flush(); } @@ -312,47 +325,176 @@ pub const PathWatcherManager = struct { this.deinit(); } - fn addDirectory(this: *PathWatcherManager, watcher: *PathWatcher, path: PathInfo, buf: *[bun.MAX_PATH_BYTES + 1]u8) !void { - const fd = path.fd; - try this.main_watcher.addDirectory(fd, path.path, path.hash, false); + pub const DirectoryRegisterTask = struct { + manager: *PathWatcherManager, + path: PathInfo, + task: JSC.WorkPoolTask = .{ .callback = callback }, + watcher_list: bun.BabyList(*PathWatcher) = .{}, - var iter = (std.fs.IterableDir{ .dir = std.fs.Dir{ - .fd = fd, - } }).iterate(); + pub fn callback(task: *JSC.WorkPoolTask) void { + var routine = @fieldParentPtr(@This(), "task", task); + defer routine.deinit(); + routine.run(); + } - while (try iter.next()) |entry| { - var parts = [2]string{ path.path, entry.name }; - var entry_path = Path.joinAbsStringBuf( - Fs.FileSystem.instance.topLevelDirWithoutTrailingSlash(), - buf, - &parts, - .auto, - ); + fn schedule(manager: *PathWatcherManager, watcher: *PathWatcher, path: PathInfo) !void { + manager.mutex.lock(); + defer manager.mutex.unlock(); + // keep the path alive + manager._incrementPathRefNoLock(path.path); + errdefer manager._decrementPathRef(path.path); + + // use the same thread for the same fd to avoid race conditions + if (manager.current_fd_task.getEntry(path.fd)) |entry| { + var routine = entry.value_ptr.*; + watcher.mutex.lock(); + defer watcher.mutex.unlock(); + watcher.pending_directories += 1; + routine.watcher_list.push(bun.default_allocator, watcher) catch |err| { + watcher.pending_directories -= 1; + return err; + }; + return; + } + var routine = try bun.default_allocator.create(DirectoryRegisterTask); + routine.* = DirectoryRegisterTask{ + .manager = manager, + .path = path, + .watcher_list = bun.BabyList(*PathWatcher).initCapacity(bun.default_allocator, 1) catch |err| { + bun.default_allocator.destroy(routine); + return err; + }, + }; + errdefer routine.deinit(); + try routine.watcher_list.push(bun.default_allocator, watcher); + watcher.mutex.lock(); + defer watcher.mutex.unlock(); + watcher.pending_directories += 1; + + manager.current_fd_task.put(path.fd, routine) catch |err| { + watcher.pending_directories -= 1; + return err; + }; + manager.pending_tasks += 1; + JSC.WorkPool.schedule(&routine.task); + return; + } - buf[entry_path.len] = 0; - var entry_path_z = buf[0..entry_path.len :0]; + fn getNext(this: *DirectoryRegisterTask) ?*PathWatcher { + this.manager.mutex.lock(); + defer this.manager.mutex.unlock(); - var child_path = try this._fdFromAbsolutePathZ(entry_path_z); - errdefer this._decrementPathRef(entry_path_z); - try watcher.file_paths.push(bun.default_allocator, child_path.path); + const watcher = this.watcher_list.popOrNull(); + if (watcher == null) { + // no more work todo, release the fd and path + _ = this.manager.current_fd_task.remove(this.path.fd); + this.manager._decrementPathRefNoLock(this.path.path); + return null; + } + return watcher; + } - if (child_path.is_file) { - try this.main_watcher.addFile(child_path.fd, child_path.path, child_path.hash, options.Loader.file, 0, null, false); - } else { - if (watcher.recursive) { - try this.addDirectory(watcher, child_path, buf); + fn processWatcher( + this: *DirectoryRegisterTask, + watcher: *PathWatcher, + buf: *[bun.MAX_PATH_BYTES + 1]u8, + ) !void { + const manager = this.manager; + const path = this.path; + const fd = path.fd; + var iter = (std.fs.IterableDir{ .dir = std.fs.Dir{ + .fd = fd, + } }).iterate(); + defer { + watcher.mutex.lock(); + watcher.pending_directories -= 1; + + if (watcher.pending_directories == 0 and watcher.finalized) { + watcher.mutex.unlock(); + watcher.deinit(); + } else { + watcher.mutex.unlock(); + } + } + + // now we iterate over all files and directories + while (try iter.next()) |entry| { + var parts = [2]string{ path.path, entry.name }; + var entry_path = Path.joinAbsStringBuf( + Fs.FileSystem.instance.topLevelDirWithoutTrailingSlash(), + buf, + &parts, + .auto, + ); + + buf[entry_path.len] = 0; + var entry_path_z = buf[0..entry_path.len :0]; + + var child_path = try manager._fdFromAbsolutePathZ(entry_path_z); + watcher.mutex.lock(); + watcher.file_paths.push(bun.default_allocator, child_path.path) catch |err| { + watcher.mutex.unlock(); + manager._decrementPathRef(entry_path_z); + return err; + }; + watcher.mutex.unlock(); + + // we need to call this unlocked + if (child_path.is_file) { + try manager.main_watcher.addFile(child_path.fd, child_path.path, child_path.hash, options.Loader.file, 0, null, false); + } else { + if (watcher.recursive and !watcher.finalized) { + // this may trigger another thread with is desired when available to watch long trees + try manager._addDirectory(watcher, child_path); + } } } } + + fn run(this: *DirectoryRegisterTask) void { + var buf: [bun.MAX_PATH_BYTES + 1]u8 = undefined; + + while (this.getNext()) |watcher| { + this.processWatcher(watcher, &buf) catch |err| { + log("[watch] error registering directory: {s}", .{@errorName(err)}); + watcher.emit(@errorName(err), 0, std.time.milliTimestamp(), false, .@"error"); + watcher.flush(); + }; + } + + this.manager.mutex.lock(); + this.manager.pending_tasks -= 1; + if (this.manager.deinit_on_last_task and this.manager.pending_tasks == 0) { + this.manager.mutex.unlock(); + this.manager.deinit(); + } else { + this.manager.mutex.unlock(); + } + } + + fn deinit(this: *DirectoryRegisterTask) void { + bun.default_allocator.destroy(this); + } + }; + + // this should only be called if thread pool is not null + fn _addDirectory(this: *PathWatcherManager, watcher: *PathWatcher, path: PathInfo) !void { + const fd = path.fd; + try this.main_watcher.addDirectory(fd, path.path, path.hash, false); + + return try DirectoryRegisterTask.schedule(this, watcher, path); } fn registerWatcher(this: *PathWatcherManager, watcher: *PathWatcher) !void { this.mutex.lock(); - defer this.mutex.unlock(); if (this.watcher_count == this.watchers.len) { this.watcher_count += 1; - this.watchers.push(bun.default_allocator, watcher) catch unreachable; + this.watchers.push(bun.default_allocator, watcher) catch |err| { + this.watcher_count -= 1; + this.mutex.unlock(); + return err; + }; } else { var watchers = this.watchers.slice(); for (watchers, 0..) |w, i| { @@ -363,16 +505,32 @@ pub const PathWatcherManager = struct { } } } + + this.mutex.unlock(); + const path = watcher.path; if (path.is_file) { try this.main_watcher.addFile(path.fd, path.path, path.hash, options.Loader.file, 0, null, false); } else { - var buf: [bun.MAX_PATH_BYTES + 1]u8 = undefined; - try this.addDirectory(watcher, path, &buf); + if (comptime Environment.isMac) { + if (watcher.fsevents_watcher != null) { + return; + } + } + try this._addDirectory(watcher, path); } } - fn _decrementPathRef(this: *PathWatcherManager, file_path: [:0]const u8) void { + fn _incrementPathRefNoLock(this: *PathWatcherManager, file_path: [:0]const u8) void { + if (this.file_paths.getEntry(file_path)) |entry| { + var path = entry.value_ptr; + if (path.refs > 0) { + path.refs += 1; + } + } + } + + fn _decrementPathRefNoLock(this: *PathWatcherManager, file_path: [:0]const u8) void { if (this.file_paths.getEntry(file_path)) |entry| { var path = entry.value_ptr; if (path.refs > 0) { @@ -387,6 +545,12 @@ pub const PathWatcherManager = struct { } } + fn _decrementPathRef(this: *PathWatcherManager, file_path: [:0]const u8) void { + this.mutex.lock(); + defer this.mutex.unlock(); + this._decrementPathRefNoLock(file_path); + } + fn unregisterWatcher(this: *PathWatcherManager, watcher: *PathWatcher) void { this.mutex.lock(); defer this.mutex.unlock(); @@ -408,9 +572,18 @@ pub const PathWatcherManager = struct { } this.watcher_count -= 1; + this._decrementPathRefNoLock(watcher.path.path); + if (comptime Environment.isMac) { + if (watcher.fsevents_watcher != null) { + break; + } + } + + watcher.mutex.lock(); while (watcher.file_paths.popOrNull()) |file_path| { - this._decrementPathRef(file_path); + this._decrementPathRefNoLock(file_path); } + watcher.mutex.unlock(); break; } } @@ -432,6 +605,12 @@ pub const PathWatcherManager = struct { return; } + if (this.pending_tasks > 0) { + // deinit when all tasks are done + this.deinit_on_last_task = true; + return; + } + this.main_watcher.deinit(false); if (this.watcher_count > 0) { @@ -454,7 +633,8 @@ pub const PathWatcherManager = struct { this.file_paths.deinit(); this.watchers.deinitWithAllocator(bun.default_allocator); - // this.mutex.deinit(); + + this.current_fd_task.deinit(); bun.default_allocator.destroy(this); } @@ -471,7 +651,13 @@ pub const PathWatcher = struct { // all watched file paths (including subpaths) except by path it self file_paths: bun.BabyList([:0]const u8) = .{}, last_change_event: ChangeEvent = .{}, - + // on MacOS we use this to watch for changes on directories and subdirectories + fsevents_watcher: ?*FSEvents.FSEventsWatcher, + mutex: Mutex, + pending_directories: u32 = 0, + finalized: bool = false, + // only used on macOS + resolved_path: ?string = null, pub const ChangeEvent = struct { hash: PathWatcherManager.Watcher.HashType = 0, event_type: EventType = .change, @@ -488,13 +674,58 @@ pub const PathWatcher = struct { pub fn init(manager: *PathWatcherManager, path: PathWatcherManager.PathInfo, recursive: bool, callback: Callback, updateEndCallback: UpdateEndCallback, ctx: ?*anyopaque) !*PathWatcher { var this = try bun.default_allocator.create(PathWatcher); + + if (comptime Environment.isMac) { + if (!path.is_file) { + var buffer: [bun.MAX_PATH_BYTES]u8 = undefined; + const resolved_path_temp = std.os.getFdPath(path.fd, &buffer) catch |err| { + bun.default_allocator.destroy(this); + return err; + }; + const resolved_path = bun.default_allocator.dupeZ(u8, resolved_path_temp) catch |err| { + bun.default_allocator.destroy(this); + return err; + }; + this.resolved_path = resolved_path; + this.* = PathWatcher{ + .path = path, + .callback = callback, + .fsevents_watcher = FSEvents.watch( + resolved_path, + recursive, + bun.cast(FSEvents.FSEventsWatcher.Callback, callback), + bun.cast(FSEvents.FSEventsWatcher.UpdateEndCallback, updateEndCallback), + bun.cast(*anyopaque, ctx), + ) catch |err| { + bun.default_allocator.destroy(this); + return err; + }, + .manager = manager, + .recursive = recursive, + .flushCallback = updateEndCallback, + .file_paths = .{}, + .ctx = ctx, + .mutex = Mutex.init(), + }; + + errdefer this.deinit(); + + // TODO: unify better FSEvents with PathWatcherManager + try manager.registerWatcher(this); + + return this; + } + } + this.* = PathWatcher{ + .fsevents_watcher = null, .path = path, .callback = callback, .manager = manager, .recursive = recursive, .flushCallback = updateEndCallback, .ctx = ctx, + .mutex = Mutex.init(), .file_paths = bun.BabyList([:0]const u8).initCapacity(bun.default_allocator, 1) catch |err| { bun.default_allocator.destroy(this); return err; @@ -525,10 +756,36 @@ pub const PathWatcher = struct { } pub fn deinit(this: *PathWatcher) void { + this.mutex.lock(); + this.finalized = true; + if (this.pending_directories > 0) { + // will be freed on last directory + this.mutex.unlock(); + return; + } + this.mutex.unlock(); + if (this.manager) |manager| { - manager.unregisterWatcher(this); + if (comptime Environment.isMac) { + if (this.fsevents_watcher) |watcher| { + // first unregister on FSEvents + watcher.deinit(); + manager.unregisterWatcher(this); + } else { + manager.unregisterWatcher(this); + this.file_paths.deinitWithAllocator(bun.default_allocator); + } + } else { + manager.unregisterWatcher(this); + this.file_paths.deinitWithAllocator(bun.default_allocator); + } + } + + if (comptime Environment.isMac) { + if (this.resolved_path) |path| { + bun.default_allocator.free(path); + } } - this.file_paths.deinitWithAllocator(bun.default_allocator); bun.default_allocator.destroy(this); } diff --git a/src/bun.zig b/src/bun.zig index 494e49cb4..8dd888b69 100644 --- a/src/bun.zig +++ b/src/bun.zig @@ -729,6 +729,34 @@ pub fn getenvZ(path_: [:0]const u8) ?[]const u8 { return sliceTo(ptr, 0); } +//TODO: add windows support +pub const FDHashMapContext = struct { + pub fn hash(_: @This(), fd: FileDescriptor) u64 { + return @as(u64, @intCast(fd)); + } + pub fn eql(_: @This(), a: FileDescriptor, b: FileDescriptor) bool { + return a == b; + } + pub fn pre(input: FileDescriptor) Prehashed { + return Prehashed{ + .value = @This().hash(.{}, input), + .input = input, + }; + } + + pub const Prehashed = struct { + value: u64, + input: FileDescriptor, + pub fn hash(this: @This(), fd: FileDescriptor) u64 { + if (fd == this.input) return this.value; + return @as(u64, @intCast(fd)); + } + + pub fn eql(_: @This(), a: FileDescriptor, b: FileDescriptor) bool { + return a == b; + } + }; +}; // These wrappers exist to use our strings.eqlLong function pub const StringArrayHashMapContext = struct { pub fn hash(_: @This(), s: []const u8) u32 { @@ -831,6 +859,10 @@ pub fn StringHashMapUnmanaged(comptime Type: type) type { return std.HashMapUnmanaged([]const u8, Type, StringHashMapContext, std.hash_map.default_max_load_percentage); } +pub fn FDHashMap(comptime Type: type) type { + return std.HashMap(StoredFileDescriptorType, Type, FDHashMapContext, std.hash_map.default_max_load_percentage); +} + const CopyFile = @import("./copy_file.zig"); pub const copyFileRange = CopyFile.copyFileRange; pub const copyFile = CopyFile.copyFile; diff --git a/src/js/node/fs.promises.ts b/src/js/node/fs.promises.ts index 0bf6eb9b2..bdbacd27d 100644 --- a/src/js/node/fs.promises.ts +++ b/src/js/node/fs.promises.ts @@ -2,6 +2,7 @@ // Note: `constants` is injected into the top of this file declare var constants: typeof import("node:fs/promises").constants; +const { createFIFO } = $lazy("primordials"); var fs = Bun.fs(); @@ -26,7 +27,7 @@ export function watch( eventType: string; filename: string | Buffer | undefined; }; - const events: Array<Event> = []; + if (filename instanceof URL) { throw new TypeError("Watch URLs are not supported yet"); } else if (Buffer.isBuffer(filename)) { @@ -38,32 +39,55 @@ export function watch( if (typeof options === "string") { options = { encoding: options }; } - fs.watch(filename, options || {}, (eventType: string, filename: string | Buffer | undefined) => { - events.push({ eventType, filename }); + const queue = createFIFO(); + + const watcher = fs.watch(filename, options || {}, (eventType: string, filename: string | Buffer | undefined) => { + queue.push({ eventType, filename }); if (nextEventResolve) { const resolve = nextEventResolve; nextEventResolve = null; resolve(); } }); + return { - async *[Symbol.asyncIterator]() { + [Symbol.asyncIterator]() { let closed = false; - while (!closed) { - while (events.length) { - let event = events.shift() as Event; - if (event.eventType === "close") { - closed = true; - break; + return { + async next() { + while (!closed) { + let event: Event; + while ((event = queue.shift() as Event)) { + if (event.eventType === "close") { + closed = true; + return { value: undefined, done: true }; + } + if (event.eventType === "error") { + closed = true; + throw event.filename; + } + return { value: event, done: false }; + } + const { promise, resolve } = Promise.withResolvers(); + nextEventResolve = resolve; + await promise; } - if (event.eventType === "error") { + return { value: undefined, done: true }; + }, + + return() { + if (!closed) { + watcher.close(); closed = true; - throw event.filename; + if (nextEventResolve) { + const resolve = nextEventResolve; + nextEventResolve = null; + resolve(); + } } - yield event; - } - await new Promise((resolve: Function) => (nextEventResolve = resolve)); - } + return { value: undefined, done: true }; + }, + }; }, }; } diff --git a/src/js/out/modules/node/fs.promises.js b/src/js/out/modules/node/fs.promises.js index 852836098..df383069b 100644 --- a/src/js/out/modules/node/fs.promises.js +++ b/src/js/out/modules/node/fs.promises.js @@ -1 +1 @@ -var o=(S)=>{return import.meta.require(S)};function G(S,U={}){const A=[];if(S instanceof URL)throw new TypeError("Watch URLs are not supported yet");else if(Buffer.isBuffer(S))S=S.toString();else if(typeof S!=="string")throw new TypeError("Expected path to be a string or Buffer");let z=null;if(typeof U==="string")U={encoding:U};return C.watch(S,U||{},(q,g)=>{if(A.push({eventType:q,filename:g}),z){const B=z;z=null,B()}}),{async*[Symbol.asyncIterator](){let q=!1;while(!q){while(A.length){let g=A.shift();if(g.eventType==="close"){q=!0;break}if(g.eventType==="error")throw q=!0,g.filename;yield g}await new Promise((g)=>z=g)}}}}var C=Bun.fs(),D="::bunternal::",J={[D]:(S)=>{return async function(...U){return await 1,S.apply(C,U)}}}[D],H=J(C.accessSync),I=J(C.appendFileSync),K=J(C.closeSync),L=J(C.copyFileSync),M=J(C.existsSync),N=J(C.chownSync),O=J(C.chmodSync),P=J(C.fchmodSync),Q=J(C.fchownSync),V=J(C.fstatSync),X=J(C.fsyncSync),Y=J(C.ftruncateSync),Z=J(C.futimesSync),_=J(C.lchmodSync),$=J(C.lchownSync),T=J(C.linkSync),W=C.lstat.bind(C),j=J(C.mkdirSync),x=J(C.mkdtempSync),E=J(C.openSync),F=J(C.readSync),k=J(C.writeSync),R=C.readdir.bind(C),w=C.readFile.bind(C),h=J(C.writeFileSync),b=J(C.readlinkSync),u=J(C.realpathSync),d=J(C.renameSync),c=C.stat.bind(C),v=J(C.symlinkSync),a=J(C.truncateSync),l=J(C.unlinkSync),y=J(C.utimesSync),p=J(C.lutimesSync),m=J(C.rmSync),n=J(C.rmdirSync),t=(S,U,A)=>{return new Promise((z,q)=>{try{var g=C.writevSync(S,U,A)}catch(B){q(B);return}z({bytesWritten:g,buffers:U})})},r=(S,U,A)=>{return new Promise((z,q)=>{try{var g=C.readvSync(S,U,A)}catch(B){q(B);return}z({bytesRead:g,buffers:U})})},i={access:H,appendFile:I,close:K,copyFile:L,exists:M,chown:N,chmod:O,fchmod:P,fchown:Q,fstat:V,fsync:X,ftruncate:Y,futimes:Z,lchmod:_,lchown:$,link:T,lstat:W,mkdir:j,mkdtemp:x,open:E,read:F,write:k,readdir:R,readFile:w,writeFile:h,readlink:b,realpath:u,rename:d,stat:c,symlink:v,truncate:a,unlink:l,utimes:y,lutimes:p,rm:m,rmdir:n,watch:G,writev:t,readv:r,constants,[Symbol.for("CommonJS")]:0};export{t as writev,h as writeFile,k as write,G as watch,y as utimes,l as unlink,a as truncate,v as symlink,c as stat,n as rmdir,m as rm,d as rename,u as realpath,r as readv,b as readlink,R as readdir,w as readFile,F as read,E as open,x as mkdtemp,j as mkdir,p as lutimes,W as lstat,T as link,$ as lchown,_ as lchmod,Z as futimes,Y as ftruncate,X as fsync,V as fstat,Q as fchown,P as fchmod,M as exists,i as default,L as copyFile,K as close,N as chown,O as chmod,I as appendFile,H as access}; +var s=(z)=>{return import.meta.require(z)};function N(z,B={}){if(z instanceof URL)throw new TypeError("Watch URLs are not supported yet");else if(Buffer.isBuffer(z))z=z.toString();else if(typeof z!=="string")throw new TypeError("Expected path to be a string or Buffer");let C=null;if(typeof B==="string")B={encoding:B};const G=M(),H=S.watch(z,B||{},(D,A)=>{if(G.push({eventType:D,filename:A}),C){const I=C;C=null,I()}});return{[Symbol.asyncIterator](){let D=!1;return{async next(){while(!D){let A;while(A=G.shift()){if(A.eventType==="close")return D=!0,{value:void 0,done:!0};if(A.eventType==="error")throw D=!0,A.filename;return{value:A,done:!1}}const{promise:I,resolve:L}=Promise.withResolvers();C=L,await I}return{value:void 0,done:!0}},return(){if(!D){if(H.close(),D=!0,C){const A=C;C=null,A()}}return{value:void 0,done:!0}}}}}}var{createFIFO:M}=globalThis[Symbol.for("Bun.lazy")]("primordials"),S=Bun.fs(),K="::bunternal::",J={[K]:(z)=>{return async function(...B){return await 1,z.apply(S,B)}}}[K],P=J(S.accessSync),Q=J(S.appendFileSync),U=J(S.closeSync),V=J(S.copyFileSync),X=J(S.existsSync),Y=J(S.chownSync),Z=J(S.chmodSync),_=J(S.fchmodSync),$=J(S.fchownSync),q=J(S.fstatSync),O=J(S.fsyncSync),g=J(S.ftruncateSync),T=J(S.futimesSync),W=J(S.lchmodSync),j=J(S.lchownSync),k=J(S.linkSync),E=S.lstat.bind(S),h=J(S.mkdirSync),w=J(S.mkdtempSync),x=J(S.openSync),F=J(S.readSync),R=J(S.writeSync),b=S.readdir.bind(S),u=S.readFile.bind(S),d=J(S.writeFileSync),c=J(S.readlinkSync),v=J(S.realpathSync),a=J(S.renameSync),y=S.stat.bind(S),l=J(S.symlinkSync),p=J(S.truncateSync),m=J(S.unlinkSync),n=J(S.utimesSync),t=J(S.lutimesSync),r=J(S.rmSync),o=J(S.rmdirSync),f=(z,B,C)=>{return new Promise((G,H)=>{try{var D=S.writevSync(z,B,C)}catch(A){H(A);return}G({bytesWritten:D,buffers:B})})},i=(z,B,C)=>{return new Promise((G,H)=>{try{var D=S.readvSync(z,B,C)}catch(A){H(A);return}G({bytesRead:D,buffers:B})})},SS={access:P,appendFile:Q,close:U,copyFile:V,exists:X,chown:Y,chmod:Z,fchmod:_,fchown:$,fstat:q,fsync:O,ftruncate:g,futimes:T,lchmod:W,lchown:j,link:k,lstat:E,mkdir:h,mkdtemp:w,open:x,read:F,write:R,readdir:b,readFile:u,writeFile:d,readlink:c,realpath:v,rename:a,stat:y,symlink:l,truncate:p,unlink:m,utimes:n,lutimes:t,rm:r,rmdir:o,watch:N,writev:f,readv:i,constants,[Symbol.for("CommonJS")]:0};export{f as writev,d as writeFile,R as write,N as watch,n as utimes,m as unlink,p as truncate,l as symlink,y as stat,o as rmdir,r as rm,a as rename,v as realpath,i as readv,c as readlink,b as readdir,u as readFile,F as read,x as open,w as mkdtemp,h as mkdir,t as lutimes,E as lstat,k as link,j as lchown,W as lchmod,T as futimes,g as ftruncate,O as fsync,q as fstat,$ as fchown,_ as fchmod,X as exists,SS as default,V as copyFile,U as close,Y as chown,Z as chmod,Q as appendFile,P as access}; |